Idempotent Receiver

Identify requests from clients uniquely so that the server can ignore duplicate requests when a client retries

26 January 2021

Unmesh Joshi

Problem

Clients send requests to servers but might not get a response. It's impossible for clients to know if the response was lost or the server crashed before processing the request. To make sure that the request is processed, the client has to re-send the request.

If the server had already processed the request and crashed afterwards, it will receive a duplicate request when the client retries.

Solution

Identify a client uniquely by assigning a unique id to each client. Before sending any requests, the client registers itself with the server.

class ConsistentCoreClient…

  private void registerWithLeader() {
      RequestOrResponse request
              = new RequestOrResponse(RequestId.RegisterClientRequest.getId(),
              correlationId.incrementAndGet());

      //blockingSend will attempt to create a new connection if there is a network error.
      RequestOrResponse response = blockingSend(request);
      RegisterClientResponse registerClientResponse
              = JsonSerDes.deserialize(response.getMessageBodyJson(),
              RegisterClientResponse.class);
      this.clientId = registerClientResponse.getClientId();
  }

When the server receives a client registration request, it assigns a unique id to the client. If the server is a Consistent Core, it can assign the Write-Ahead Log index as a client identifier.

class ReplicatedKVStore…

  private Map<Long, Session> clientSessions = new ConcurrentHashMap<>();

  private RegisterClientResponse registerClient(WALEntry walEntry) {

      Long clientId = walEntry.getEntryId();
      //clientId to store client responses.
      clientSessions.put(clientId, new Session(clock.nanoTime()));

      return new RegisterClientResponse(clientId);

  }

The server creates a session to store responses for the requests for the registered client. It also tracks the time at which the session is created, so that inactive sessions can be discarded as explained in later sections.

public class Session {
    long lastAccessTimestamp;
    Queue<Response> clientResponses = new ArrayDeque<>();

    public Session(long lastAccessTimestamp) {
        this.lastAccessTimestamp = lastAccessTimestamp;
    }

    public long getLastAccessTimestamp() {
        return lastAccessTimestamp;
    }

    public Optional<Response> getResponse(int requestNumber) {
        return clientResponses.stream().
                filter(r -> requestNumber == r.getRequestNumber()).findFirst();

    }

    private static final int MAX_SAVED_RESPONSES = 5;

    public void addResponse(Response response) {
        if (clientResponses.size() == MAX_SAVED_RESPONSES) {
            clientResponses.remove(); //remove the oldest response
        }
        clientResponses.add(response);
    }

    public void refresh(long nanoTime) {
        this.lastAccessTimestamp = nanoTime;
    }
}

For a Consistent Core, the client registration request is also replicated as part of the consensus algorithm. So the client registration is available even if the existing leader fails. The server then also stores responses sent to the client for subsequent requests.

For every non-idempotent request (see sidebar) that the server receives, it stores the response in the client session after successful execution.

class ReplicatedKVStore…

  private Response applyRegisterLeaseCommand(WALEntry walEntry, RegisterLeaseCommand command) {
      logger.info("Creating lease with id " + command.getName()
              + " with timeout " + command.getTimeout()
              + " on server " + getServer().getServerId());
      try {
          leaseTracker.addLease(command.getName(),
                  command.getTimeout());
          Response success = Response.success(walEntry.getEntryId());
          if (command.hasClientId()) {
              Session session = clientSessions.get(command.getClientId());
              session.addResponse(success.withRequestNumber(command.getRequestNumber()));
          }
          return success;

      } catch (DuplicateLeaseException e) {
          return Response.error(1, e.getMessage(), walEntry.getEntryId());
      }
  }

The client sends the client identifier with each request that is sent to the server. The client also keeps a counter to assign request numbers to each request sent to the server.

class ConsistentCoreClient…

  int nextRequestNumber = 1;

  public void registerLease(String name, long ttl) {
      RegisterLeaseRequest registerLeaseRequest
              = new RegisterLeaseRequest(clientId, nextRequestNumber, name, ttl);

      nextRequestNumber++; //increment request number for next request.

      var serializedRequest = serialize(registerLeaseRequest);

      logger.info("Sending RegisterLeaseRequest for " + name);
      blockingSendWithRetries(serializedRequest);

  }

  private static final int MAX_RETRIES = 3;

  private RequestOrResponse blockingSendWithRetries(RequestOrResponse request) {
      for (int i = 0; i <= MAX_RETRIES; i++) {
          try {
              //blockingSend will attempt to create a new connection if there is no connection.
              return blockingSend(request);

          } catch (NetworkException e) {
              resetConnectionToLeader();
              logger.error("Failed sending request " + request + ". Try " + i, e);
          }
      }

      throw new NetworkException("Timed out after " + MAX_RETRIES + " retries");
  }
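
What makes the retry safe is that every attempt re-sends the same request, so the client id and request number stay fixed across retries. The following is a minimal, self-contained sketch of that behaviour, not the reference implementation: `LeaseRequest`, `Transport`, `FlakyTransport`, `SendFailedException` and `RetryingClient` are hypothetical names introduced only for illustration, and the transport simply simulates lost responses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical request type: (clientId, requestNumber) identifies the request.
final class LeaseRequest {
    final long clientId;
    final int requestNumber;
    final String name;

    LeaseRequest(long clientId, int requestNumber, String name) {
        this.clientId = clientId;
        this.requestNumber = requestNumber;
        this.name = name;
    }
}

// Hypothetical unchecked exception standing in for a network failure.
class SendFailedException extends RuntimeException {
    SendFailedException(String message) { super(message); }
}

// Hypothetical transport abstraction standing in for blockingSend.
interface Transport {
    String send(LeaseRequest request);
}

// Test double: fails the first few sends and records every attempt it sees.
class FlakyTransport implements Transport {
    final List<LeaseRequest> attempts = new ArrayList<>();
    private int remainingFailures;

    FlakyTransport(int failures) { this.remainingFailures = failures; }

    @Override
    public String send(LeaseRequest request) {
        attempts.add(request);
        if (remainingFailures > 0) {
            remainingFailures--;
            throw new SendFailedException("simulated lost response");
        }
        return "ok:" + request.requestNumber;
    }
}

class RetryingClient {
    private static final int MAX_RETRIES = 3;
    private final long clientId;
    private final Transport transport;
    private int nextRequestNumber = 1;

    RetryingClient(long clientId, Transport transport) {
        this.clientId = clientId;
        this.transport = transport;
    }

    String registerLease(String name) {
        // The request number is assigned once, before the first attempt.
        LeaseRequest request = new LeaseRequest(clientId, nextRequestNumber++, name);
        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            try {
                return transport.send(request); // every attempt carries the same number
            } catch (SendFailedException e) {
                // fall through and retry with the identical request
            }
        }
        throw new SendFailedException("Gave up after " + MAX_RETRIES + " retries");
    }
}
```

With a transport that fails twice, the sketch makes three attempts, all carrying request number 1; only the next call to `registerLease` uses request number 2.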

When the server receives a request, it checks whether a request with the given request number from the same client has already been processed. If it finds a saved response, it returns that response to the client without processing the request again.

class ReplicatedKVStore…

  private Response applyWalEntry(WALEntry walEntry) {
      Command command = deserialize(walEntry);
      if (command.hasClientId()) {
          Session session = clientSessions.get(command.getClientId());
          Optional<Response> savedResponse = session.getResponse(command.getRequestNumber());
          if(savedResponse.isPresent()) {
              return savedResponse.get();
          } //else continue and execute this command.
      }
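
The effect of this check can be seen end to end with a deliberately small stand-in for the server. The sketch below is an illustration under simplifying assumptions (a single in-memory counter, no replication, no expiry of saved responses); `IdempotentCounterServer` and its methods are hypothetical names, not part of the reference code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified server: a counter that must not be
// incremented twice when a client retries the same request.
class IdempotentCounterServer {
    // clientId -> (requestNumber -> saved response)
    private final Map<Long, Map<Integer, Long>> sessions = new HashMap<>();
    private long nextClientId = 1;
    private long counter = 0;

    // Registration hands out a unique client id and creates an empty session.
    long registerClient() {
        long clientId = nextClientId++;
        sessions.put(clientId, new HashMap<>());
        return clientId;
    }

    long increment(long clientId, int requestNumber) {
        Map<Integer, Long> saved = sessions.get(clientId);
        if (saved == null) {
            throw new IllegalStateException("Unknown or expired client " + clientId);
        }
        Long previous = saved.get(requestNumber);
        if (previous != null) {
            return previous; // duplicate: replay the saved response, do not execute again
        }
        counter++;                         // execute the non-idempotent operation
        saved.put(requestNumber, counter); // remember the response for later retries
        return counter;
    }

    long value() { return counter; }
}
```

Calling `increment(clientId, 1)` twice returns the same response both times and changes the counter only once; a call with request number 2 is treated as a new request.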

Expiring the saved client requests

The requests stored per client cannot be kept forever, and there are several ways to expire them. In the reference implementation for Raft, the client keeps a separate number recording the request number for which a response has been successfully received. This number is sent with each request to the server. The server can safely discard any saved requests with a request number lower than this number.
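
A rough sketch of this acknowledgement-based cleanup is shown below. It is an assumption-laden illustration, not the Raft reference code: `AckTrackingSession`, `discardAcknowledged` and the use of plain strings as responses are hypothetical choices made to keep the example small.

```java
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical session that drops saved responses once the client
// reports that it no longer needs them.
class AckTrackingSession {
    // requestNumber -> saved response, sorted so that old entries
    // can be removed with a single range operation.
    private final TreeMap<Integer, String> savedResponses = new TreeMap<>();

    void addResponse(int requestNumber, String response) {
        savedResponses.put(requestNumber, response);
    }

    // Called with the number the client attaches to each request.
    // Everything with a strictly lower request number is discarded.
    void discardAcknowledged(int ackedRequestNumber) {
        savedResponses.headMap(ackedRequestNumber).clear();
    }

    Optional<String> getResponse(int requestNumber) {
        return Optional.ofNullable(savedResponses.get(requestNumber));
    }

    int savedCount() {
        return savedResponses.size();
    }
}
```

A sorted map is used here only because `headMap` gives a view of all keys below a bound, which makes the discard step a one-liner; any structure that can drop entries below a request number would do.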

If a client is guaranteed to send the next request only after receiving the response for the previous request, the server can safely remove all previous requests once it gets a new request from the client. This breaks down when Request Pipeline is used, as there can be multiple in-flight requests for which the client might not have received a response. If the server knows the maximum number of in-flight requests a client can have, it can store only that many responses and remove all the others. For example, [kafka] can have a maximum of five in-flight requests for its producer, so it stores a maximum of five previous responses.

class Session…

  private static final int MAX_SAVED_RESPONSES = 5;

  public void addResponse(Response response) {
      if (clientResponses.size() == MAX_SAVED_RESPONSES) {
          clientResponses.remove(); //remove the oldest response
      }
      clientResponses.add(response);
  }
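
For the simpler non-pipelined case described above, where a client never sends a request before the previous response has arrived, a single saved response per client is enough. A minimal sketch under that assumption follows; `LastResponseSession` is a hypothetical name, and responses are plain strings to keep it short.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical session for a client with at most one in-flight request:
// a new request number implies the previous response was received,
// so only the latest response needs to be kept.
class LastResponseSession {
    private int lastRequestNumber = -1; // no request seen yet
    private String lastResponse;

    Optional<String> getResponse(int requestNumber) {
        if (requestNumber == lastRequestNumber) {
            return Optional.of(lastResponse);
        }
        return Optional.empty();
    }

    void addResponse(int requestNumber, String response) {
        // Overwrites whatever was saved for the previous request.
        this.lastResponse = Objects.requireNonNull(response);
        this.lastRequestNumber = requestNumber;
    }
}
```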

Removing the registered clients

The client's session is not kept on the server forever. A server can set a maximum time to live for the client sessions it stores. Clients send a HeartBeat periodically. If no HeartBeat arrives from a client within this time to live, the client's state on the server can be removed.

The server starts a scheduled task to periodically check for expired sessions and remove the sessions which are expired.

class ReplicatedKVStore…

  private long heartBeatIntervalMs = TimeUnit.SECONDS.toMillis(10);
  private long sessionTimeoutNanos = TimeUnit.MINUTES.toNanos(5);

  private void startSessionCheckerTask() {
      scheduledTask = executor.scheduleWithFixedDelay(()->{
          removeExpiredSession();
      }, heartBeatIntervalMs, heartBeatIntervalMs, TimeUnit.MILLISECONDS);
  }
  private void removeExpiredSession() {
      long now = System.nanoTime();
      for (Long clientId : clientSessions.keySet()) {
          Session session = clientSessions.get(clientId);
          long elapsedNanosSinceLastAccess = now - session.getLastAccessTimestamp();
          if (elapsedNanosSinceLastAccess > sessionTimeoutNanos) {
              clientSessions.remove(clientId);
          }
      }
  }
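
The excerpt above shows only the removal side. How a HeartBeat keeps a session alive might look like the sketch below, which is an assumption rather than the reference code: `SessionRegistry`, `heartBeat` and the injected `LongSupplier` clock are hypothetical, and the clock is injected only so that the behaviour can be exercised without waiting for real time to pass.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical registry tracking only the last access time of each session.
class SessionRegistry {
    private final Map<Long, Long> lastAccessNanos = new ConcurrentHashMap<>();
    private final long sessionTimeoutNanos;
    private final LongSupplier nanoClock; // e.g. System::nanoTime in production

    SessionRegistry(long sessionTimeoutNanos, LongSupplier nanoClock) {
        this.sessionTimeoutNanos = sessionTimeoutNanos;
        this.nanoClock = nanoClock;
    }

    void register(long clientId) {
        lastAccessNanos.put(clientId, nanoClock.getAsLong());
    }

    // Refreshes the session if it still exists. Returns false when the
    // session has already been removed, so the client knows to re-register.
    boolean heartBeat(long clientId) {
        return lastAccessNanos.computeIfPresent(
                clientId, (id, previous) -> nanoClock.getAsLong()) != null;
    }

    // Intended to be called periodically by a scheduled task.
    void removeExpiredSessions() {
        long now = nanoClock.getAsLong();
        lastAccessNanos.entrySet()
                .removeIf(entry -> now - entry.getValue() > sessionTimeoutNanos);
    }

    boolean isRegistered(long clientId) {
        return lastAccessNanos.containsKey(clientId);
    }
}
```

In this sketch a heartbeat never recreates an expired session. That is a design choice: silently recreating it would hide the fact that the saved responses were lost along with the session.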

Examples

Raft's reference implementation uses idempotency to provide linearizable actions.

Kafka provides an Idempotent Producer, which allows clients to retry requests and ignores duplicate requests.

ZooKeeper has the concepts of sessions and zxid, which allow clients to recover. HBase has a [hbase-recoverable-zookeeper] wrapper, which implements idempotent actions following the guidelines of [zookeeper-error-handling].

Significant Revisions