Request Pipeline

Improve latency by sending multiple requests on the connection without waiting for the response of the previous requests.

20 August 2020

Problem

Communicating between servers within a cluster using Single Socket Channel can cause performance issues if each request must wait for the response to the previous request. To achieve better throughput and latency, the request queue on the server should be kept full enough that server capacity is fully utilized. For example, when Singular Update Queue is used within a server, the server can keep accepting requests while it is processing one, until the queue fills up. If only one request is sent at a time, most of the server capacity is wasted.

Solution

Nodes send requests to other nodes without waiting for responses from previous requests. This is achieved by creating two separate threads, one for sending requests over a network channel and one for receiving responses from the network channel.

Figure 1: Request Pipeline

The sender node sends requests over the socket channel without waiting for a response.

class SingleSocketChannel…

  public void sendOneWay(RequestOrResponse request) throws IOException {
      var dataStream = new DataOutputStream(socketOutputStream);
      byte[] messageBytes = serialize(request);
      dataStream.writeInt(messageBytes.length);
      dataStream.write(messageBytes);
  }
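The receiver has to undo the length-prefixed framing that sendOneWay writes: read the 4-byte length, then read exactly that many bytes. The following is a minimal sketch of that idea, not the article's implementation. The class LengthPrefixedFrames and its methods writeFrame and readFrame are hypothetical names, and plain byte arrays stand in for a serialized RequestOrResponse.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original SingleSocketChannel.
final class LengthPrefixedFrames {

    // Writes a 4-byte big-endian length followed by the payload, as sendOneWay does.
    static void writeFrame(OutputStream out, byte[] payload) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(payload.length);
        data.write(payload);
        data.flush();
    }

    // Reads exactly one frame; blocks until the whole payload has arrived.
    static byte[] readFrame(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int length = data.readInt();
        if (length < 0) {
            throw new IOException("Negative frame length: " + length);
        }
        byte[] payload = new byte[length];
        data.readFully(payload); // loops internally until all bytes are read
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // Two frames written back to back, as a pipelining sender would do.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeFrame(wire, "first".getBytes(StandardCharsets.UTF_8));
        writeFrame(wire, "second".getBytes(StandardCharsets.UTF_8));

        InputStream in = new ByteArrayInputStream(wire.toByteArray());
        System.out.println(new String(readFrame(in), StandardCharsets.UTF_8));
        System.out.println(new String(readFrame(in), StandardCharsets.UTF_8));
    }
}
```

Because every frame carries its own length, several requests can sit back to back on the same stream and still be separated cleanly by the reader.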

A separate thread is started to read responses.

class ResponseThread…

  class ResponseThread extends Thread implements Logging {
      private volatile boolean isRunning = false;
      private SingleSocketChannel socketChannel;

      public ResponseThread(SingleSocketChannel socketChannel) {
          this.socketChannel = socketChannel;
      }

      @Override
      public void run() {
          try {
              isRunning = true;
              logger.info("Starting response thread");
              while (isRunning) {
                  doWork();
              }

          } catch (IOException e) {
              getLogger().error(e); //thread exits if stopped or there is IO error
          }
      }

      public void doWork() throws IOException {
          RequestOrResponse response = socketChannel.read();
          logger.info("Read Response = " + response);
          processResponse(response);
      }

The response handler can either process the response immediately or submit it to a Singular Update Queue.

There are two issues with the request pipeline which need to be handled.

If requests are sent continuously without waiting for responses, the node accepting them can be overwhelmed. For this reason, there is an upper limit on how many requests can be in flight at a time. Any node can send up to that maximum number of requests to another node. Once the maximum number of requests is in flight without a response, no more requests are accepted and the sender is blocked. A very simple strategy to enforce the limit is to track requests in a blocking queue whose capacity is the number of requests allowed in flight. When the response for a request is received, the request is removed from the queue to make room for more. In the code below, at most five in-flight requests are accepted per socket connection.

class RequestLimitingPipelinedConnection…

  private final Map<InetAddressAndPort, ArrayBlockingQueue<RequestOrResponse>> inflightRequests = new ConcurrentHashMap<>();
  private int maxInflightRequests = 5;
  public void send(InetAddressAndPort to, RequestOrResponse request) throws InterruptedException {
      // computeIfAbsent creates the per-address queue atomically, so two threads
      // sending to a new address cannot each install a different queue.
      ArrayBlockingQueue<RequestOrResponse> requestsForAddress =
              inflightRequests.computeIfAbsent(to, address -> new ArrayBlockingQueue<>(maxInflightRequests));
      requestsForAddress.put(request);

The request is removed from the inflight request queue once the response is received.

class RequestLimitingPipelinedConnection…

  private void consume(SocketRequestOrResponse response) {
      Integer correlationId = response.getRequest().getCorrelationId();
      Queue<RequestOrResponse> requestsForAddress = inflightRequests.get(response.getAddress());
      RequestOrResponse first = requestsForAddress.peek();
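      // Compare with equals(): != on boxed Integer values compares references, not numbers.
      if (first == null || !correlationId.equals(first.getCorrelationId())) {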
          throw new RuntimeException("First response should be for the first request");
      }
      requestsForAddress.remove(first);
      responseConsumer.accept(response.getRequest());
  }
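The interplay between sending and consuming can be tried out in isolation. The sketch below is a simplified, hypothetical stand-in for RequestLimitingPipelinedConnection: it assumes a single peer, identifies requests only by an int correlation id, and uses a timed offer instead of a blocking put so that a full window is observable without hanging. The class name InflightLimiter and its methods are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified in-flight window for one connection.
final class InflightLimiter {
    private final BlockingQueue<Integer> inflight;

    InflightLimiter(int maxInflightRequests) {
        this.inflight = new ArrayBlockingQueue<>(maxInflightRequests);
    }

    // Returns false if the window stays full for the whole timeout.
    boolean trySend(int correlationId, long timeout, TimeUnit unit) throws InterruptedException {
        return inflight.offer(correlationId, timeout, unit);
    }

    // Assumed to be called only by the single response thread,
    // with responses arriving in request order.
    void onResponse(int correlationId) {
        Integer oldest = inflight.peek();
        if (oldest == null || oldest != correlationId) {
            throw new IllegalStateException(
                    "Unexpected response " + correlationId + ", oldest in flight is " + oldest);
        }
        inflight.poll(); // frees one slot for the sender
    }

    int inflightCount() {
        return inflight.size();
    }

    public static void main(String[] args) throws InterruptedException {
        InflightLimiter limiter = new InflightLimiter(2);
        System.out.println(limiter.trySend(1, 10, TimeUnit.MILLISECONDS)); // window has room
        System.out.println(limiter.trySend(2, 10, TimeUnit.MILLISECONDS)); // window is now full
        System.out.println(limiter.trySend(3, 10, TimeUnit.MILLISECONDS)); // times out
        limiter.onResponse(1);                                             // response frees a slot
        System.out.println(limiter.trySend(3, 10, TimeUnit.MILLISECONDS)); // accepted
    }
}
```

A timed offer lets the caller decide what to do when the peer is slow, such as failing the request or reporting back-pressure upstream, whereas put simply blocks the sender until a response arrives.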

Handling failures while maintaining ordering guarantees is tricky to implement. Suppose there are two requests in flight. If the first request fails and is retried, the server might have processed the second request before the retried first request reaches it. Servers need some mechanism to make sure out-of-order requests are rejected. Otherwise, there is always a risk of messages being reordered when failures and retries occur. For example, Raft always sends the previous log index that is expected with every log entry. If the previous log index does not match, the server rejects the request. Kafka allows max.in.flight.requests.per.connection to be more than one with the idempotent producer implementation, which assigns a producer ID and a sequence number to each message batch sent to the broker. The broker can then check the sequence number of the incoming request and reject it if requests are out of order.
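The server-side check can be sketched in the spirit of Raft's previous-log-index rule. This is an illustration under stated assumptions, not code from Raft or Kafka: the class OrderedRequestLog is hypothetical, indexes are 1-based with 0 meaning an empty log, and entries are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical server-side log that rejects out-of-order appends.
final class OrderedRequestLog {
    private final List<String> entries = new ArrayList<>();

    // Appends the entry only if the sender's expected previous index
    // matches the index of the last entry stored here.
    synchronized boolean append(int previousIndex, String entry) {
        if (previousIndex != entries.size()) {
            return false; // a gap or a duplicate: the sender must retry in order
        }
        entries.add(entry);
        return true;
    }

    synchronized int lastIndex() {
        return entries.size();
    }

    public static void main(String[] args) {
        OrderedRequestLog log = new OrderedRequestLog();
        System.out.println(log.append(0, "a")); // accepted, log is now [a]
        // The request carrying "b" failed in transit; "c" arrives first.
        System.out.println(log.append(2, "c")); // rejected, previous index 2 does not exist yet
        System.out.println(log.append(1, "b")); // retried request is accepted
        System.out.println(log.append(2, "c")); // resent request is accepted
        System.out.println(log.append(0, "a")); // duplicate of the first request is rejected
    }
}
```

The same check rejects both reordered and duplicated requests, which is what makes retries safe when several requests are in flight.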

Examples

Consensus algorithms such as Zab and Raft support request pipelining.

Kafka encourages clients to use request pipelining to improve throughput.

Significant Revisions