This pattern is part of Patterns of Distributed Systems

Single Socket Channel

Maintain order of the requests sent to a server by using a single TCP connection.

19 August 2020

Problem

When we are using Leader and Followers, we need to ensure that messages between the leader and each follower are kept in order, with a retry mechanism for any lost messages. We need to do this while keeping the cost of new connections low, so that opening new connections doesn't increase the system's latency.

Solution

Fortunately, the long-used and widely available TCP mechanism provides all these necessary characteristics. Thus we can get the communication we need by ensuring all communication between a follower and its leader goes through a single socket channel. The follower then serializes the updates from leader using a Singular Update Queue

Single Socket Channel

Nodes never close the connection once it is open and continuously read it for new requests. Nodes use a dedicated thread per connection to read and write requests. A thread per connection isn't needed if non blocking io is used.

A simple-thread based implementation will be like the following:

class SocketHandlerThread…

  @Override
  public void run() {
      isRunning = true;
      try {
          //Continues to read/write to the socket connection till it is closed.
          while (isRunning) {
              handleRequest();
          }
      } catch (Exception e) {
          getLogger().debug(e);
          closeClient(this);
      }
  }

  private void handleRequest() {
      RequestOrResponse request = clientConnection.readRequest();
      RequestId requestId = RequestId.valueOf(request.getRequestId());
      server.accept(new Message<>(request, requestId, clientConnection));
  }

  public void closeConnection() {
      clientConnection.close();
  }

The node reads requests and submits them to a Singular Update Queue for processing. Once the node has processed the request it writes the response back to the socket.

Whenever a node establishes a communication it opens a single socket connection that's used for all requests with the other party.

class SingleSocketChannel…

  public class SingleSocketChannel implements Closeable {
      final InetAddressAndPort address;
      final int heartbeatIntervalMs;
      private Socket clientSocket;
      private final OutputStream socketOutputStream;
      private final InputStream inputStream;
  
      public SingleSocketChannel(InetAddressAndPort address, int heartbeatIntervalMs) throws IOException {
          this.address = address;
          this.heartbeatIntervalMs = heartbeatIntervalMs;
          clientSocket = new Socket();
          clientSocket.connect(new InetSocketAddress(address.getAddress(), address.getPort()), heartbeatIntervalMs);
          clientSocket.setSoTimeout(heartbeatIntervalMs * 10); //set socket read timeout to be more than heartbeat.
          socketOutputStream = clientSocket.getOutputStream();
          inputStream = clientSocket.getInputStream();
      }
  
      public synchronized RequestOrResponse blockingSend(RequestOrResponse request) throws IOException {
          writeRequest(request);
          byte[] responseBytes = readResponse();
          return deserialize(responseBytes);
      }
  
      private void writeRequest(RequestOrResponse request) throws IOException {
          var dataStream = new DataOutputStream(socketOutputStream);
          byte[] messageBytes = serialize(request);
          dataStream.writeInt(messageBytes.length);
          dataStream.write(messageBytes);
      }

It's important to keep a timeout on the connection so it doesn't block indefinitely in case of errors. We use HeartBeat to send requests periodically over the socket channel to keep it alive. This timeout is generally kept as a multiple of the HeartBeat interval, to allow for network round trip time and some possible network delays. It's reasonable to keep the connection timeout as say 10 times that of the HeartBeat interval.

class SocketListener…

  private void setReadTimeout(Socket clientSocket) throws SocketException {
      clientSocket.setSoTimeout(config.getHeartBeatIntervalMs() * 10);
  }

Sending requests over a single channel can create a problem with head of line blocking issues. To avoid these, we can use a Request Pipeline.

Examples

  • Zookeeper uses a single socket channel and a thread per follower to do all the communication.
  • Kafka uses a single socket channel between follower and leader partitions to replicate messages.
  • Reference implementation of the Raft consensus algorithm, LogCabin uses single socket channel to communicate between leader and followers
Significant Revisions

19 August 2020: Published