This pattern is part of Patterns of Distributed Systems
Request Batch
Combine multiple requests to optimally utilise the network.
06 September 2022
Problem
When a client sends many requests to cluster nodes, each carrying only a small amount of data, network latency and request processing time (including serialization and deserialization of the request on the server side) can add significant overhead.
For example, if a network's capacity is 1 Gbps and its combined latency and request processing time is, say, 100 microseconds, then a client sending hundreds of requests at the same time, each just a few bytes, will be severely limited in overall throughput if each request needs those 100 microseconds to complete.
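To make the arithmetic concrete, here is a back-of-envelope calculation using the numbers above (the 10-byte request size is an assumption for illustration):

public class BatchingMath {
  public static void main(String[] args) {
    // Numbers from the example above; the 10-byte request size is assumed.
    double perRequestOverheadSeconds = 100e-6; // latency + processing time
    int requestSizeBytes = 10;

    // Sending one request at a time caps throughput at 1/overhead.
    double requestsPerSecond = 1 / perRequestOverheadSeconds;     // 10,000
    double bytesPerSecond = requestsPerSecond * requestSizeBytes; // ~100 KB/s

    double linkCapacityBytesPerSecond = 1e9 / 8; // 1 Gbps link
    System.out.printf("Link utilisation: %.4f%%%n",
        100 * bytesPerSecond / linkCapacityBytesPerSecond);       // ~0.08%
  }
}

Batching hundreds of these small requests into one network round trip amortises the fixed 100 microsecond cost across all of them.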
Solution
Combine multiple requests together into a single request batch. The batch is sent to the cluster node for processing, with each request in it processed in exactly the same manner as an individual request. The node then responds with a batch of responses.
As an example, consider a distributed key-value store, where the client sends requests to store multiple key-values on the server. When the client receives a call to send the request, it does not immediately send it over the network; instead, it keeps a queue of requests to be sent.
class Client…
  LinkedBlockingQueue<RequestEntry> requests = new LinkedBlockingQueue<>();

  public CompletableFuture send(SetValueRequest setValueRequest) {
    int requestId = enqueueRequest(setValueRequest);
    CompletableFuture responseFuture = trackPendingRequest(requestId);
    return responseFuture;
  }

  private int enqueueRequest(SetValueRequest setValueRequest) {
    int requestId = nextRequestId();
    byte[] requestBytes = serialize(setValueRequest, requestId);
    requests.add(new RequestEntry(requestBytes, clock.nanoTime()));
    return requestId;
  }

  private int nextRequestId() {
    return requestNumber++;
  }
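A caller might use the client like this (a hypothetical sketch; the SetValueRequest constructor arguments are assumed for illustration):

  // Hypothetical usage: send() returns immediately with a future that
  // completes when the batched response arrives from the server.
  Client client = new Client(config, serverAddress, new SystemClock());
  CompletableFuture response = client.send(new SetValueRequest("key1", "value1"));
  response.thenAccept(r -> System.out.println("key1 stored"));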
The time at which the request is enqueued is tracked; this is later used to decide whether the request can be sent as part of the batch.
class RequestEntry…
class RequestEntry {
  byte[] serializedRequest;
  long createdTime;

  public RequestEntry(byte[] serializedRequest, long createdTime) {
    this.serializedRequest = serializedRequest;
    this.createdTime = createdTime;
  }
}
It then tracks the pending requests to be completed when a response is received. Each request is assigned a unique request number, which is used to map the response back and complete the pending request.
class Client…
  Map<Integer, CompletableFuture> pendingRequests = new ConcurrentHashMap<>();

  private CompletableFuture trackPendingRequest(Integer correlationId) {
    CompletableFuture responseFuture = new CompletableFuture();
    pendingRequests.put(correlationId, responseFuture);
    return responseFuture;
  }
The client starts a separate task which continuously tracks the queued requests.
class Client…
  public Client(Config config, InetAddressAndPort serverAddress, SystemClock clock) {
    this.clock = clock;
    this.sender = new Sender(config, serverAddress, clock);
    this.sender.start();
  }
class Sender…
  @Override
  public void run() {
    while (isRunning) {
      boolean maxWaitTimeElapsed = requestsWaitedFor(config.getMaxBatchWaitTime());
      boolean maxBatchSizeReached = maxBatchSizeReached(requests);
      if (maxWaitTimeElapsed || maxBatchSizeReached) {
        RequestBatch batch = createBatch(requests);
        try {
          BatchResponse batchResponse = sendBatchRequest(batch, address);
          handleResponse(batchResponse);
        } catch (IOException e) {
          batch.getPackedRequests().stream().forEach(r -> {
            pendingRequests.get(r.getCorrelationId()).completeExceptionally(e);
          });
        }
      }
    }
  }

  private RequestBatch createBatch(LinkedBlockingQueue<RequestEntry> requests) {
    RequestBatch batch = new RequestBatch(MAX_BATCH_SIZE_BYTES);
    RequestEntry entry = requests.peek();
    while (entry != null && batch.hasSpaceFor(entry.getRequest())) {
      batch.add(entry.getRequest());
      requests.remove(entry);
      entry = requests.peek();
    }
    return batch;
  }
class RequestBatch…
  public boolean hasSpaceFor(byte[] requestBytes) {
    return batchSize() + requestBytes.length <= maxSize;
  }

  private int batchSize() {
    return requests.stream().map(r -> r.length).reduce(0, Integer::sum);
  }
There are two checks which are generally done.
- If enough requests have accumulated to fill the batch to the maximum configured size.
class Sender…
  private boolean maxBatchSizeReached(Queue<RequestEntry> requests) {
    return accumulatedRequestSize(requests) > MAX_BATCH_SIZE_BYTES;
  }

  private int accumulatedRequestSize(Queue<RequestEntry> requests) {
    return requests.stream().map(re -> re.size()).reduce((r1, r2) -> r1 + r2).orElse(0);
  }
- Because we cannot wait forever for the batch to fill up, a maximum wait time can be configured. The sender task checks whether the oldest queued request has been waiting longer than this window.

class Sender…
  private boolean requestsWaitedFor(long batchingWindowInMs) {
    RequestEntry oldestPendingRequest = requests.peek();
    if (oldestPendingRequest == null) {
      return false;
    }
    // createdTime was recorded with clock.nanoTime(), so convert the elapsed
    // nanoseconds to milliseconds before comparing with the wait window.
    long oldestEntryWaitTimeNanos = clock.nanoTime() - oldestPendingRequest.createdTime;
    return TimeUnit.NANOSECONDS.toMillis(oldestEntryWaitTimeNanos) > batchingWindowInMs;
  }
Once either of these conditions is fulfilled, the batch request is sent to the server. The server unpacks the batch request and processes each of the individual requests.
class Server…
  private void handleBatchRequest(RequestOrResponse batchRequest, ClientConnection clientConnection) {
    RequestBatch batch = JsonSerDes.deserialize(batchRequest.getMessageBodyJson(), RequestBatch.class);
    List<RequestOrResponse> requests = batch.getPackedRequests();
    List<RequestOrResponse> responses = new ArrayList<>();
    for (RequestOrResponse request : requests) {
      RequestOrResponse response = handleSetValueRequest(request);
      responses.add(response);
    }
    sendResponse(batchRequest, clientConnection, new BatchResponse(responses));
  }

  private RequestOrResponse handleSetValueRequest(RequestOrResponse request) {
    SetValueRequest setValueRequest = JsonSerDes.deserialize(request.getMessageBodyJson(), SetValueRequest.class);
    kv.put(setValueRequest.getKey(), setValueRequest.getValue());
    RequestOrResponse response = new RequestOrResponse(RequestId.SetValueResponse.getId(),
        "Success".getBytes(), request.getCorrelationId());
    return response;
  }
The client receives the batch response and completes all the pending requests.
class Sender…
  private void handleResponse(BatchResponse batchResponse) {
    List<RequestOrResponse> responseList = batchResponse.getResponseList();
    logger.debug("Completing requests from " + responseList.get(0).getCorrelationId()
        + " to " + responseList.get(responseList.size() - 1).getCorrelationId());
    responseList.stream().forEach(r -> {
      CompletableFuture completableFuture = pendingRequests.remove(r.getCorrelationId());
      if (completableFuture != null) {
        completableFuture.complete(r);
      } else {
        logger.error("no pending request for " + r.getCorrelationId());
      }
    });
  }
Technical Considerations
The batch size should be chosen based on the size of individual messages, the available network bandwidth, and the latency and throughput improvements observed under real-life load. These are configured to sensible defaults assuming smaller message sizes and an optimal batch size for server-side processing. For example, Kafka has a default batch size of 16KB, along with a configuration parameter called "linger.ms" with a default value of 0. However, if the messages are bigger, a larger batch size might work better.
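As an illustration, these two parameters might be set on a Kafka producer like this (a minimal sketch; the broker address, serializers, and the tuned values are placeholders, with Kafka's defaults noted in the comments):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// batch.size: maximum bytes accumulated per partition before sending (default 16384).
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
// linger.ms: how long to wait for more records to fill a batch (default 0).
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);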
Having too large a batch size will likely offer only diminishing returns. For example, a batch size in megabytes can add further overhead in terms of processing. This is why the batch size parameter is typically tuned according to observations made through performance testing.
A request batch is generally used along with Request Pipeline to improve overall throughput and latency.
When a retry-with-backoff policy is used to send requests to cluster nodes, the entire batch is retried. The cluster node might have already processed part of the batch; so to ensure the retry works without any issues, you should implement Idempotent Receiver.
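A minimal sketch of the server-side deduplication this requires, assuming the server can identify the sending client; the handleWithDeduplication method, the clientId parameter, and the processedResponses map are hypothetical names, not part of the code above:

  // Hypothetical sketch: remember the response for each (clientId, correlationId)
  // pair, so a retried batch returns stored responses instead of re-applying writes.
  Map<String, RequestOrResponse> processedResponses = new ConcurrentHashMap<>();

  RequestOrResponse handleWithDeduplication(int clientId, RequestOrResponse request) {
    // Key combining the client id and the per-client request number.
    String key = clientId + ":" + request.getCorrelationId();
    return processedResponses.computeIfAbsent(key, k -> handleSetValueRequest(request));
  }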
Examples
Kafka supports batching of producer requests.
Batching is also used when saving data to disk. For example, [bookkeeper] implements batching in a similar way to flush the log to disk.
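The same idea applied to disk writes might look like the following (a sketch of the technique, not BookKeeper's actual code): accumulate entries and force them to disk with a single fsync.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;

class BatchedLogWriter {
  private final FileChannel channel;
  private final List<ByteBuffer> pending = new ArrayList<>();

  BatchedLogWriter(FileChannel channel) {
    this.channel = channel;
  }

  void append(byte[] entry) {
    pending.add(ByteBuffer.wrap(entry));
  }

  // One write pass and one fsync for the whole batch, instead of one per entry.
  void flush() throws IOException {
    for (ByteBuffer buffer : pending) {
      channel.write(buffer);
    }
    channel.force(false);
    pending.clear();
  }
}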
Nagle's algorithm is used in TCP to batch multiple smaller packets together to improve overall network throughput.
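Because TCP applies this batching itself, applications that batch requests at a higher level often disable Nagle's algorithm so that the two batching layers do not delay each other. In Java this is done with the standard setTcpNoDelay socket option (the address below is a placeholder):

import java.net.Socket;

Socket socket = new Socket("localhost", 9090);
socket.setTcpNoDelay(true); // disable Nagle's algorithm; send our own batches immediately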
Significant Revisions
06 September 2022: Published