Follower Reads

Serve read requests from followers to achieve better throughput and lower latency

01 July 2021

Unmesh Joshi

Problem

When using the Leader and Followers pattern, the leader may get overloaded if too many requests are sent to it. Furthermore, in a multi-datacenter setup where the client is in a remote datacenter, requests to the leader are subject to additional latency.

Solution

While write requests need to go to the leader to maintain consistency, read-only requests can instead go to the nearest follower. This is particularly useful when clients mostly read data.

It is important to note that clients reading from followers can get old values. There will always be a replication lag between the leader and the followers, even in systems that implement consensus algorithms like Raft. That's because even when the leader knows which values are committed, it needs another message to communicate that to the followers. So reading from follower servers is used only in situations where slightly older values are tolerated.

Figure 1: Reading from the nearest follower
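The "another message" step is easier to see in a tiny model. The sketch below is a hypothetical, heavily simplified two-node cluster (one leader, one follower), not Raft itself: the leader commits an entry once the follower acknowledges it, but the follower only learns the new commit index from the next message the leader sends. `LeaderLog`, `FollowerLog` and their methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of commit-index propagation; not a Raft implementation.
class FollowerLog {
    final List<String> entries = new ArrayList<>();
    int commitIndex = -1; // highest entry index this follower knows is committed

    // Receives new entries plus the leader's commit index at the time of sending.
    void onAppend(List<String> newEntries, int leaderCommitIndex) {
        entries.addAll(newEntries);
        if (leaderCommitIndex > commitIndex) {
            commitIndex = Math.min(leaderCommitIndex, entries.size() - 1);
        }
    }

    // Follower reads only expose entries the follower knows are committed.
    String readLatestCommitted() {
        return commitIndex < 0 ? null : entries.get(commitIndex);
    }
}

class LeaderLog {
    final List<String> entries = new ArrayList<>();
    int commitIndex = -1;
    private final FollowerLog follower;

    LeaderLog(FollowerLog follower) {
        this.follower = follower;
    }

    void write(String value) {
        entries.add(value);
        // Message 1 carries the new entry, but still the old commit index.
        follower.onAppend(List.of(value), commitIndex);
        // The follower's acknowledgement is a majority (2 of 2), so the leader commits.
        commitIndex = entries.size() - 1;
    }

    // Message 2: the next heartbeat tells the follower about the new commit index.
    void heartbeat() {
        follower.onAppend(List.of(), commitIndex);
    }

    String readLatestCommitted() {
        return commitIndex < 0 ? null : entries.get(commitIndex);
    }
}
```

Between `write("Microservices")` and the next `heartbeat()`, the leader already returns the new value while the follower still returns the previous one; that window is the replication lag.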

Finding The Nearest Replica

Cluster nodes maintain additional metadata about their location.

class ReplicaDescriptor…

  public class ReplicaDescriptor {
      public ReplicaDescriptor(InetAddressAndPort address, String region) {
          this.address = address;
          this.region = region;
      }
      InetAddressAndPort address;
      String region;
  
      public InetAddressAndPort getAddress() {
          return address;
      }
  
      public String getRegion() {
          return region;
      }
  }

The cluster client can then pick the local replica based on its own region.

class ClusterClient…

  public List<String> get(String key) {
      List<ReplicaDescriptor> allReplicas = allFollowerReplicas(key);
      ReplicaDescriptor nearestFollower = findNearestFollowerBasedOnLocality(allReplicas, clientRegion);
      GetValueResponse getValueResponse = sendGetRequest(nearestFollower.getAddress(), new GetValueRequest(key));
      return getValueResponse.getValue();
  }

  ReplicaDescriptor findNearestFollowerBasedOnLocality(List<ReplicaDescriptor> followers, String clientRegion) {
      List<ReplicaDescriptor> sameRegionFollowers = matchLocality(followers, clientRegion);
      List<ReplicaDescriptor> finalList = sameRegionFollowers.isEmpty() ? followers : sameRegionFollowers;
      return finalList.get(0);
  }

  private List<ReplicaDescriptor> matchLocality(List<ReplicaDescriptor> followers, String clientRegion) {
      return followers.stream().filter(rd -> clientRegion.equals(rd.region)).collect(Collectors.toList());
  }

For example, if there are two follower replicas, one in the us-west region and the other in the us-east region, a client from the us-east region will be connected to the us-east replica.

class CausalKVStoreTest…

  @Test
  public void getFollowersInSameRegion() {
      List<ReplicaDescriptor> followers = createReplicas("us-west", "us-east");
      ReplicaDescriptor nearestFollower = new ClusterClient(followers, "us-east").findNearestFollower(followers);
      assertEquals("us-east", nearestFollower.getRegion());
  }

The cluster client or a coordinating cluster node can also track the latencies it observes with other cluster nodes. It can send periodic heartbeats to measure these latencies and use them to pick the follower with the minimum latency. To make the selection fairer, products like [mongodb] or [cockroachdb] calculate latencies as a moving average. Cluster nodes generally maintain a Single Socket Channel to communicate with other cluster nodes, and a Single Socket Channel needs a HeartBeat to keep the connection alive, so capturing latencies and calculating the moving average is easy to implement.

class WeightedAverage…

  public class WeightedAverage {
      //sendHeartbeat measures round trip times with System.nanoTime(),
      //so the average is kept in nanoseconds.
      long averageLatencyNanos = 0;
      public void update(long heartbeatLatencyNanos) {
          //Example implementation of weighted average as used in MongoDB
          //The running, weighted average round trip time for heartbeat messages to the target node.
          // Weighted 80% to the old round trip time, and 20% to the new round trip time.
          averageLatencyNanos = averageLatencyNanos == 0
                  ? heartbeatLatencyNanos
                  : (averageLatencyNanos * 4 + heartbeatLatencyNanos) / 5;
      }
  
      public long getAverageLatency() {
          return averageLatencyNanos;
      }
  }
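To see why the 80/20 weighting gives a fairer selection, the sketch below repeats the same integer update rule in a standalone class with the hypothetical name `LatencyEwma` and feeds it a single latency spike. The sample values are made up for illustration and written in milliseconds; the rule itself does not depend on the unit.

```java
// Same update rule as WeightedAverage: 80% old average, 20% new sample,
// with 0 meaning "no sample yet". Integer division truncates.
class LatencyEwma {
    private long average = 0;

    void update(long sample) {
        average = average == 0 ? sample : (average * 4 + sample) / 5;
    }

    long average() {
        return average;
    }

    // Feeds the samples in order and returns the average after each one.
    static long[] trace(long... samples) {
        LatencyEwma ewma = new LatencyEwma();
        long[] averages = new long[samples.length];
        for (int i = 0; i < samples.length; i++) {
            ewma.update(samples[i]);
            averages[i] = ewma.average();
        }
        return averages;
    }
}
```

For the samples 100, 100, 500, 100, 100 the averages are 100, 100, 180, 164, 151: one slow heartbeat raises the average to 180 rather than 500, and it decays back over the following heartbeats, so a single outlier does not immediately make a follower look far away.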

class ClusterClient…

  private Map<InetAddressAndPort, WeightedAverage> latencyMap = new HashMap<>();
  private void sendHeartbeat(InetAddressAndPort clusterNodeAddress) {
      try {
          long startTimeNanos = System.nanoTime();
          sendHeartbeatRequest(clusterNodeAddress);
          long endTimeNanos = System.nanoTime();

          //computeIfAbsent stores and returns the same instance that is
          //updated below, so the first latency sample is not lost.
          WeightedAverage heartbeatStats
                  = latencyMap.computeIfAbsent(clusterNodeAddress, address -> new WeightedAverage());
          heartbeatStats.update(endTimeNanos - startTimeNanos);

      } catch (NetworkException e) {
          logger.error(e);
      }
  }

This latency information can then be used to pick the follower with the least network latency.

class ClusterClient…

  ReplicaDescriptor findNearestFollower(List<ReplicaDescriptor> allFollowers) {
      List<ReplicaDescriptor> sameRegionFollowers = matchLocality(allFollowers, clientRegion);
      List<ReplicaDescriptor> finalList
              = sameRegionFollowers.isEmpty() ? allFollowers
                                                : sameRegionFollowers;
      //Pick the follower with the lowest average latency. Followers without
      //latency data yet rank last, which keeps the comparison consistent.
      return finalList.stream()
              .min(Comparator.comparingLong(this::averageLatencyOf))
              .orElseThrow();
  }

  private long averageLatencyOf(ReplicaDescriptor replica) {
      //latencyMap is keyed by address, not by ReplicaDescriptor.
      WeightedAverage stats = latencyMap.get(replica.getAddress());
      return stats == null ? Long.MAX_VALUE : stats.getAverageLatency();
  }

Disconnected Or Slow Followers

A follower might get disconnected from the leader and stop getting updates. In some cases, followers can suffer from slow disks that impede the overall replication process, which causes the follower to lag behind the leader. A follower can track whether it has not heard from the leader in a while and, if so, stop serving user requests.

For example, products like [mongodb] allow selecting a replica with a maximum allowed lag time. If the replica lags behind the leader beyond this maximum time, it's not selected to serve the requests. In [kafka], if the follower detects that the offset asked for by the consumer is too large, it responds with an OFFSET_OUT_OF_RANGE error. The consumer is then expected to communicate with the leader.
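Both checks can be sketched in a few lines. This is a minimal sketch under stated assumptions: on the follower side, `LeaderContactGuard` refuses reads once the leader has been silent for longer than a configured `maxLeaderSilenceMs`; on the client side, `MaxLagSelector` skips replicas whose reported lag exceeds a limit and falls back to the leader, in the spirit of the maximum lag setting described above. All class, field and parameter names are hypothetical, and how a real product measures lag differs.

```java
import java.util.List;
import java.util.function.LongSupplier;

// Follower side (hypothetical): stop serving reads when the leader has been
// silent for too long, since the follower may be arbitrarily far behind.
class LeaderContactGuard {
    private final long maxLeaderSilenceMs;
    private final LongSupplier clockMs; // injected so the behaviour is testable
    private volatile long lastLeaderContactMs;

    LeaderContactGuard(long maxLeaderSilenceMs, LongSupplier clockMs) {
        this.maxLeaderSilenceMs = maxLeaderSilenceMs;
        this.clockMs = clockMs;
        this.lastLeaderContactMs = clockMs.getAsLong();
    }

    // Call on every replication request or heartbeat received from the leader.
    void onLeaderMessage() {
        lastLeaderContactMs = clockMs.getAsLong();
    }

    boolean canServeReads() {
        return clockMs.getAsLong() - lastLeaderContactMs <= maxLeaderSilenceMs;
    }
}

// Client side (hypothetical): a replica together with its reported lag.
class ReplicaLag {
    final String name;
    final long lagMs;

    ReplicaLag(String name, long lagMs) {
        this.name = name;
        this.lagMs = lagMs;
    }
}

class MaxLagSelector {
    // Replicas are passed nearest first; the first one within maxLagMs wins.
    // If none qualifies, the read goes to the leader.
    static String choose(List<ReplicaLag> nearestFirst, long maxLagMs, String leader) {
        return nearestFirst.stream()
                .filter(r -> r.lagMs <= maxLagMs)
                .map(r -> r.name)
                .findFirst()
                .orElse(leader);
    }
}
```

The follower-side guard only uses the time since the last leader contact as a proxy: it cannot tell how many entries are missing, only that the follower may be missing an unbounded number of them.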

Read Your Own Writes

Reading from the follower servers can be problematic, as it can give surprising results in common scenarios where a client writes something and then immediately tries to read it.

Consider a client that notices that some book data erroneously has "title": "Nitroservices". It corrects this by writing "title": "Microservices", which goes to the leader. It then immediately reads back the value, but the read request goes to a follower, which may not have been updated yet.

Figure 2: Reading stale value from follower

This can be a common problem. For example, until very recently Amazon S3 did not prevent this.

To fix this issue, with each write the server stores not just the new value but also a monotonically increasing version stamp. The stamp can be a High-Water Mark or a Hybrid Clock. The server returns this version stamp of the stored value in the response to the write request. Then, if the client later wants to read the value, it includes the version stamp in its read request. If the read request goes to a follower, the follower checks whether its stored value is at a version equal to or later than the requested version stamp. If it isn't, the follower waits until it has an up-to-date version before returning the value. This way, clients always read a value that is consistent with the values they wrote, which is often referred to as read-your-writes consistency.
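On the client, this only requires remembering the highest version stamp returned by its own writes and attaching it to later reads. A minimal sketch, assuming integer version stamps as in the key value store code later in this section; `ClientSession` and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client-side session state for read-your-writes.
class ClientSession {
    // Highest version stamp seen in a write response; 0 means "no writes yet".
    private final AtomicInteger lastWrittenVersion = new AtomicInteger(0);

    // Records the version returned by the leader for a completed write.
    // Taking the maximum keeps the stamp monotonic even if responses to
    // concurrent writes arrive out of order.
    void onWriteResponse(int version) {
        lastWrittenVersion.accumulateAndGet(version, Math::max);
    }

    // Version to send with the next read: the follower must have applied
    // at least this version before answering.
    int minVersionForRead() {
        return lastWrittenVersion.get();
    }
}
```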

The flow of requests happens as shown below. To correct a wrongly written value, "title": "Microservices" is written to the leader. The leader returns version 2 to the client in the response. When the client tries to read the value for "title", it passes version number 2 in the request. The follower server that receives the request checks whether its own version number is up to date. Because the version number at the follower server is still 1, it waits until it gets that version from the leader. Once it has the matching (or later) version, it completes the read request and returns the value "Microservices".

Figure 3: Read your own writes at follower
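The same flow can be traced with a small in-memory follower. This is a self-contained, single-threaded sketch, not the key value store code that follows: the hypothetical `VersionedFollower` keeps one value per key and a global version counter, and completes any waiting read once the required version has been applied. It has no timeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical single-threaded follower used only to trace the flow.
class VersionedFollower {
    private final Map<String, String> values = new HashMap<>();
    private int version = 0;
    private final List<PendingRead> pending = new ArrayList<>();

    // A read waiting for a version this follower has not applied yet.
    private static final class PendingRead {
        final String key;
        final int minVersion;
        final CompletableFuture<String> result = new CompletableFuture<>();

        PendingRead(String key, int minVersion) {
            this.key = key;
            this.minVersion = minVersion;
        }
    }

    // Applies the next write replicated from the leader.
    void apply(String key, String value) {
        version++;
        values.put(key, value);
        // Collect the reads that can now be answered, then complete them
        // outside the loop so callbacks cannot modify the list mid-iteration.
        List<PendingRead> ready = new ArrayList<>();
        for (PendingRead read : pending) {
            if (read.minVersion <= version) {
                ready.add(read);
            }
        }
        pending.removeAll(ready);
        for (PendingRead read : ready) {
            read.result.complete(values.get(read.key));
        }
    }

    CompletableFuture<String> read(String key, int minVersion) {
        if (version >= minVersion) {
            return CompletableFuture.completedFuture(values.get(key));
        }
        PendingRead read = new PendingRead(key, minVersion);
        pending.add(read);
        return read.result;
    }

    int version() {
        return version;
    }
}
```

With the follower at version 1 ("Nitroservices"), a read of "title" at version 2 stays incomplete; applying the second write completes it with "Microservices".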

The code for the key value store looks as follows. It is important to note that the follower can lag too far behind or be disconnected from the leader, so it does not wait indefinitely. There is a configured timeout value. If the follower server cannot get the updates within the timeout, an error response is returned to the client. The client can then retry reading from other followers.

class ReplicatedKVStore…

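  //One shared "version reached" future per awaited version, so that several
  //readers waiting for the same version do not overwrite each other.
  Map<Integer, CompletableFuture<Optional<String>>> waitingRequests = new ConcurrentHashMap<>();
  public CompletableFuture<Optional<String>> get(String key, int atVersion) {
      if (this.server.serverRole() == ServerRole.FOLLOWING) {
          //check if we have the version with us;
          if (!isVersionUptoDate(atVersion)) {
              //wait till we get the required version.
              CompletableFuture<Optional<String>> versionReached
                      = waitingRequests.computeIfAbsent(atVersion, v -> new CompletableFuture<>());
              //The version might have been applied between the check above and
              //registering the future; if so, complete it here instead of waiting.
              if (isVersionUptoDate(atVersion)) {
                  waitingRequests.remove(atVersion, versionReached);
                  versionReached.complete(Optional.empty());
              }
              //Read this request's own key once the version is reached, and
              //timeout if the version does not progress to the required version
              //before followerWaitTimeout ms.
              return versionReached
                      .thenApply(ignored -> mvccStore.get(key, atVersion))
                      .orTimeout(config.getFollowerWaitTimeoutMs(), TimeUnit.MILLISECONDS);
          }
      }
      return CompletableFuture.completedFuture(mvccStore.get(key, atVersion));
  }

  private boolean isVersionUptoDate(int atVersion) {
      Optional<Integer> maxVersion = mvccStore.getMaxVersion();
      return maxVersion.map(v -> v >= atVersion).orElse(false);
  }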

Once the key value store progresses to the version the client requested, it can send the response to the client.

class ReplicatedKVStore…

  private Response applyWalEntry(WALEntry walEntry) {
      Command command = deserialize(walEntry);
      if (command instanceof SetValueCommand) {
          return applySetValueCommandsAndCompleteClientRequests((SetValueCommand) command);
      }
      throw new IllegalArgumentException("Unknown command type " + command);
  }

  private Response applySetValueCommandsAndCompleteClientRequests(SetValueCommand setValueCommand) {
      getLogger().info("Setting key value " + setValueCommand);
      version = version + 1;
      mvccStore.put(new VersionedKey(setValueCommand.getKey(), version), setValueCommand.getValue());
      completeWaitingFuturesIfFollower(version, setValueCommand.getValue());
      Response response = Response.success(version);
      return response;
  }

  private void completeWaitingFuturesIfFollower(int version, String value) {
      CompletableFuture<Optional<String>> waitingRead = waitingRequests.remove(version);
      if (waitingRead != null) {
          waitingRead.complete(Optional.of(value));
      }
  }
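On the client side, the retry mentioned earlier can be sketched as a loop over candidate replicas, nearest first, that treats a timeout as a signal to move on. This is a minimal sketch: `FallbackReader`, `readWithFallback` and the `ReadFn` transport interface are hypothetical. It handles both a client-side timeout and a server-side one surfaced as a future completed with `TimeoutException`, which is what `CompletableFuture.orTimeout` produces; any other failure is rethrown.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FallbackReader {
    // Hypothetical transport: sends a read for key at minVersion to one replica.
    interface ReadFn {
        CompletableFuture<String> read(String replica, String key, int minVersion);
    }

    // Tries replicas nearest first and returns the first answer that arrives in
    // time. Throws the last TimeoutException if every replica timed out.
    static String readWithFallback(List<String> replicasNearestFirst, ReadFn transport,
                                   String key, int minVersion, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        TimeoutException lastTimeout = new TimeoutException("no replica to read from");
        for (String replica : replicasNearestFirst) {
            try {
                return transport.read(replica, key, minVersion)
                        .get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                lastTimeout = e; // client-side timeout: try the next replica
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof TimeoutException)) {
                    throw e; // not a lagging replica; let the caller decide
                }
                lastTimeout = (TimeoutException) e.getCause(); // server-side timeout
            }
        }
        throw lastTimeout;
    }
}
```

A real client would typically add the leader as the last candidate, since the leader always has the latest version.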

Linearizable Reads

Sometimes read requests need the latest available data, and replication lag cannot be tolerated. In these cases, the read requests need to be redirected to the leader. This is a common design issue tackled by the Consistent Core.
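A client can make this choice per request. The sketch below assumes a hypothetical `ReadConsistency` setting: linearizable reads go to the leader, while reads that tolerate lag, with or without a read-your-writes version stamp, go to the nearest follower. It only decides where a read is sent; serving a linearizable read on the leader additionally requires the leader to confirm it has not been superseded (for example with a quorum check or a lease), which this sketch does not show.

```java
// Hypothetical per-request consistency levels.
enum ReadConsistency { LINEARIZABLE, READ_YOUR_WRITES, STALE_OK }

class ReadRouter {
    private final String leader;
    private final String nearestFollower;

    ReadRouter(String leader, String nearestFollower) {
        this.leader = leader;
        this.nearestFollower = nearestFollower;
    }

    // Only reads that cannot tolerate replication lag pay the cost of the leader.
    String destinationFor(ReadConsistency consistency) {
        switch (consistency) {
            case LINEARIZABLE:
                return leader;
            case READ_YOUR_WRITES: // the follower waits for the client's version stamp
            case STALE_OK:         // any follower value is acceptable
            default:
                return nearestFollower;
        }
    }
}
```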

Examples

[neo4j] allows causal clusters to be set up. Every write operation returns a bookmark, which can be passed when executing queries against read replicas. The bookmark ensures that the client will always get the values written at the bookmark.

[mongodb] maintains causal consistency in its replica sets. Write operations return an operationTime, which is passed in subsequent read requests to make sure they return the writes that happened before the read request.

[cockroachdb] allows clients to read from follower servers. The leader servers publish the latest timestamps at which writes have been completed on the leader, called closed timestamps. A follower allows reading values if it has the values at the closed timestamp.
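The follower-side check can be sketched as follows, assuming the leader sends a closed timestamp only after the follower has received all writes at or below it, so no new write can appear there later. `ClosedTimestampFollower` and its methods are hypothetical names; CockroachDB's actual mechanism has more detail than this model.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical follower keeping every version of a key, indexed by write timestamp.
class ClosedTimestampFollower {
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();
    private long closedTimestamp = 0;

    void applyWrite(String key, long timestamp, String value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // The leader promises no further writes at or below this timestamp.
    void onClosedTimestamp(long timestamp) {
        closedTimestamp = Math.max(closedTimestamp, timestamp);
    }

    // A read at or below the closed timestamp can be answered locally.
    boolean canServeAt(long readTimestamp) {
        return readTimestamp <= closedTimestamp;
    }

    // Latest value written at or before readTimestamp, or null if there is none.
    String readAt(String key, long readTimestamp) {
        if (!canServeAt(readTimestamp)) {
            throw new IllegalStateException("read timestamp is above the closed timestamp; ask the leader");
        }
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(readTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

A read above the closed timestamp is refused rather than answered with possibly incomplete data; the client is expected to send it to the leader instead.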

Kafka allows consuming messages from follower brokers. The followers know about the High-Water Mark at the leader. In Kafka's design, instead of waiting for the latest updates, the broker returns an OFFSET_NOT_AVAILABLE error to the consumers and expects them to retry.

Significant Revisions