Consistent Core

Maintain a smaller cluster providing stronger consistency to allow the large data cluster to coordinate server activities without implementing quorum-based algorithms.

05 January 2021

Problem

When a cluster needs to handle a lot of data, it needs more and more servers. For a cluster of servers, there are some common requirements, such as selecting a specific server to be the master for a particular task, managing group membership information, and mapping data partitions to the servers. These requirements need a strong consistency guarantee, namely linearizability. The implementation also needs to be fault tolerant. A common approach is to use a fault-tolerant consensus algorithm based on Quorum. But in quorum-based systems, throughput degrades with the size of the cluster, because every update has to be acknowledged by a majority of the servers.
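To make the last point concrete, here is the majority-quorum arithmetic as a small Java sketch (`QuorumMath` is a hypothetical helper, not part of any product). With a majority quorum, a 3-node cluster waits for 2 acknowledgements per write and survives 1 failure, and a 5-node cluster waits for 3 and survives 2, while a 101-node cluster would have to wait for 51 acknowledgements on every write.

```java
// Majority-quorum arithmetic: how many acknowledgements a write needs,
// and how many server failures the cluster can survive.
final class QuorumMath {
    private QuorumMath() {}

    // A majority quorum needs strictly more than half of the servers.
    static int quorumSize(int clusterSize) {
        if (clusterSize < 1) {
            throw new IllegalArgumentException("clusterSize must be >= 1");
        }
        return clusterSize / 2 + 1;
    }

    // Number of failed servers tolerated while a majority is still reachable.
    static int toleratedFailures(int clusterSize) {
        return clusterSize - quorumSize(clusterSize);
    }

    public static void main(String[] args) {
        for (int n : new int[] {3, 5, 101}) {
            System.out.printf("%d servers: quorum %d, tolerates %d failures%n",
                    n, quorumSize(n), toleratedFailures(n));
        }
    }
}
```

Adding servers to a quorum-based cluster raises the number of acknowledgements each write must wait for, which is why the pattern keeps the consensus group at 3 to 5 nodes.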

Solution

Implement a smaller cluster of 3 to 5 nodes that provides a linearizability guarantee as well as fault tolerance. [1] A separate data cluster can use the small consistent cluster to manage metadata and to make cluster-wide decisions with primitives like Lease. This way, the data cluster can grow to a large number of servers, but can still perform actions that need strong consistency guarantees using the smaller metadata cluster.

Figure 1: Consistent Core

A typical interface of a Consistent Core looks like this:

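import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public interface ConsistentCore {
    // Stores a key-value pair; the future completes once the write is committed.
    CompletableFuture put(String key, String value);

    // Returns the values of all keys that start with keyPrefix.
    List<String> get(String keyPrefix);

    // Creates a time-bound lease that expires unless refreshed within ttl.
    CompletableFuture registerLease(String name, long ttl);

    // Extends the named lease so that it does not expire.
    void refreshLease(String name);

    // Registers a callback that is invoked when the named key changes.
    void watch(String name, Consumer<WatchEvent> watchCallback);
}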

At a minimum, a Consistent Core provides a simple key-value storage mechanism, which is used to store metadata.

Metadata Storage

The storage is implemented using a consensus algorithm such as Raft. It is an example of a Replicated Write Ahead Log implementation, where replication is handled by Leader and Followers and a High-Water Mark is used to track successful replication using Quorum.
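As a rough illustration of how a High-Water Mark can be derived using Quorum, the following sketch assumes the leader tracks, for every server including itself, the highest log index known to be replicated there. The high-water mark is then the largest index present on a majority of servers. The class and method names are hypothetical, and the sketch leaves out Raft's additional rule that a leader only advances the commit index by counting replicas of entries from its own term.

```java
import java.util.Arrays;

// Sketch: the leader derives the High-Water Mark from the highest log index
// known to be replicated on each server (including the leader itself).
final class HighWaterMark {
    private HighWaterMark() {}

    // Returns the largest log index that is stored on a majority of servers.
    static long compute(long[] replicatedIndexes) {
        if (replicatedIndexes.length == 0) {
            throw new IllegalArgumentException("no servers");
        }
        long[] sorted = replicatedIndexes.clone();
        Arrays.sort(sorted); // ascending order
        int quorum = sorted.length / 2 + 1;
        // In ascending order, the element at (length - quorum) is matched or
        // exceeded by at least `quorum` servers: itself and every later element.
        return sorted[sorted.length - quorum];
    }
}
```

For example, if five servers have replicated up to indexes 10, 8, 7, 5 and 3, three of them hold index 7, so 7 is the high-water mark.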

Supporting hierarchical storage

A Consistent Core is generally used to store data for things such as group membership or task distribution across servers. A common usage pattern is to scope the type of metadata with a prefix. For example, for group membership, the keys are stored as /servers/1, /servers/2, and so on. For tasks assigned to servers, the keys can be /tasks/task1, /tasks/task2. This data is generally read by fetching all the keys with a specific prefix. For example, to get information about all the servers in the cluster, all the keys with the prefix /servers are read.

An example usage is as follows:

The servers can register themselves with the Consistent Core by creating their own key with prefix /servers.

client1.setValue("/servers/1", "{address:192.168.199.10, port:8000}");

client2.setValue("/servers/2", "{address:192.168.199.11, port:8000}");

client3.setValue("/servers/3", "{address:192.168.199.12, port:8000}");

Clients can then learn about all the servers in the cluster by reading with the key prefix /servers as follows:

assertEquals(Arrays.asList("{address:192.168.199.12, port:8000}",
                           "{address:192.168.199.11, port:8000}",
                           "{address:192.168.199.10, port:8000}"),
             client1.getValue("/servers"));
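A prefix read like this can be served efficiently if the store keeps its keys sorted, because all keys sharing a prefix then sit next to each other. The following is a minimal sketch, assuming an in-memory `TreeMap` stands in for the replicated state; `PrefixKeyValueStore` is a hypothetical name and replication is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of a key-value store whose keys are kept sorted, so that all keys
// sharing a prefix form one contiguous run that can be scanned directly.
final class PrefixKeyValueStore {
    private final TreeMap<String, String> data = new TreeMap<>();

    void put(String key, String value) {
        data.put(key, value);
    }

    // Returns the values of all keys starting with keyPrefix, in key order.
    List<String> get(String keyPrefix) {
        List<String> values = new ArrayList<>();
        // tailMap starts at the first key >= keyPrefix; the scan stops at the
        // first key that no longer starts with the prefix.
        for (Map.Entry<String, String> entry : data.tailMap(keyPrefix, true).entrySet()) {
            if (!entry.getKey().startsWith(keyPrefix)) {
                break;
            }
            values.add(entry.getValue());
        }
        return values;
    }
}
```

A plain string prefix such as /servers would also match a key like /servers-old; reading with the prefix /servers/ avoids that.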

Because of this hierarchical nature of data storage, products like [zookeeper] and [chubby] provide a file-system-like interface, where users create directories and files, or nodes, with the concept of parent and child nodes. [etcd3] has a flat key space with the ability to get a range of keys.
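In a flat key space, a prefix query can be expressed as a range query over the half-open interval [prefix, rangeEnd), where rangeEnd is computed by incrementing the last byte of the prefix that is below 0xFF and dropping everything after it. This is similar in spirit to how etcd's Go client builds the range for its `WithPrefix` option. The sketch below is a hypothetical Java helper, assuming keys are compared as unsigned bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch: turn a prefix query into a half-open range query [prefix, rangeEnd)
// over a flat key space ordered by unsigned bytes.
final class PrefixRange {
    private PrefixRange() {}

    // Returns the exclusive upper bound of all keys starting with prefix,
    // or null if there is none (the prefix consists only of 0xFF bytes).
    static byte[] rangeEnd(byte[] prefix) {
        byte[] end = Arrays.copyOf(prefix, prefix.length);
        for (int i = end.length - 1; i >= 0; i--) {
            if ((end[i] & 0xFF) < 0xFF) {
                end[i]++;                          // increment the last byte that can be incremented
                return Arrays.copyOf(end, i + 1);  // drop the trailing 0xFF bytes
            }
        }
        return null; // no upper bound: scan to the end of the key space
    }

    public static void main(String[] args) {
        byte[] end = rangeEnd("/servers/".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(end, StandardCharsets.UTF_8));
    }
}
```

For example, the range end for /servers/ is /servers0, because '/' (0x2F) is immediately followed by '0' (0x30), so every key under /servers/ sorts before /servers0.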

Handling Client Interactions

A key aspect of Consistent Core functionality is how clients interact with the core. The following aspects are critical for clients to work with the Consistent Core.

Finding the leader

It's important that all the operations are executed on the leader, so a client library needs to find the leader server first. There are two possible approaches to fulfil this requirement.

  • The follower servers in the consistent core know about the current leader, so if the client connects to a follower, it can return the address of the leader. The client can then directly connect to the leader identified in the response. It should be noted that the servers might be in the middle of leader election when the client tries to connect. In that case, servers cannot return the leader address and the client needs to wait and try another server.
  • Servers can implement a forwarding mechanism and forward all the client requests to the leader. This allows clients to connect to any server. Again, if servers are in the middle of leader election, then clients need to retry until the leader election is successful and a legitimate leader is established.

Products like zookeeper and etcd implement the forwarding approach because they allow some read-only requests to be handled by the follower servers; this avoids a bottleneck on the leader when a large number of clients are read-only. It also spares clients the complexity of deciding whether to connect to the leader or to a follower based on the type of the request.
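The decision a server makes when a client request arrives can be sketched as follows. This is a hypothetical illustration, assuming the server knows whether it is the leader and, if not, which server it currently believes to be the leader; the enum values loosely correspond to the redirect and looking-for-leader responses that the client code in this section checks for. Whether followers may serve reads is modeled as a flag, because reads served by a follower can return stale data.

```java
import java.util.Optional;

// Sketch of how a consistent core server might route an incoming client request.
final class RequestRouter {
    enum Outcome { HANDLE_LOCALLY, REDIRECT_TO_LEADER, LOOKING_FOR_LEADER }

    private RequestRouter() {}

    // Decides what to do with a request, given this server's view of the cluster.
    static Outcome route(boolean isLeader, Optional<String> knownLeader,
                         boolean readOnly, boolean followerReadsAllowed) {
        if (isLeader) {
            return Outcome.HANDLE_LOCALLY;
        }
        if (readOnly && followerReadsAllowed) {
            return Outcome.HANDLE_LOCALLY;      // follower serves the read, possibly stale
        }
        if (knownLeader.isPresent()) {
            return Outcome.REDIRECT_TO_LEADER;  // or forward it, in the forwarding approach
        }
        return Outcome.LOOKING_FOR_LEADER;      // election in progress: the client must retry
    }
}
```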

A simple mechanism to find the leader is to connect to each server in turn and send a request; a server responds with a redirect response if it is not the leader.

private void establishConnectionToLeader(List<InetAddressAndPort> servers) {
    // Probe each known server in turn until one of them either redirects
    // the client to the leader or turns out to be the leader itself.
    for (InetAddressAndPort server : servers) {
        try {
            SingleSocketChannel socketChannel = new SingleSocketChannel(server, 10);
            logger.info("Trying to connect to " + server);
            RequestOrResponse response = sendConnectRequest(socketChannel);
            if (isRedirectResponse(response)) {
                // A follower that knows the leader returns the leader's address.
                redirectToLeader(response);
                break;
            } else if (isLookingForLeader(response)) {
                // Leader election is in progress; no leader address is available.
                logger.info("Server is looking for leader. Trying next server");
                continue;
            } else { // this server accepted the connection, so it is the leader
                logger.info("Found leader. Establishing a new connection.");
                newPipelinedConnection(server);
                break;
            }
        } catch (IOException e) {
            logger.info("Unable to connect to " + server);
            // try the next server
        }
    }
}

private boolean isLookingForLeader(RequestOrResponse requestOrResponse) {
    return requestOrResponse.getRequestId() == RequestId.LookingForLeader.getId();
}

private void redirectToLeader(RequestOrResponse response) {
    RedirectToLeaderResponse redirectResponse = deserialize(response);
    newPipelinedConnection(redirectResponse.leaderAddress);

    logger.info("Connected to the new leader "
            + redirectResponse.leaderServerId
            + " " + redirectResponse.leaderAddress
            + ". Checking connection");
}


private boolean isRedirectResponse(RequestOrResponse requestOrResponse) {
    return requestOrResponse.getRequestId() == RequestId.RedirectToLeader.getId();
}

Establishing a TCP connection is not enough; the client also needs to know whether the server can handle its requests. So the client sends a special connect request, and the server either acknowledges that it can serve requests or redirects the client to the leader server.

private RequestOrResponse sendConnectRequest(SingleSocketChannel socketChannel) throws IOException {
    RequestOrResponse request
            = new RequestOrResponse(RequestId.ConnectRequest.getId(), "CONNECT", 0);
    try {
        return socketChannel.blockingSend(request);
    } catch (IOException e) {
        resetConnectionToLeader();
        throw e;
    }
}

If an existing leader fails, the same technique is used to identify the newly elected leader from the cluster.
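While the cluster is in the middle of a leader election, every server may answer that it is still looking for a leader, so the client has to repeat the discovery after a pause. A minimal sketch of such a retry loop with exponential backoff follows; the `attempt` supplier is assumed to wrap one pass over the servers and return the leader it found, if any. `LeaderDiscovery`, `retryWithBackoff` and all timings are hypothetical.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Sketch: repeat leader discovery with exponential backoff while, for example,
// a leader election is still in progress.
final class LeaderDiscovery {
    private LeaderDiscovery() {}

    static <T> Optional<T> retryWithBackoff(Supplier<Optional<T>> attempt,
                                            int maxAttempts,
                                            Duration initialBackoff,
                                            Duration maxBackoff) {
        Duration backoff = initialBackoff;
        for (int i = 1; i <= maxAttempts; i++) {
            Optional<T> leader = attempt.get();
            if (leader.isPresent()) {
                return leader;
            }
            if (i == maxAttempts) {
                break; // no point sleeping after the last attempt
            }
            try {
                Thread.sleep(backoff.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status and give up
                return Optional.empty();
            }
            Duration doubled = backoff.multipliedBy(2);
            backoff = doubled.compareTo(maxBackoff) > 0 ? maxBackoff : doubled;
        }
        return Optional.empty();
    }
}
```

Capping the backoff keeps the client from waiting too long once a new leader has been elected.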

Once connected, the client maintains a Single Socket Channel to the leader server.

Handling duplicate requests

In case of failure, clients may connect to the new leader and resend their requests. But if the failed leader had already handled those requests before it failed, resending them can result in duplicates. Therefore, the servers need a mechanism to ignore duplicate requests. The Idempotent Receiver pattern is used to implement duplicate detection.
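A minimal sketch of such duplicate detection, assuming each request carries a client identifier and a per-client request number (the names are hypothetical). The server saves the response of every executed request and replays it when the same request arrives again. In a real Consistent Core this state has to be part of the replicated state, so that a newly elected leader also knows which requests were already applied, and saved responses need to expire; both are left out here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch: the server remembers, per client, the response to each request number
// it has already executed, and replays that response for a retried request.
final class IdempotentReceiver {
    private final Map<String, Map<Long, String>> responsesByClient = new HashMap<>();

    synchronized String handle(String clientId, long requestNumber, Supplier<String> execute) {
        Map<Long, String> responses =
                responsesByClient.computeIfAbsent(clientId, id -> new HashMap<>());
        if (responses.containsKey(requestNumber)) {
            return responses.get(requestNumber); // duplicate: do not execute again
        }
        String response = execute.get();         // first time: apply the request
        responses.put(requestNumber, response);
        return response;
    }
}
```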

Tasks can be coordinated across a set of servers by using Lease. Leases can also be used to implement group membership and failure detection.
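At its core, a lease is a time-to-live that its holder keeps extending. The following hypothetical sketch loosely follows the `registerLease` and `refreshLease` operations of the interface shown earlier, assuming TTLs in milliseconds and an injectable monotonic clock; it only shows the leader's in-memory bookkeeping and leaves out replication of lease state. For group membership, each server would register a lease for its own key and refresh it periodically; a server that crashes stops refreshing, its lease expires, and that is how the failure is detected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Sketch of time-bound leases tracked by the consistent core leader.
final class LeaseTracker {
    // Injectable monotonic clock, e.g. () -> System.nanoTime() / 1_000_000.
    private final LongSupplier clockMillis;
    private final Map<String, Long> ttlByName = new HashMap<>();
    private final Map<String, Long> expiryByName = new HashMap<>();

    LeaseTracker(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    // Returns false if a lease with this name is already held.
    synchronized boolean registerLease(String name, long ttlMillis) {
        if (expiryByName.containsKey(name)) {
            return false;
        }
        ttlByName.put(name, ttlMillis);
        expiryByName.put(name, clockMillis.getAsLong() + ttlMillis);
        return true;
    }

    // Pushes the expiry time one full TTL into the future.
    synchronized void refreshLease(String name) {
        Long ttl = ttlByName.get(name);
        if (ttl == null) {
            throw new IllegalArgumentException("No lease named " + name);
        }
        expiryByName.put(name, clockMillis.getAsLong() + ttl);
    }

    // Removes and returns the leases whose holders stopped refreshing in time.
    synchronized List<String> expireLeases() {
        long now = clockMillis.getAsLong();
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : expiryByName.entrySet()) {
            if (entry.getValue() <= now) {
                expired.add(entry.getKey());
            }
        }
        for (String name : expired) {
            expiryByName.remove(name);
            ttlByName.remove(name);
        }
        return expired;
    }
}
```

In this sketch a lease that has expired but not yet been cleaned up still blocks `registerLease` until `expireLeases` runs, so the leader would call `expireLeases` periodically.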

State Watch is used to get notifications of changes to the metadata or to time-bound leases.
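A minimal in-memory sketch of the watch bookkeeping, assuming the server calls `notifyChange` after a change to a key is committed. The `WatchEvent` fields and the `WatchRegistry` class are hypothetical, and delivering events to remote clients over their connections is left out. Callbacks are invoked outside the lock so that a slow callback cannot block new registrations.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical event type describing a change to a watched key.
final class WatchEvent {
    final String key;
    final String newValue; // null when the key was deleted, e.g. after its lease expired

    WatchEvent(String key, String newValue) {
        this.key = key;
        this.newValue = newValue;
    }
}

// Sketch: clients register callbacks for a key, and every committed change
// to that key notifies the registered callbacks.
final class WatchRegistry {
    private final Map<String, List<Consumer<WatchEvent>>> watchers = new HashMap<>();

    synchronized void watch(String key, Consumer<WatchEvent> callback) {
        watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(callback);
    }

    // Called by the server after a change to key has been committed.
    void notifyChange(String key, String newValue) {
        List<Consumer<WatchEvent>> callbacks;
        synchronized (this) {
            callbacks = new ArrayList<>(watchers.getOrDefault(key, List.of()));
        }
        WatchEvent event = new WatchEvent(key, newValue);
        for (Consumer<WatchEvent> callback : callbacks) {
            callback.accept(event); // invoked outside the lock
        }
    }
}
```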

Examples

Google is known to use the [chubby] lock service for coordination and metadata management.

[kafka] uses [zookeeper] to manage metadata and to make decisions such as leader election for the cluster master. The proposed architecture change in Kafka will replace zookeeper with its own [raft]-based controller cluster.

[bookkeeper] uses Zookeeper to manage cluster metadata.

[kubernetes] uses [etcd] for coordination and for managing cluster metadata and group membership information.

Big data storage and processing systems such as [hdfs], [spark], and [flink] use [zookeeper] for high availability and cluster coordination.

Notes

1: Because the entire cluster depends on the Consistent Core, it is critical to be aware of the details of the consensus algorithm used. Consensus implementations can run into liveness issues in some tricky network partition situations. For example, a Raft cluster can be disrupted by a partitioned server, which can continuously trigger leader election, unless special care is taken. This recent incident at Cloudflare is a good example to learn from.

Significant Revisions