This pattern is part of Patterns of Distributed Systems
Request Waiting List
Track client requests that can be answered only once a criterion based on responses from other cluster nodes is met.
07 September 2022
Problem
A cluster node needs to communicate with other cluster nodes to replicate data while processing a client request. A response from all other cluster nodes or a Quorum is needed before responding to clients.
Communication to other cluster nodes is done asynchronously. Asynchronous communication allows patterns like Request Pipeline and Request Batch to be used.
So the cluster node receives and processes responses from multiple other cluster nodes asynchronously. It then needs to correlate them to check if the Quorum for a particular client request is reached.
Solution
The cluster node maintains a waiting list which maps a key to a callback function. The key is chosen depending on the specific criteria to invoke the callback. For example, if the callback needs to be invoked whenever a message from another cluster node is received, the key can be the Correlation Identifier of the message. In the case of Replicated Log it is the High-Water Mark. The callback handles the response and decides if the client request can be fulfilled.
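When the key is a High-Water Mark rather than a correlation ID, one update can satisfy several pending requests at once. The sketch below shows one way this might look, assuming log indexes are `long` values and that a callback only needs to know which index was committed. `HighWaterMarkWaitingList`, `onHighWaterMark` and `HighWaterMarkDemo` are hypothetical names used for illustration, not part of any particular implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.LongConsumer;

// Hypothetical sketch: pending requests keyed by the log index they wait for.
class HighWaterMarkWaitingList {
    private final ConcurrentNavigableMap<Long, LongConsumer> pending = new ConcurrentSkipListMap<>();

    // Register a callback to run once the entry at logIndex is committed.
    void add(long logIndex, LongConsumer callback) {
        pending.put(logIndex, callback);
    }

    // Invoke and remove every callback whose index is at or below the new high-water mark.
    void onHighWaterMark(long highWaterMark) {
        ConcurrentNavigableMap<Long, LongConsumer> ready = pending.headMap(highWaterMark, true);
        for (Map.Entry<Long, LongConsumer> entry : ready.entrySet()) {
            // remove returns null if another thread already handled this entry
            LongConsumer callback = pending.remove(entry.getKey());
            if (callback != null) {
                callback.accept(entry.getKey());
            }
        }
    }

    int pendingCount() {
        return pending.size();
    }
}

class HighWaterMarkDemo {
    static List<Long> run() {
        List<Long> completed = new ArrayList<>();
        HighWaterMarkWaitingList waitingList = new HighWaterMarkWaitingList();
        for (long index = 1; index <= 3; index++) {
            waitingList.add(index, committed -> completed.add(committed));
        }
        waitingList.onHighWaterMark(2); // completes the requests waiting on indexes 1 and 2
        return completed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2]
    }
}
```

A sorted map is used here so that all keys up to the high-water mark can be selected in one range query; the request waiting on index 3 stays pending until the mark advances further.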
Consider the example of a key-value store where data is replicated on multiple servers. Here, Quorum can be used to decide when a replication can be considered successful, so that a response can be sent to the client. The cluster node tracks the requests sent to other cluster nodes, and a callback is registered with each request. Each request is marked with a Correlation Identifier, which is used to map a response to its request. The waiting list is then notified to invoke the callback when the responses from other cluster nodes are received.
For the sake of this example, let's call our three cluster nodes athens, byzantium and cyrene. The client connects to athens to store "title" as "Microservices". Athens replicates it on byzantium and cyrene: it sends a request to itself to store the key-value pair and concurrently sends requests to both byzantium and cyrene. To track responses, athens creates a WriteQuorumCallback and adds it to the waiting list for each of the requests sent.

For every response received, the WriteQuorumCallback is invoked to handle the response. It checks whether the required number of responses has been received. Together with athens's own response, the response from byzantium makes two out of three, so the quorum is reached and the pending client request is completed. Cyrene can respond later, but the response can be sent to the client without waiting for it.

The code looks like the sample below. Note that every cluster node maintains its own instance of a waiting list. The waiting list tracks the key and associated callback and stores the timestamp at which the callback was registered. The timestamp is used to check whether the callbacks need to be expired if responses haven't been received within the expected time.
    public class RequestWaitingList<Key, Response> {
        private Map<Key, CallbackDetails> pendingRequests = new ConcurrentHashMap<>();

        public void add(Key key, RequestCallback<Response> callback) {
            // Record the registration time so that the entry can be expired later.
            pendingRequests.put(key, new CallbackDetails(callback, clock.nanoTime()));
        }

        // handleResponse, handleError and expiry are shown in the fragments below.
    }

    class CallbackDetails {
        RequestCallback requestCallback;
        long createTime;

        public CallbackDetails(RequestCallback requestCallback, long createTime) {
            this.requestCallback = requestCallback;
            this.createTime = createTime;
        }

        public RequestCallback getRequestCallback() {
            return requestCallback;
        }

        public long elapsedTime(long now) {
            return now - createTime;
        }
    }

    public interface RequestCallback<T> {
        void onResponse(T r);
        void onError(Throwable e);
    }
The waiting list is asked to handle the response or error once it has been received from the other cluster node.
class RequestWaitingList…
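    public void handleResponse(Key key, Response response) {
        // Remove and check in one step, so a concurrent expiry cannot cause a null dereference.
        CallbackDetails callbackDetails = pendingRequests.remove(key);
        if (callbackDetails == null) {
            return; // unknown key, or the request was already handled or expired
        }
        callbackDetails.getRequestCallback().onResponse(response);
    }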
class RequestWaitingList…
    public void handleError(Key key, Throwable e) {
        CallbackDetails callbackDetails = pendingRequests.remove(key);
        if (callbackDetails == null) {
            return; // already handled or expired
        }
        callbackDetails.getRequestCallback().onError(e);
    }
The waiting list can then be used to handle quorum responses with the implementation looking something like this:
    static class WriteQuorumCallback implements RequestCallback<RequestOrResponse> {
        private final int quorum;
        private final int expectedNumberOfResponses;
        private int receivedResponses;
        private int receivedErrors;
        private boolean done;

        private final RequestOrResponse request;
        private final ClientConnection clientConnection;

        public WriteQuorumCallback(int totalExpectedResponses, RequestOrResponse clientRequest, ClientConnection clientConnection) {
            this.expectedNumberOfResponses = totalExpectedResponses;
            this.quorum = expectedNumberOfResponses / 2 + 1;
            this.request = clientRequest;
            this.clientConnection = clientConnection;
        }

        // synchronized: responses and timeouts may arrive on different threads,
        // and incrementing a volatile field is not atomic.
        @Override
        public synchronized void onResponse(RequestOrResponse response) {
            receivedResponses++;
            if (receivedResponses == quorum && !done) {
                respondToClient("Success");
                done = true;
            }
        }

        @Override
        public synchronized void onError(Throwable t) {
            receivedErrors++;
            // Fail as soon as a quorum of successes is no longer possible.
            if (receivedErrors > expectedNumberOfResponses - quorum && !done) {
                respondToClient("Error");
                done = true;
            }
        }

        private void respondToClient(String response) {
            clientConnection.write(new RequestOrResponse(RequestId.SetValueResponse.getId(), response.getBytes(), request.getCorrelationId()));
        }
    }
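The quorum size used by the callback is a simple majority, `expectedNumberOfResponses / 2 + 1` with integer division. The short sketch below works through that arithmetic for a few cluster sizes, along with the number of failures after which a majority can no longer be reached. `QuorumMath` and its method names are hypothetical helpers introduced only for this illustration.

```java
// Hypothetical helper illustrating majority-quorum arithmetic.
class QuorumMath {
    // Smallest number of successes that forms a majority of n replicas.
    static int quorum(int n) {
        return n / 2 + 1;
    }

    // Number of failures after which a majority can no longer be reached.
    static int failuresToGiveUp(int n) {
        return n - quorum(n) + 1;
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 5; n++) {
            System.out.println(n + " replicas: quorum=" + quorum(n)
                    + ", give up after " + failuresToGiveUp(n) + " failures");
        }
    }
}
```

For an odd number of replicas the two thresholds coincide (2 and 2 for three replicas, 3 and 3 for five). For an even number they differ: with four replicas a quorum needs 3 successes, but 2 failures already make it unreachable.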
Whenever a cluster node sends requests to other nodes, it adds a callback to the waiting list, keyed by the Correlation Identifier of the request sent.
class ClusterNode…
    private void handleSetValueClientRequestRequiringQuorum(List<InetAddressAndPort> replicas, RequestOrResponse request, ClientConnection clientConnection) {
        int totalExpectedResponses = replicas.size();
        // One callback instance is shared by all the requests sent for this client request.
        RequestCallback<RequestOrResponse> requestCallback = new WriteQuorumCallback(totalExpectedResponses, request, clientConnection);
        for (InetAddressAndPort replica : replicas) {
            int correlationId = nextRequestId();
            // Register before sending, so a fast response always finds its callback.
            requestWaitingList.add(correlationId, requestCallback);
            try {
                SocketClient client = new SocketClient(replica);
                client.sendOneway(new RequestOrResponse(RequestId.SetValueRequest.getId(), request.getMessageBodyJson(), correlationId, listenAddress));
            } catch (IOException e) {
                // A failed send counts as an error response for this replica.
                requestWaitingList.handleError(correlationId, e);
            }
        }
    }
Once the response is received, the waiting list is asked to handle it:
class ClusterNode…
    private void handleSetValueResponse(RequestOrResponse response) {
        requestWaitingList.handleResponse(response.getCorrelationId(), response);
    }
The waiting list will then invoke the associated WriteQuorumCallback. The WriteQuorumCallback instance checks whether a quorum of responses has been received and, if so, responds to the client.
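The whole flow for the athens, byzantium and cyrene example can be sketched in a self-contained form. This is a deliberately simplified illustration, not the code above: it assumes plain `String` responses, integer correlation IDs 1 to 3, and a list standing in for the client connection. `MiniWaitingList`, `MiniQuorumCallback` and `QuorumWalkthrough` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Simplified stand-in for the waiting list: correlation ID -> callback.
class MiniWaitingList {
    private final Map<Integer, Consumer<String>> pending = new ConcurrentHashMap<>();

    void add(int correlationId, Consumer<String> callback) {
        pending.put(correlationId, callback);
    }

    void handleResponse(int correlationId, String response) {
        Consumer<String> callback = pending.remove(correlationId);
        if (callback != null) { // unknown or already handled IDs are ignored
            callback.accept(response);
        }
    }
}

// Counts successful responses and answers the client once a majority is reached.
class MiniQuorumCallback implements Consumer<String> {
    private final int quorum;
    private final List<String> clientReplies; // stands in for the client connection
    private int received;

    MiniQuorumCallback(int replicas, List<String> clientReplies) {
        this.quorum = replicas / 2 + 1;
        this.clientReplies = clientReplies;
    }

    @Override
    public synchronized void accept(String response) {
        received++;
        if (received == quorum) {
            clientReplies.add("Success after " + received + " responses");
        }
    }
}

class QuorumWalkthrough {
    static List<String> run() {
        List<String> clientReplies = new ArrayList<>();
        MiniWaitingList waitingList = new MiniWaitingList();
        MiniQuorumCallback callback = new MiniQuorumCallback(3, clientReplies);

        // athens registers the same callback under one correlation ID per replica
        waitingList.add(1, callback); // request to athens itself
        waitingList.add(2, callback); // request to byzantium
        waitingList.add(3, callback); // request to cyrene

        waitingList.handleResponse(1, "athens: ok");
        waitingList.handleResponse(2, "byzantium: ok"); // quorum reached here
        waitingList.handleResponse(3, "cyrene: ok");    // late response, no second reply
        waitingList.handleResponse(3, "cyrene: ok");    // duplicate, ignored
        return clientReplies;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [Success after 2 responses]
    }
}
```

Because each entry is removed from the map when its response is handled, a duplicate or late message for the same correlation ID cannot be counted twice.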
Expiring Long Pending Requests
Sometimes, responses from the other cluster nodes are delayed. In these instances the waiting list generally has a mechanism to expire requests after a timeout:
class RequestWaitingList…
    private SystemClock clock;
    private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private long expirationIntervalMillis = 2000;

    public RequestWaitingList(SystemClock clock) {
        this.clock = clock;
        executor.scheduleWithFixedDelay(this::expire, expirationIntervalMillis, expirationIntervalMillis, TimeUnit.MILLISECONDS);
    }

    private void expire() {
        long now = clock.nanoTime();
        List<Key> expiredRequestKeys = getExpiredRequestKeys(now);
        expiredRequestKeys.forEach(expiredRequestKey -> {
            CallbackDetails request = pendingRequests.remove(expiredRequestKey);
            // A response may have arrived since the key was collected.
            if (request != null) {
                request.requestCallback.onError(new TimeoutException("Request expired"));
            }
        });
    }

    private List<Key> getExpiredRequestKeys(long now) {
        // createTime is in nanoseconds, so convert the interval before comparing.
        long expirationIntervalNanos = TimeUnit.MILLISECONDS.toNanos(expirationIntervalMillis);
        return pendingRequests.entrySet().stream()
                .filter(entry -> entry.getValue().elapsedTime(now) > expirationIntervalNanos)
                .map(e -> e.getKey())
                .collect(Collectors.toList());
    }
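Expiry logic is easier to reason about with a clock that can be advanced by hand. The sketch below is a stripped-down illustration under a few assumptions: a `LongSupplier` stands in for the clock, `expire()` is called explicitly instead of by a scheduler, and callbacks only receive the error. `ExpiringWaitingList` and `ExpiryDemo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical, stripped-down waiting list that only demonstrates expiry.
class ExpiringWaitingList {
    private static final class Pending {
        final Consumer<Throwable> onError;
        final long createdNanos;

        Pending(Consumer<Throwable> onError, long createdNanos) {
            this.onError = onError;
            this.createdNanos = createdNanos;
        }
    }

    private final Map<Integer, Pending> pending = new ConcurrentHashMap<>();
    private final LongSupplier nanoClock;
    private final long timeoutNanos;

    ExpiringWaitingList(LongSupplier nanoClock, long timeoutMillis) {
        this.nanoClock = nanoClock;
        // Keep the timeout in the same unit as the clock readings.
        this.timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    }

    void add(int key, Consumer<Throwable> onError) {
        pending.put(key, new Pending(onError, nanoClock.getAsLong()));
    }

    // In a real node this would run periodically on a scheduler.
    void expire() {
        long now = nanoClock.getAsLong();
        for (Map.Entry<Integer, Pending> entry : pending.entrySet()) {
            if (now - entry.getValue().createdNanos > timeoutNanos) {
                // remove(key, value) succeeds only if the entry is still the one inspected
                if (pending.remove(entry.getKey(), entry.getValue())) {
                    entry.getValue().onError.accept(new TimeoutException("Request expired"));
                }
            }
        }
    }

    int size() {
        return pending.size();
    }
}

class ExpiryDemo {
    static List<String> run() {
        AtomicLong fakeNanos = new AtomicLong(0);
        ExpiringWaitingList waitingList = new ExpiringWaitingList(fakeNanos::get, 2000);
        List<String> errors = new ArrayList<>();

        waitingList.add(1, e -> errors.add("1: " + e.getMessage()));
        fakeNanos.set(TimeUnit.MILLISECONDS.toNanos(1500));
        waitingList.add(2, e -> errors.add("2: " + e.getMessage()));

        fakeNanos.set(TimeUnit.MILLISECONDS.toNanos(2500));
        waitingList.expire(); // request 1 is 2500 ms old, request 2 only 1000 ms
        return errors;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1: Request expired]
    }
}
```

Only the first request exceeds the 2000 ms timeout, so only its callback receives the error; the second remains pending until a later expiry pass or a response.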
Examples
[cassandra] uses asynchronous message passing for internode communication. It uses Quorum and processes response messages asynchronously the same way.
Kafka tracks the pending requests using a data structure called [kafka-purgatory].
[etcd] maintains a wait list to respond to client requests in a similar way.
Significant Revisions
07 September 2022: