This pattern is part of Patterns of Distributed Systems

Emergent Leader

Order cluster nodes based on their age within the cluster to allow nodes to select a leader without running an explicit election.

18 August 2022


Peer-to-peer systems treat each cluster node as equal; there is no strict leader. This means there is no explicit leader election process as happens in the Leader and Followers pattern. Sometimes, to achieve better availability, the cluster also avoids depending on a separate Consistent Core. However, one cluster node still needs to act as the cluster coordinator, for tasks such as assigning data partitions to other cluster nodes, tracking when cluster nodes join or fail, and taking corrective action.


One of the common techniques used in peer-to-peer systems is to order cluster nodes according to their 'age'. The oldest member of the cluster plays the role of the coordinator for the cluster. The coordinator is responsible for deciding on membership changes as well as making decisions such as where Fixed Partitions should be placed across cluster nodes.
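
As a minimal sketch of the ordering rule, assuming each member carries an integer age assigned when it joins (age 1 for the first node, higher values for later joiners), every node can compute the coordinator locally from its own member list. `AgedMember` and `CoordinatorSelection` are hypothetical names used only for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical member record: age 1 is the first node to join; later joiners get higher ages.
record AgedMember(String address, int age) {}

class CoordinatorSelection {
    // The coordinator is the oldest live member, i.e. the one with the lowest age.
    // Every node with the same member list computes the same answer, so no election is run.
    static Optional<AgedMember> coordinatorOf(List<AgedMember> liveMembers) {
        return liveMembers.stream().min(Comparator.comparingInt(AgedMember::age));
    }
}
```

Because the rule is a pure function of the member list, nodes agree on the coordinator exactly as long as they agree on the membership, which is why keeping membership lists in sync matters so much in this pattern.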

To form the cluster, one of the cluster nodes acts as a seed node or an introducer node. All the cluster nodes join the cluster by contacting the seed node.

Every cluster node is configured with the seed node address. When a cluster node is started, it tries to contact the seed node to join the cluster.

class ClusterNode…

  MembershipService membershipService;
  public void start(Config config) {
      this.membershipService =  new MembershipService(config.getListenAddress());
      membershipService.join(config.getSeedAddress());
  }

The seed node could be any of the cluster nodes. It's configured with its own address as the seed node address and is the first node that is started. It immediately begins accepting requests. The age of the seed node is 1.

class MembershipService…

  Membership membership;
  public void join(InetAddressAndPort seedAddress) {
      int maxJoinAttempts = 5;
      for(int i = 0; i < maxJoinAttempts; i++){
          try {
              joinAttempt(seedAddress);
              return; //joined successfully
          } catch (Exception e) {
              logger.info("Join attempt " + i + " from " + selfAddress + " to " + seedAddress + " failed. Retrying");
          }
      }
      throw new JoinFailedException("Unable to join the cluster after " + maxJoinAttempts + " attempts");
  }

  private void joinAttempt(InetAddressAndPort seedAddress) throws ExecutionException, TimeoutException {
      if (selfAddress.equals(seedAddress)) {
          //this node is the seed: bootstrap the membership with itself as the oldest member
          int membershipVersion = 1;
          int age = 1;
          updateMembership(new Membership(membershipVersion, Arrays.asList(new Member(selfAddress, age, MemberStatus.JOINED))));
          start();
          return;
      }
      long id = this.messageId++;
      CompletableFuture<JoinResponse> future = new CompletableFuture<>();
      JoinRequest message = new JoinRequest(id, selfAddress);
      pendingRequests.put(id, future);
      network.send(seedAddress, message);

      //wait up to 5 seconds for the seed node to reply with the latest membership
      JoinResponse joinResponse = Uninterruptibles.getUninterruptibly(future, 5, TimeUnit.SECONDS);
      updateMembership(joinResponse.getMembership());
      start();
  }

  private void start() {
      startSplitBrainChecker();
      logger.info(selfAddress + " joined the cluster. Membership=" + membership);
  }

  private void updateMembership(Membership membership) {
      this.membership  = membership;
  }

There can be more than one seed node, but seed nodes start accepting requests only after they themselves join the cluster. The cluster will remain functional if the seed node is down, but no new nodes will be able to join the cluster.
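
One way to take advantage of several seed nodes, sketched here under the assumption that each node is configured with a list of seed addresses, is to cycle through them until one accepts the join request. The `tryJoin` predicate is a hypothetical stand-in for sending a JoinRequest and waiting for the JoinResponse; the attempt budget mirrors the `maxJoinAttempts` loop in `join` above.

```java
import java.util.List;
import java.util.function.Predicate;

class MultiSeedJoiner {
    // Hypothetical sketch: cycle through the configured seeds until one accepts the
    // join request or the attempt budget runs out. tryJoin stands in for sending a
    // JoinRequest and waiting for the JoinResponse.
    static String joinViaAnySeed(List<String> seeds, Predicate<String> tryJoin, int maxAttempts) {
        if (seeds.isEmpty()) {
            throw new IllegalArgumentException("at least one seed address is required");
        }
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String seed = seeds.get(attempt % seeds.size());
            if (tryJoin.test(seed)) {
                return seed; // joined the cluster through this seed
            }
        }
        throw new IllegalStateException("Unable to join the cluster after " + maxAttempts + " attempts");
    }
}
```

If athens is down but byzantium has already joined and is accepting requests, a new node configured with both seeds still gets in through byzantium on its second attempt.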

Non-seed nodes then send a join request to the seed node. The seed node handles the join request by creating a new member record and assigning its age. It then updates its own membership list and sends the new membership list to all the existing members. It waits for a response from every node, but returns the join response after a timeout even if some responses are delayed.

class MembershipService…

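  public void handleJoinRequest(JoinRequest joinRequest) {
      handlePossibleRejoin(joinRequest);
      handleNewJoin(joinRequest);
  }

  private void handleNewJoin(JoinRequest joinRequest) {
      List<Member> existingMembers = membership.getLiveMembers();
      //assign the new member the next age and bump the membership version
      updateMembership(membership.addNewMember(joinRequest.from));
      ResultsCollector resultsCollector = broadcastMembershipUpdate(existingMembers);
      JoinResponse joinResponse = new JoinResponse(joinRequest.messageId, selfAddress, membership);
      //reply once all existing members acknowledge, or once the broadcast times out
      resultsCollector.whenComplete((response, exception) -> {
          logger.info("Sending join response from " + selfAddress + " to " + joinRequest.from);
          network.send(joinRequest.from, joinResponse);
      });
  }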

class Membership…

  public Membership addNewMember(InetAddressAndPort address) {
      var newMembership = new ArrayList<>(liveMembers);
      int age = youngestMemberAge() + 1;
      newMembership.add(new Member(address, age, MemberStatus.JOINED));
      return new Membership(version + 1, newMembership, failedMembers);
  }

  private int youngestMemberAge() {
      return liveMembers.stream().map(m -> m.age).max(Integer::compare).orElse(0);
  }

If a node which was already part of the cluster is trying to rejoin after a crash, the failure detector state related to that member is cleared.

class MembershipService…

  private void handlePossibleRejoin(JoinRequest joinRequest) {
      if (membership.isFailed(joinRequest.from)) {
          //member rejoining
          logger.info(joinRequest.from + " rejoining the cluster. Removing it from failed list");
          membership.removeFromFailedList(joinRequest.from);
      }
  }

It's then added as a new member. Each member needs to be identified uniquely. It can be assigned a unique identifier at startup. This then provides a point of reference that makes it possible to check if it is an existing cluster node that is rejoining.
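
A small sketch of rejoin detection, assuming each node generates a fresh random identifier every time it starts and sends it with its join request. The `RejoinDetector` class and its three-way classification are hypothetical. A join request from a known address carrying a new identifier indicates a restarted node, whose stale state can then be cleared.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class RejoinDetector {
    enum JoinKind { NEW_MEMBER, REJOIN_AFTER_RESTART, DUPLICATE_REQUEST }

    // Hypothetical: each node calls this once at startup and sends the id with its join request.
    static String newNodeId() {
        return UUID.randomUUID().toString();
    }

    private final Map<String, String> knownIdsByAddress = new HashMap<>();

    // Classifies a join request by comparing the sender's id with the one
    // previously recorded for the same address, then records the new id.
    JoinKind classify(String address, String nodeId) {
        String previous = knownIdsByAddress.put(address, nodeId);
        if (previous == null) {
            return JoinKind.NEW_MEMBER;
        }
        return previous.equals(nodeId) ? JoinKind.DUPLICATE_REQUEST : JoinKind.REJOIN_AFTER_RESTART;
    }
}
```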

The membership class maintains the list of live members as well as failed members. Members are moved from the live list to the failed list if they stop sending HeartBeat messages, as explained in the failure detection section.

class Membership…

  public class Membership {
      List<Member> liveMembers = new ArrayList<>();
      List<Member> failedMembers = new ArrayList<>();

      public boolean isFailed(InetAddressAndPort address) {
          return failedMembers.stream().anyMatch(m -> m.address.equals(address));
      }
  }
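
The `failed(...)` method called from the failure-detection code is not shown. A minimal sketch of what it might do, assuming membership is an immutable snapshot, moves members to the failed list and increments the version. `VersionedMembership` and its `String` addresses are hypothetical simplifications of `Membership` and `InetAddressAndPort`.

```java
import java.util.ArrayList;
import java.util.List;

class VersionedMembership {
    // Simplified member: a String address instead of InetAddressAndPort.
    record Member(String address, int age) {}

    final int version;
    final List<Member> liveMembers;
    final List<Member> failedMembers;

    VersionedMembership(int version, List<Member> liveMembers, List<Member> failedMembers) {
        this.version = version;
        this.liveMembers = List.copyOf(liveMembers);     // immutable snapshot
        this.failedMembers = List.copyOf(failedMembers);
    }

    // Returns a new snapshot with newlyFailed moved from the live list to the failed list.
    // Bumping the version lets other nodes detect that their copy is outdated.
    VersionedMembership failed(List<Member> newlyFailed) {
        List<Member> live = new ArrayList<>(liveMembers);
        live.removeAll(newlyFailed);
        List<Member> failed = new ArrayList<>(failedMembers);
        failed.addAll(newlyFailed);
        return new VersionedMembership(version + 1, live, failed);
    }

    boolean isFailed(String address) {
        return failedMembers.stream().anyMatch(m -> m.address().equals(address));
    }
}
```

Returning a new snapshot rather than mutating the lists in place means a membership list that is being broadcast cannot change underneath the sender.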

Sending membership updates to all the existing members

Membership updates are sent to all the other nodes concurrently. The coordinator also needs to track whether all the members successfully received the updates.

A common technique is to send a one-way request to all nodes and expect an acknowledgement message. The cluster nodes send acknowledgement messages to the coordinator to confirm receipt of the membership update. A ResultsCollector object can track receipt of all the messages asynchronously; it is notified every time an acknowledgement is received for a membership update, and it completes its future once all the expected acknowledgements are received.

class MembershipService…

  private ResultsCollector broadcastMembershipUpdate(List<Member> existingMembers) {
      ResultsCollector resultsCollector = sendMembershipUpdateTo(existingMembers);
      resultsCollector.orTimeout(2, TimeUnit.SECONDS);
      return resultsCollector;
  }

  Map<Long, CompletableFuture> pendingRequests = new HashMap<>();
  private ResultsCollector sendMembershipUpdateTo(List<Member> existingMembers) {
      var otherMembers = otherMembers(existingMembers);
      ResultsCollector collector = new ResultsCollector(otherMembers.size());
      if (otherMembers.size() == 0) {
          //nobody to wait for, so complete immediately
          collector.complete();
          return collector;
      }
      for (Member m : otherMembers) {
          long id = this.messageId++;
          CompletableFuture<Message> future = new CompletableFuture<>();
          //count an acknowledgement when the matching response arrives
          future.whenComplete((result, exception) -> {
              if (exception == null) {
                  collector.ackReceived();
              }
          });
          pendingRequests.put(id, future);
          network.send(m.address, new UpdateMembershipRequest(id, selfAddress, membership));
      }
      return collector;
  }

class MembershipService…

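  private void handleResponse(Message message) {
      completePendingRequests(message);
  }

  private void completePendingRequests(Message message) {
      CompletableFuture requestFuture = pendingRequests.remove(message.messageId); //remove so completed entries don't accumulate
      if (requestFuture != null) {
          requestFuture.complete(message);
      }
  }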

class ResultsCollector…

  class ResultsCollector {
      int totalAcks;
      int receivedAcks;
      CompletableFuture<Object> future = new CompletableFuture<>();
      public ResultsCollector(int totalAcks) {
          this.totalAcks = totalAcks;
      }
      //synchronized because acknowledgements can arrive concurrently
      public synchronized void ackReceived() {
          receivedAcks++;
          if (receivedAcks == totalAcks) {
              future.complete(true);
          }
      }
      public void orTimeout(int time, TimeUnit unit) {
          future.orTimeout(time, unit);
      }
      public void whenComplete(BiConsumer<? super Object, ? super Throwable> func) {
          future.whenComplete(func);
      }
      public void complete() {
          future.complete(true);
      }
  }

To see how ResultsCollector works, consider a cluster with a set of nodes: let's call them athens, byzantium and cyrene. athens is acting as the coordinator. When a new node, delphi, sends a join request to athens, athens updates the membership and sends the updateMembership request to byzantium and cyrene. It also creates a ResultsCollector object to track acknowledgements and records each acknowledgement it receives with that object. When it has received acknowledgements from both byzantium and cyrene, it responds to delphi.
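
The acknowledgement tracking in this example can be sketched without any networking. This hypothetical `AckCollector` uses `CompletableFuture.orTimeout` (Java 9+) and an `AtomicInteger` instead of a synchronized counter; it illustrates the idea and is not the document's `ResultsCollector`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AckCollector {
    private final int expectedAcks;
    private final AtomicInteger receivedAcks = new AtomicInteger();
    private final CompletableFuture<Boolean> done = new CompletableFuture<>();

    AckCollector(int expectedAcks, long timeout, TimeUnit unit) {
        this.expectedAcks = expectedAcks;
        if (expectedAcks == 0) {
            done.complete(true); // nobody to wait for
        } else {
            // Completes exceptionally with a TimeoutException if acks are still missing.
            done.orTimeout(timeout, unit);
        }
    }

    // Safe to call from multiple network threads.
    void ackReceived() {
        if (receivedAcks.incrementAndGet() == expectedAcks) {
            done.complete(true);
        }
    }

    CompletableFuture<Boolean> future() {
        return done;
    }
}
```

Since the join response in `handleNewJoin` is sent from `whenComplete`, delphi gets its response either when both acknowledgements arrive or when the timeout fires, whichever comes first.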

Frameworks like Akka use Gossip Dissemination and Gossip Convergence to track whether updates have reached all cluster nodes.

An example scenario

Consider another three nodes. Again, we'll call them athens, byzantium and cyrene. athens acts as the seed node; the other two nodes are configured with athens as their seed node.

When athens starts, it detects that it is itself the seed node. It immediately initializes the membership list and starts accepting requests.

When byzantium starts, it sends a join request to athens. Note that even if byzantium starts before athens, it will retry the join request until it can connect to athens, up to its configured number of attempts. Athens finally adds byzantium to the membership list and sends the updated membership list to byzantium. Once byzantium receives the response from athens, it can start accepting requests. With all-to-all heartbeating, byzantium starts sending heartbeats to athens, and athens sends heartbeats to byzantium.

cyrene starts next. It sends a join request to athens. Athens updates the membership list and sends the updated membership list to byzantium. It then sends the join response with the membership list to cyrene. With all-to-all heartbeating, cyrene, athens and byzantium all send heartbeats to each other.
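
The sequence above can be replayed with a small in-memory model, assuming the seed node bootstraps itself at age 1 and version 1 and then applies the age rule from `addNewMember`. Networking, acknowledgements and heartbeats are left out; `SeedNodeSimulation` and its nested records are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SeedNodeSimulation {
    record Member(String name, int age) {}

    // Immutable membership snapshot; the version increases on every change.
    record Membership(int version, List<Member> members) {
        // Mirrors addNewMember: the joiner gets the youngest age + 1.
        Membership addNewMember(String name) {
            int youngestAge = members.stream().mapToInt(Member::age).max().orElse(0);
            List<Member> updated = new ArrayList<>(members);
            updated.add(new Member(name, youngestAge + 1));
            return new Membership(version + 1, List.copyOf(updated));
        }
    }

    // The seed node bootstraps the membership with itself at age 1, version 1.
    static Membership bootstrap(String seedName) {
        return new Membership(1, List.of(new Member(seedName, 1)));
    }
}
```

After athens bootstraps and byzantium and cyrene join, the snapshot is at version 3 with ages 1, 2 and 3, so athens, the oldest member, is the coordinator.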

Handling missing membership updates

It's possible that some cluster nodes miss membership updates. There are two solutions to handle this problem.

If all members are sending heartbeats to all other members, the membership version number can be sent as part of the heartbeat. The cluster node that handles the heartbeat can then detect that the sender has an older version, and the latest membership can be sent to it. Frameworks like Akka, which use Gossip Dissemination, track convergence of the gossiped state.

class MembershipService…

  private void handleHeartbeatMessage(HeartbeatMessage message) {
      failureDetector.heartBeatReceived(message.from);
      if (isCoordinator() && message.getMembershipVersion() < this.membership.getVersion()) {
          membership.getMember(message.from).ifPresent(member -> {
              logger.info("Membership version in " + selfAddress + "=" + this.membership.version + " and in " + message.from + "=" + message.getMembershipVersion());
              logger.info("Sending membership update from " + selfAddress + " to " + message.from);
              sendMembershipUpdateTo(Arrays.asList(member));
          });
      }
  }

In the above example, if byzantium misses the membership update from athens, this will be detected when byzantium sends a heartbeat to athens. athens can then send the latest membership to byzantium.

Alternatively, each cluster node can periodically (say, every second) compare its membership list with those of other cluster nodes. If a node finds that its member list is outdated, it can ask for the latest membership list and update its own. To be able to compare membership lists, a version number is generally maintained and incremented every time there is a change.
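
The periodic check could look roughly like this, assuming a node can query a peer's membership version and, if that is newer, fetch the full list. The `peerVersion` and `fetchFromPeer` functions are hypothetical stand-ins for request/response messages, and `MembershipSync` is not part of any framework.

```java
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.ToIntFunction;

class MembershipSync {
    // Hypothetical versioned snapshot of the member list.
    record Snapshot(int version, List<String> members) {}

    private volatile Snapshot local;
    private final ToIntFunction<String> peerVersion;        // asks a peer for its version
    private final Function<String, Snapshot> fetchFromPeer; // asks a peer for its full list

    MembershipSync(Snapshot initial, ToIntFunction<String> peerVersion, Function<String, Snapshot> fetchFromPeer) {
        this.local = initial;
        this.peerVersion = peerVersion;
        this.fetchFromPeer = fetchFromPeer;
    }

    // One round: compare versions with each peer and pull the list only if it is newer.
    void syncOnce(List<String> peers) {
        for (String peer : peers) {
            if (peerVersion.applyAsInt(peer) > local.version()) {
                Snapshot latest = fetchFromPeer.apply(peer);
                if (latest.version() > local.version()) {
                    local = latest;
                }
            }
        }
    }

    // Repeat the check every second.
    ScheduledFuture<?> scheduleEverySecond(ScheduledExecutorService scheduler, List<String> peers) {
        return scheduler.scheduleWithFixedDelay(() -> syncOnce(peers), 1, 1, TimeUnit.SECONDS);
    }

    Snapshot current() {
        return local;
    }
}
```

Comparing version numbers first keeps the periodic traffic small; the full member list travels only when a node is actually behind.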

Failure Detection

Each cluster node also runs a failure detector to check if heartbeats are missing from any of the cluster nodes. In the simple case, all cluster nodes send heartbeats to all the other nodes, but only the coordinator marks nodes as failed and communicates the updated membership list to all the other nodes. This ensures that nodes don't each decide unilaterally that some other nodes have failed. Hazelcast is an example of this implementation.

class MembershipService…

  private boolean isCoordinator() {
      Member coordinator = membership.getCoordinator();
      return coordinator.address.equals(selfAddress);
  }

  TimeoutBasedFailureDetector<InetAddressAndPort> failureDetector
          = new TimeoutBasedFailureDetector<InetAddressAndPort>(Duration.ofSeconds(2));

  private void checkFailedMembers(List<Member> members) {
      if (isCoordinator()) {
          removeFailedMembers();
      } else {
          //if the failed members include the coordinator, check whether this node is the next coordinator.
          claimLeadershipIfNeeded(members);
      }
  }

  void removeFailedMembers() {
      List<Member> failedMembers = checkAndGetFailedMembers(membership.getLiveMembers());
      if (failedMembers.isEmpty()) {
          return;
      }
      //move failed members to the failed list and tell the remaining live members
      updateMembership(membership.failed(failedMembers));
      sendMembershipUpdateTo(membership.getLiveMembers());
  }
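
`TimeoutBasedFailureDetector` itself isn't shown. A minimal timeout-based detector, assuming it only needs to remember when the last heartbeat from each node arrived, might look like this. `SimpleTimeoutFailureDetector` and its injectable clock are assumptions made so the sketch can be tested without sleeping.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical detector; not the TimeoutBasedFailureDetector class referenced above.
class SimpleTimeoutFailureDetector<T> {
    private final long timeoutNanos;
    private final LongSupplier clock; // System::nanoTime in production, a fake clock in tests
    private final Map<T, Long> lastHeartbeat = new ConcurrentHashMap<>();

    SimpleTimeoutFailureDetector(Duration timeout, LongSupplier clock) {
        this.timeoutNanos = timeout.toNanos();
        this.clock = clock;
    }

    void heartbeatReceived(T node) {
        lastHeartbeat.put(node, clock.getAsLong());
    }

    // Only nodes we have heard from at least once are monitored.
    boolean isMonitoring(T node) {
        return lastHeartbeat.containsKey(node);
    }

    // A node is alive if its last heartbeat arrived within the timeout.
    boolean isAlive(T node) {
        Long last = lastHeartbeat.get(node);
        return last != null && clock.getAsLong() - last <= timeoutNanos;
    }

    void remove(T node) {
        lastHeartbeat.remove(node);
    }
}
```

The detector only reports suspicion; in this pattern it is the coordinator that turns a failed `isAlive` check into a membership change.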

Avoiding all-to-all heartbeating

All-to-all heartbeating is not feasible in large clusters. Typically each node receives heartbeats from only a few other nodes. If a failure is detected, it's broadcast to all the other nodes, including the coordinator.

For example, in Akka a node ring is formed by sorting network addresses, and each cluster node sends heartbeats to only a few cluster nodes. Ignite arranges all the nodes in the cluster in a ring, and each node sends heartbeats only to the node next to it. Hazelcast uses all-to-all heartbeating.
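
A sketch of ring-based heartbeat targets, assuming node addresses are comparable strings and every node knows the full member list: each node sorts the list the same way and sends heartbeats only to the next `fanout` nodes. A fan-out of 1 corresponds to the Ignite arrangement described above; this is an illustration, not the actual Akka or Ignite code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class HeartbeatRing {
    // Returns the nodes that `self` sends heartbeats to: the next `fanout` nodes
    // after it in a ring formed by sorting all addresses. Every node sorts the same
    // list the same way, so the nodes agree on the ring without coordination.
    static List<String> heartbeatTargets(List<String> allNodes, String self, int fanout) {
        List<String> ring = new ArrayList<>(allNodes);
        ring.sort(Comparator.naturalOrder());
        int selfIndex = ring.indexOf(self);
        if (selfIndex < 0) {
            throw new IllegalArgumentException(self + " is not in the member list");
        }
        int count = Math.min(fanout, ring.size() - 1); // never send to self
        List<String> targets = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            targets.add(ring.get((selfIndex + i) % ring.size()));
        }
        return targets;
    }
}
```

With athens, byzantium, cyrene and delphi and a fan-out of 1, athens sends heartbeats to byzantium and delphi wraps around to athens, so each node is watched by exactly one other node.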

Any membership change, whether caused by nodes joining or nodes failing, needs to be broadcast to all the other cluster nodes. A node can connect to every other node to send the required information, or Gossip Dissemination can be used to broadcast it.

Split Brain Situation

Even though a single coordinator node decides when to mark other nodes as down, there's no explicit leader election to select which node acts as the coordinator. Every cluster node expects a heartbeat from the existing coordinator node; if it doesn't get one in time, it can claim to be the coordinator and remove the existing coordinator from the member list.

class MembershipService…

  private void claimLeadershipIfNeeded(List<Member> members) {
      List<Member> failedMembers = checkAndGetFailedMembers(members);
      if (!failedMembers.isEmpty() && isOlderThanAll(failedMembers)) {
          var newMembership = membership.failed(failedMembers);
          updateMembership(newMembership);
          sendMembershipUpdateTo(newMembership.getLiveMembers());
      }
  }

  //a lower age means the member joined earlier; this checks that every failed member has a lower age than this node
  private boolean isOlderThanAll(List<Member> failedMembers) {
      return failedMembers.stream().allMatch(m -> m.age < thisMember().age);
  }

  private List<Member> checkAndGetFailedMembers(List<Member> members) {
      List<Member> failedMembers = members
              .stream()
              .filter(member -> !member.address.equals(selfAddress) && failureDetector.isMonitoring(member.address) && !failureDetector.isAlive(member.address))
              .map(member -> new Member(member.address, member.age, member.status)).collect(Collectors.toList());

      failedMembers.forEach(member -> {
          logger.info(selfAddress + " marking " + member.address + " as DOWN");
      });
      return failedMembers;
  }

This can create a situation where two or more subgroups form within an existing cluster, each considering the others to have failed. This is called the split-brain problem.

Consider a five-node cluster: athens, byzantium, cyrene, delphi and euphesus. If athens receives heartbeats from delphi and euphesus but stops getting heartbeats from byzantium and cyrene, it marks both byzantium and cyrene as failed.

byzantium and cyrene could still send heartbeats to each other but stop receiving heartbeats from athens, delphi and euphesus. byzantium, being the second oldest member of the cluster, then becomes the coordinator. Two separate clusters are thus formed: one with athens as the coordinator and the other with byzantium as the coordinator.
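
The example can be checked with a small sketch of the takeover rule, assuming each node treats the oldest member it still considers live as the coordinator. `CoordinatorSuccession` is a hypothetical helper, not the `claimLeadershipIfNeeded` code above.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Set;

class CoordinatorSuccession {
    record Member(String address, int age) {}

    // Returns true if `self` should act as coordinator once the suspected members are
    // removed: the coordinator is the oldest (lowest-age) member still considered live.
    static boolean shouldClaimCoordinator(List<Member> members, Set<String> suspected, String self) {
        return members.stream()
                .filter(m -> !suspected.contains(m.address()))
                .min(Comparator.comparingInt(Member::age))
                .map(m -> m.address().equals(self))
                .orElse(false);
    }
}
```

With ages 1 to 5 assigned in the order athens, byzantium, cyrene, delphi, euphesus, `shouldClaimCoordinator` returns true for athens when it suspects byzantium and cyrene, and also true for byzantium when it suspects athens, delphi and euphesus. Each view is locally consistent, yet together they produce two coordinators.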

Handling split brain

One common way to handle split brain issue is to check whether there are enough members to handle any client request, and reject the request if there are not enough live members. For example, Hazelcast allows you to configure minimum cluster size to execute any client request.

public void handleClientRequest(Request request) {
    if (!hasMinimumRequiredSize()) {
        throw new NotEnoughMembersException("Requires minimum 3 members to serve the request");
    }
    //process the request (not shown)
}

private boolean hasMinimumRequiredSize() {
    return membership.getLiveMembers().size() >= 3;
}

The part that has the majority of the nodes continues to operate, but, as explained in the Hazelcast documentation, there will always be a time window in which this protection has yet to come into effect.
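
As an alternative to a fixed minimum size, and assuming each node knows the configured size of the full cluster, a node can require a strict majority before serving requests. This is the general majority-quorum technique, not Hazelcast's configuration API; `QuorumCheck` is a hypothetical name.

```java
class QuorumCheck {
    // Strict majority of the configured cluster size. Two disjoint partitions
    // cannot both contain more than half of the nodes, so at most one side passes.
    static boolean hasMajority(int liveMembers, int configuredClusterSize) {
        if (configuredClusterSize <= 0) {
            throw new IllegalArgumentException("cluster size must be positive");
        }
        return liveMembers > configuredClusterSize / 2;
    }
}
```

In the five-node example, the athens side (three nodes) passes and the byzantium side (two nodes) fails. If an even-sized cluster splits exactly in half, neither side passes, trading availability for safety. The check is only as good as each node's membership view, so the time window mentioned above still applies.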

The problem can be avoided if cluster nodes are not marked as down unless it's guaranteed that they won't cause split brain. For example, Akka recommends that you don’t have nodes marked as down through the failure detector; you can instead use its split brain resolver component.

Recovering from split brain

The coordinator runs a periodic job to check if it can connect to the failed nodes. If a connection can be established, it sends a special message indicating that it wants to trigger a split brain merge.

If the receiving node is the coordinator of the subcluster, it will check to see if the cluster that is initiating the request is part of the minority group. If it is, it will send a merge request. The coordinator of the minority group, which receives the merge request, will then execute the merge request on all the nodes in the minority sub group.

class MembershipService…

  splitbrainCheckTask = taskScheduler.scheduleWithFixedDelay(() -> {
              searchOtherClusterGroups();
          },
          1, 1, TimeUnit.SECONDS);

class MembershipService…

  private void searchOtherClusterGroups() {
      if (membership.getFailedMembers().isEmpty()) {
          return;
      }
      //only the coordinator reaches out to members it considers failed
      if (isCoordinator()) {
          for (Member member : membership.getFailedMembers()) {
              logger.info("Sending SplitBrainJoinRequest to " + member.address);
              network.send(member.address, new SplitBrainJoinRequest(messageId++, this.selfAddress, membership.version, membership.getLiveMembers().size()));
          }
      }
  }

If the receiving node is the coordinator of the majority subgroup, it asks the sending coordinator node to merge with itself.

class MembershipService…

  private void handleSplitBrainJoinMessage(SplitBrainJoinRequest splitBrainJoinRequest) {
      logger.info(selfAddress + " Handling SplitBrainJoinRequest from " + splitBrainJoinRequest.from);
      if (!membership.isFailed(splitBrainJoinRequest.from)) {
          return; //ignore requests from nodes this cluster hasn't marked as failed
      }

      if (!isCoordinator()) {
          return; //only coordinators negotiate merges
      }

      if(splitBrainJoinRequest.getMemberCount() < membership.getLiveMembers().size()) {
          //requesting node should join this cluster.
          logger.info(selfAddress + " Requesting " + splitBrainJoinRequest.from + " to rejoin the cluster");
          network.send(splitBrainJoinRequest.from, new SplitBrainMergeMessage(splitBrainJoinRequest.messageId, selfAddress));

      } else {
          //we need to join the other cluster
          mergeWithOtherCluster(splitBrainJoinRequest.from);
      }
  }

  private void mergeWithOtherCluster(InetAddressAndPort otherClusterCoordinator) {
      askAllLiveMembersToMergeWith(otherClusterCoordinator);
      handleMerge(new MergeMessage(messageId++, selfAddress, otherClusterCoordinator)); //initiate merge on this node.
  }

  private void askAllLiveMembersToMergeWith(InetAddressAndPort mergeToAddress) {
      List<Member> liveMembers = membership.getLiveMembers();
      for (Member m : liveMembers) {
          if (m.address.equals(selfAddress)) {
              continue; //this node merges itself in mergeWithOtherCluster
          }
          network.send(m.address, new MergeMessage(messageId++, selfAddress, mergeToAddress));
      }
  }

In the example discussed in the above section, when athens can communicate with byzantium, it will ask byzantium to merge with itself.
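
The merge decision can be isolated as a small pure function, assuming each coordinator knows its own and the requester's live member count. The tie-breaking rule on coordinator addresses is an addition not present in the code above: without some deterministic tie-breaker, two equally sized groups that contact each other at the same time could each decide to merge into the other. `SplitBrainMergeDecision` is a hypothetical name.

```java
class SplitBrainMergeDecision {
    enum Action { ASK_REQUESTER_TO_MERGE, MERGE_INTO_REQUESTER }

    // Run by a coordinator that receives a SplitBrainJoinRequest: the smaller group
    // merges into the larger one.
    static Action decide(String selfAddress, int selfLiveCount, String requesterAddress, int requesterLiveCount) {
        if (requesterLiveCount < selfLiveCount) {
            return Action.ASK_REQUESTER_TO_MERGE;
        }
        if (requesterLiveCount > selfLiveCount) {
            return Action.MERGE_INTO_REQUESTER;
        }
        // Tie (assumed rule): the group whose coordinator has the smaller address survives,
        // so both coordinators reach the same conclusion.
        return selfAddress.compareTo(requesterAddress) < 0
                ? Action.ASK_REQUESTER_TO_MERGE
                : Action.MERGE_INTO_REQUESTER;
    }
}
```

For athens (three live members) receiving a request from byzantium (two), the result is `ASK_REQUESTER_TO_MERGE`; byzantium, receiving a request from athens, gets `MERGE_INTO_REQUESTER`, so both sides agree.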

The coordinator of the smaller subgroup then asks all the cluster nodes inside its group to trigger a merge. The merge operation shuts down the cluster nodes and rejoins them to the coordinator of the larger group.

class MembershipService…

  private void handleMerge(MergeMessage mergeMessage) {
      logger.info(selfAddress + " Merging with " + mergeMessage.getMergeToAddress());
      shutdown();
      //join the cluster again through the other cluster's coordinator
      taskScheduler.execute(() -> join(mergeMessage.getMergeToAddress()));
  }

In the example above, byzantium and cyrene shut down and rejoin athens to form a full cluster again.

Comparison with Leader and Followers

It's useful to compare this pattern with that of Leader and Followers. The leader-follower setup, as used by patterns like Consistent Core, does not function unless the leader is selected by running an election. This guarantees that a Quorum of cluster nodes agree about who the leader is. In the worst case, if agreement isn't reached, the system will be unavailable to process any requests. In other words, it prefers consistency over availability.

The emergent leader, on the other hand, will always have some cluster node acting as a leader for processing client requests. In this case, availability is preferred over consistency.


In JGroups the oldest member is the coordinator and decides membership changes. In Akka the oldest member of the cluster runs actor singletons like shard coordinator which decide the placement of Fixed Partitions across cluster nodes. In-memory data grids like Hazelcast and Ignite have the oldest member as the cluster coordinator.

Significant Revisions

18 August 2022: