High-Water Mark

An index in the write-ahead log showing the last successful replication.

05 August 2020

aka: CommitIndex

Problem

The Write-Ahead Log pattern is used to recover state after a server crashes and restarts. But a write-ahead log alone is not enough to provide availability in case of server failure. If a single server fails, clients won't be able to function until the server restarts. To get a more available system, we can replicate the log on multiple servers. Using Leader and Followers, the leader replicates all its log entries to a Quorum of followers. Now, should the leader fail, a new leader can be elected, and clients can mostly continue to work with the cluster as before. But there are still a couple of things that can go wrong:

  • The leader can fail before sending its log entries to any followers.
  • The leader can fail after sending log entries to some followers, but before it could send them to a majority of followers.

In these error scenarios, some followers can be missing entries in their logs, and some followers can have more entries than others. So it becomes important for each follower to know what part of the log is safe to make available to clients.

Solution

The high-water mark is an index into the log file that records the last log entry known to have been successfully replicated to a Quorum of followers. The leader also passes the high-water mark on to its followers during replication. All servers in the cluster should only transmit data to clients that reflects updates at or below the high-water mark.

Here's the sequence of operations.

Figure 1: High-Water Mark

For each log entry, the leader appends it to its local write-ahead log and then sends it to all the followers.

leader (class ReplicationModule...)

  private Long appendAndReplicate(byte[] data) {
      Long lastLogEntryIndex = appendToLocalLog(data);
      logger.info("Replicating log entries till index " + lastLogEntryIndex + " on followers");
      replicateOnFollowers(lastLogEntryIndex);
      return lastLogEntryIndex;
  }


  private void replicateOnFollowers(Long entryAtIndex) {
      for (final FollowerHandler follower : followers) {
          replicateOn(follower, entryAtIndex); //send replication requests to followers
      }
  }

The followers handle the replication request and append the log entries to their local logs. After successfully appending the log entries, they respond to the leader with the index of the latest log entry they have. The response also includes the current Generation Clock of the server.

follower (class ReplicationModule...)

  private ReplicationResponse handleReplicationRequest(ReplicationRequest replicationRequest) {
      List<WALEntry> entries = replicationRequest.getEntries();
      for (WALEntry entry : entries) {
          logger.info("Appending log entry " + entry.getEntryId() + " in " + serverId());
          if (wal.exists(entry)) {
              logger.info("Entry " + wal.readAt(entry.getEntryId()) + " already exists on " + config.getServerId());
              continue;
          }
          wal.writeEntry(entry);
      }
      return new ReplicationResponse(SUCCEEDED, serverId(), replicationState.getGeneration(), wal.getLastLogEntryId());
  }

When responses are received, the leader keeps track of the log index replicated on each server.

class ReplicationModule…

  recordReplicationConfirmedFor(response.getServerId(), response.getReplicatedLogIndex());
  long logIndexAtQuorum = computeHighwaterMark(logIndexesAtAllServers(), config.numberOfServers());
  logger.info("logIndexAtQuorum in " + config.getServerId() + " is " + logIndexAtQuorum + " highWaterMark is " + replicationState.getHighWaterMark());
  var currentHighWaterMark = replicationState.getHighWaterMark();
  if (logIndexAtQuorum > currentHighWaterMark) {
      applyLogAt(currentHighWaterMark, logIndexAtQuorum);
      logger.info("Setting highwatermark in " + config.getServerId() + " to " + logIndexAtQuorum);
      replicationState.setHighWaterMark(logIndexAtQuorum);
  } else {
      logger.info("HighWaterMark in " + config.getServerId() + " is " + replicationState.getHighWaterMark() + " >= " + logIndexAtQuorum);
  }
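The bookkeeping in the fragment above can be sketched as a small self-contained class. The names `QuorumTracker` and `recordAck` are hypothetical, and the sketch assumes that the leader records its own last log index like any other server (as `logIndexesAtAllServers()` suggests) and that servers which have not responded yet count as index 0. As in the original, the high-water mark only moves forward.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: tracks the last log index acknowledged by each server
// (leader included) and derives a monotonically increasing high-water mark.
class QuorumTracker {
    private final int clusterSize;
    private final Map<String, Long> ackedIndexes = new HashMap<>();
    private long highWaterMark = 0;

    QuorumTracker(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    // Records an acknowledgement and returns the (possibly advanced) high-water mark.
    long recordAck(String serverId, long replicatedIndex) {
        // Keep the highest index seen per server, so a stale, reordered response is ignored.
        ackedIndexes.merge(serverId, replicatedIndex, Math::max);
        long indexAtQuorum = indexOnMajority();
        if (indexAtQuorum > highWaterMark) {
            highWaterMark = indexAtQuorum; // never move backwards
        }
        return highWaterMark;
    }

    long highWaterMark() {
        return highWaterMark;
    }

    // Assumes every serverId passed to recordAck is a cluster member.
    private long indexOnMajority() {
        List<Long> indexes = new ArrayList<>(ackedIndexes.values());
        while (indexes.size() < clusterSize) {
            indexes.add(0L); // servers that have not acknowledged anything yet
        }
        indexes.sort(Long::compareTo);
        // In ascending order, the value at (n - 1) / 2 is held by a majority of servers.
        return indexes.get((clusterSize - 1) / 2);
    }
}
```

For a three-server cluster, acknowledgements of 5 from the leader and 3 from one follower give a high-water mark of 3; a later, stale acknowledgement of 2 from that follower leaves it at 3.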

The high-water mark can be calculated by looking at the log indexes of all the followers and of the leader itself, and picking the highest index that is available on a majority of the servers.

class ReplicationModule…

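  Long computeHighwaterMark(List<Long> serverLogIndexes, int noOfServers) {
      serverLogIndexes.sort(Long::compareTo);
      // In ascending order, the index at position (n - 1) / 2 is present on a majority of
      // servers for both odd and even n; position n / 2 is held by only half of an even-sized cluster.
      return serverLogIndexes.get((noOfServers - 1) / 2);
  }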
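The choice of position in the sorted list can be checked with a short derivation, assuming n servers (the leader counted as one of them) whose acknowledged log indexes are sorted in ascending order:

```latex
\begin{aligned}
&\text{sorted indexes: } i_0 \le i_1 \le \dots \le i_{n-1} \\
&i_k \text{ is reached by the } n-k \text{ servers at positions } k,\dots,n-1 \\
&n - k \ge \left\lfloor n/2 \right\rfloor + 1
  \iff k \le \left\lceil n/2 \right\rceil - 1 = \left\lfloor (n-1)/2 \right\rfloor \\
&\mathit{highWaterMark} = i_{\lfloor (n-1)/2 \rfloor}
\end{aligned}
```

For n = 5 with indexes (3, 4, 5, 5, 7) this picks position 2, the value 5, reached by three of the five servers. For n = 4 with indexes (2, 3, 6, 6) it picks position 1, the value 3, reached by three of the four servers; position n / 2 = 2 would pick 6, which only two servers have, and two of four is not a majority. For odd n the two positions coincide.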

The leader propagates the high-water mark to the followers either as part of the regular HeartBeat or as separate requests. The followers then set their high-water mark accordingly.
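Applying the leader's high-water mark on a follower can be sketched as below. This is modeled on the rule Raft uses for its commitIndex (a follower sets it to the minimum of the leader's value and the index of its last new entry). The class and method names are hypothetical, and the sketch assumes the follower's log already agrees with the leader's up to `lastLogIndex`.

```java
// Hypothetical follower-side state; only the high-water mark handling is shown.
class FollowerState {
    private long lastLogIndex;      // index of the last entry in the local log
    private long highWaterMark = 0; // highest index this follower may expose to clients

    // Assumes the local log matches the leader's log up to lastLogIndex.
    FollowerState(long lastLogIndex) {
        this.lastLogIndex = lastLogIndex;
    }

    void appended(long newLastLogIndex) {
        lastLogIndex = newLastLogIndex;
    }

    // Called for every heartbeat or replication request carrying the leader's high-water mark.
    void onLeaderHighWaterMark(long leaderHighWaterMark) {
        // Cap at the local log: entries this follower has not received cannot be exposed.
        long candidate = Math.min(leaderHighWaterMark, lastLogIndex);
        // Never move backwards; a delayed message may carry an older value.
        if (candidate > highWaterMark) {
            highWaterMark = candidate;
        }
    }

    long highWaterMark() {
        return highWaterMark;
    }
}
```

A follower holding entries up to index 3 that receives a high-water mark of 5 exposes entries only up to 3. Once it has appended up to index 6 it can move to 5, and a delayed message carrying 4 does not pull it back.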

Clients can read log entries only up to the high-water mark. Log entries beyond the high-water mark are not visible to clients, because there is no confirmation that they are replicated, so they might not be available if the leader fails and some other server is elected as leader.

class ReplicationModule…

  public WALEntry readEntry(long index) {
      if (index > replicationState.getHighWaterMark()) {
          throw new IllegalArgumentException("Log entry not available");
      }
      return wal.readAt(index);
  }

Log Truncation

When a server joins the cluster after a crash or restart, there is always a possibility that its log has some conflicting entries. So whenever a server joins the cluster, it checks with the cluster's leader to learn which entries in its log are potentially conflicting. It then truncates its log to the point where entries match the leader's, and then updates the log with the subsequent entries to ensure its log matches the rest of the cluster.

Consider the following example. The client sends requests to add four entries to the log. The leader successfully replicates three entries, but fails after adding entry4 to its own log. One of the followers is elected as the new leader and accepts more entries from the client. When the failed leader rejoins the cluster, it has entry4, which conflicts. So it needs to truncate its log back to entry3 and then add entry5 so that its log matches the rest of the cluster.

Figure 2: Leader Failure

Figure 3: New Leader

Figure 4: Log Truncation
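The example in the figures can be replayed with a small in-memory model. This is a sketch under stated assumptions: `LogEntry` and `LogReconciler` are hypothetical names, each entry records the generation of the leader that created it, both logs start at index 1 with no gaps, and the rejoining server is handed the new leader's log directly instead of exchanging replication requests. Two entries at the same position with the same generation are treated as identical, which is the property the generation check relies on.

```java
import java.util.List;

// Hypothetical log entry: its index, the generation of the leader that wrote it, and a payload.
record LogEntry(long index, long generation, String data) {}

// Hypothetical helper that makes a rejoining server's log match the leader's log.
class LogReconciler {
    // Both lists are assumed to start at log index 1 with no gaps.
    static void reconcile(List<LogEntry> follower, List<LogEntry> leader) {
        // Find the first position where the generations differ (or one log ends).
        int match = 0;
        while (match < follower.size() && match < leader.size()
                && follower.get(match).generation() == leader.get(match).generation()) {
            match++;
        }
        // Truncate the conflicting suffix of the follower's log.
        follower.subList(match, follower.size()).clear();
        // Append whatever the leader has beyond the matching prefix.
        follower.addAll(leader.subList(match, leader.size()));
    }
}
```

Run against the scenario above (entries 1 to 3 in generation 1 on both servers, entry4 in generation 1 on the old leader, entry5 in generation 2 on the new leader), the old leader's log ends up as entry1, entry2, entry3, entry5.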

Any server that restarts or rejoins the cluster after a pause finds the new leader. It then explicitly asks for the current high-water mark, truncates its log to the high-water mark, and then gets all the entries beyond the high-water mark from the leader. Replication algorithms like RAFT have ways to find conflicting entries by comparing the entries in the server's own log with the entries in the replication request. Entries with the same log index but a lower Generation Clock are removed.

class ReplicationModule…

  private void maybeTruncate(ReplicationRequest replicationRequest) throws IOException {
      if (replicationRequest.hasNoEntries() || wal.isEmpty()) {
          return;
      }

      List<WALEntry> entries = replicationRequest.getEntries();
      for (WALEntry entry : entries) {
          if (wal.getLastLogEntryId() >= entry.getEntryId()) {
              if (entry.getGeneration() == wal.readAt(entry.getEntryId()).getGeneration()) {
                  continue;
              }
              wal.truncate(entry.getEntryId());
          }
      }
  }
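
The simpler procedure described earlier, where a rejoining server truncates its log to the high-water mark and then fetches everything beyond it from the leader, needs no per-entry generation comparison. A minimal sketch with hypothetical names: the local log is a list of payloads where position i - 1 holds log index i, and `fetchFrom` stands in for whatever request the server sends to the leader.

```java
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical rejoin procedure for a server whose log may hold unconfirmed entries.
class Rejoin {
    // localLog.get(i - 1) holds the entry at log index i.
    // fetchFrom.apply(i) is assumed to return the leader's entries from index i onwards.
    static void recover(List<String> localLog, long leaderHighWaterMark,
                        LongFunction<List<String>> fetchFrom) {
        // Only entries up to the high-water mark are known to be on a quorum,
        // so anything after it may conflict with the new leader's log.
        int keep = (int) Math.min(leaderHighWaterMark, localLog.size());
        localLog.subList(keep, localLog.size()).clear();
        // Ask the leader for every entry after the truncation point.
        localLog.addAll(fetchFrom.apply(keep + 1));
    }
}
```

This may discard entries that were in fact replicated but not yet covered by the high-water mark; the leader simply sends them again, which trades some extra transfer for not having to compare generations.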

A simple way to support log truncation is to keep a map from log index to file position. The log can then be truncated at a given index, as follows:

class WALSegment…

  public void truncate(Long logIndex) throws IOException {
      var filePosition = entryOffsets.get(logIndex);
      if (filePosition == null) throw new IllegalArgumentException("No file position available for logIndex=" + logIndex);

      fileChannel.truncate(filePosition);
      readAll();
  }
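The map from log index to file position can be sketched with a plain `FileChannel`. The class name `OffsetIndexedLog` and the record layout (a 4-byte length followed by UTF-8 bytes) are assumptions for illustration; like the original, truncation calls `FileChannel.truncate` at the offset where the entry starts. Instead of re-reading the file with `readAll()`, this sketch drops the removed indexes from the in-memory map directly.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.TreeMap;

// Hypothetical single-segment log: length-prefixed records plus an index -> file offset map.
class OffsetIndexedLog implements AutoCloseable {
    private final FileChannel channel;
    private final TreeMap<Long, Long> entryOffsets = new TreeMap<>();
    private long nextIndex = 1;

    OffsetIndexedLog(Path file) throws IOException {
        channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    long append(String data) throws IOException {
        byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);
        buffer.putInt(bytes.length).put(bytes).flip();
        long position = channel.size();
        entryOffsets.put(nextIndex, position); // remember where this entry starts
        while (buffer.hasRemaining()) {
            channel.write(buffer, position + buffer.position());
        }
        return nextIndex++;
    }

    // Removes the entry at logIndex and every entry after it.
    void truncate(long logIndex) throws IOException {
        Long filePosition = entryOffsets.get(logIndex);
        if (filePosition == null) {
            throw new IllegalArgumentException("No file position available for logIndex=" + logIndex);
        }
        channel.truncate(filePosition);
        entryOffsets.tailMap(logIndex, true).clear(); // forget offsets of removed entries
        nextIndex = logIndex;
    }

    long size() throws IOException {
        return channel.size();
    }

    long lastIndex() {
        return nextIndex - 1;
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

After appending four 6-byte entries (10 bytes each with the length prefix), truncating at index 3 shrinks the file to 20 bytes and leaves entries 1 and 2.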

Examples

  • Consensus algorithms use the concept of a high-water mark to know when to apply the proposed state mutations. For example, in the RAFT consensus algorithm, the high-water mark is called 'CommitIndex'.
  • Kafka's replication protocol maintains a separate index called the 'high-water mark'. Consumers can see entries only up to the high-water mark.
  • Apache BookKeeper has a concept of 'last add confirmed', which is the last entry successfully replicated on a quorum of bookies.

Significant Revisions