This pattern is part of Patterns of Distributed Systems

Fixed Partitions

Keep the number of partitions fixed so that the mapping of data to partitions stays unchanged when the size of the cluster changes.

23 August 2022

Problem

To split data across a set of cluster nodes, each data item needs to be mapped to them. There are two requirements for mapping the data to the cluster nodes.

• The distribution should be uniform
• It should be possible to know which cluster node stores a particular data item, without making a request to all the nodes

Considering a key-value store, which is a good proxy for many storage systems, both requirements can be fulfilled by taking the hash of the key and applying the modulo operation to map it to a cluster node. So if we consider a three-node cluster, we can map keys Alice, Bob, Mary and Philip like this:

| Keys | Hash | Node Index (Hash % 3) |
|---|---|---|
| Alice | 133299819613694460644197938031451912208 | 0 |
| Bob | 63479738429015246738359000453022047291 | 1 |
| Mary | 37724856304035789372490171084843241126 | 2 |
| Philip | 83980963731216160506671196398339418866 | 2 |

However, this method creates a problem when the cluster size changes. If two more nodes are added to the cluster, we will have five nodes. The mapping will then look like this:

| Keys | Hash | Node Index (Hash % 5) |
|---|---|---|
| Alice | 133299819613694460644197938031451912208 | 3 |
| Bob | 63479738429015246738359000453022047291 | 1 |
| Mary | 37724856304035789372490171084843241126 | 1 |
| Philip | 83980963731216160506671196398339418866 | 1 |

The mapping of almost all the keys changes. Even when only a few new cluster nodes are added, most of the data needs to be moved. When the data size is large, this is undesirable.
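
The effect can be checked with a short sketch. It reuses the hash values listed in the tables above and counts how many keys land on a different node index when the cluster grows from three to five nodes. The names `ModuloMappingSketch`, `nodeIndex` and `movedKeys` are illustrative assumptions, not part of any library.

```java
import java.math.BigInteger;
import java.util.LinkedHashMap;
import java.util.Map;

class ModuloMappingSketch {
    // Hash values copied from the tables above.
    static final Map<String, BigInteger> HASHES = new LinkedHashMap<>();
    static {
        HASHES.put("Alice", new BigInteger("133299819613694460644197938031451912208"));
        HASHES.put("Bob", new BigInteger("63479738429015246738359000453022047291"));
        HASHES.put("Mary", new BigInteger("37724856304035789372490171084843241126"));
        HASHES.put("Philip", new BigInteger("83980963731216160506671196398339418866"));
    }

    // Maps a hash directly to a node index: hash % clusterSize.
    static int nodeIndex(BigInteger hash, int clusterSize) {
        return hash.mod(BigInteger.valueOf(clusterSize)).intValue();
    }

    // Counts the keys whose node index differs between the two cluster sizes.
    static long movedKeys(int oldSize, int newSize) {
        return HASHES.values().stream()
                .filter(h -> nodeIndex(h, oldSize) != nodeIndex(h, newSize))
                .count();
    }

    public static void main(String[] args) {
        HASHES.forEach((key, hash) -> System.out.println(
                key + ": " + nodeIndex(hash, 3) + " -> " + nodeIndex(hash, 5)));
        System.out.println("moved: " + movedKeys(3, 5) + " of " + HASHES.size());
    }
}
```

For these four keys, only Bob keeps its node index; the other three would have to move.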

Solution

Map data to logical partitions. Logical partitions are mapped to the cluster nodes. Even if cluster nodes are added or removed, the mapping of data to partitions doesn't change. The cluster is launched with a preconfigured number of partitions, say 1024 for the sake of this example. This number does not change when new nodes are added to the cluster. So the way data is mapped to partitions using the hash of the key remains the same.

It's important that partitions are evenly distributed across cluster nodes. Moving partitions to new nodes should be relatively quick, with only a small portion of the data being moved. Once configured, the number of partitions won't change; this means it should leave enough room for future growth of data volumes.

So the number of partitions selected should be significantly higher than the number of cluster nodes. For example, Akka suggests having ten times as many shards as nodes. Partitioning in Apache Ignite has a default value of 1024. Hazelcast has a default value of 271 for cluster sizes smaller than 100.

Data storage or retrieval is then a two-step process.

• First, you find the partition for the given data item
• Then you find the cluster node where the partition is stored

To balance data across the cluster nodes when new ones are added, some of the partitions can be moved to the new nodes.
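
The two steps can be sketched as follows. This is a minimal illustration, assuming an MD5-based hash and a partition table held as a plain map; the names `TwoStepLookupSketch`, `partitionFor`, `nodeFor` and `tableFor` are hypothetical. The point it shows is that step one depends only on the key and the fixed partition count, while step two reads a table that can change as nodes come and go.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

class TwoStepLookupSketch {
    static final int NO_OF_PARTITIONS = 1024; // fixed when the cluster is created

    // Step 1: key -> partition. Depends only on the key and the fixed partition count.
    static int partitionFor(String key) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            // Signum 1 keeps the hash non-negative (an assumption of this sketch).
            BigInteger hash = new BigInteger(1, md5.digest(key.getBytes(StandardCharsets.UTF_8)));
            return hash.mod(BigInteger.valueOf(NO_OF_PARTITIONS)).intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Step 2: partition -> node, read from a partition table that may change over time.
    static String nodeFor(int partitionId, Map<Integer, String> partitionTable) {
        return partitionTable.get(partitionId);
    }

    // Hypothetical helper: spreads all partitions over the given nodes in order.
    static Map<Integer, String> tableFor(String... nodes) {
        Map<Integer, String> table = new HashMap<>();
        for (int p = 0; p < NO_OF_PARTITIONS; p++) {
            table.put(p, nodes[p % nodes.length]);
        }
        return table;
    }
}
```

For example, `nodeFor(partitionFor("Alice"), tableFor("node1", "node2", "node3"))` names the owning node. Passing a table built for four nodes changes only the second step; `partitionFor("Alice")` returns the same partition either way.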

Choosing the hash function

It's critical to choose a hashing method that gives the same hash values independent of the platform and runtime. For example, programming languages like Java provide a hash for every object. However, that hash value is dependent on the JVM runtime, so two different JVMs could give a different hash for the same key. To tackle this, hashing algorithms like MD5 or Murmur hash are used.

class HashingUtil…

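```
  // Needs java.math.BigInteger, java.nio.charset.StandardCharsets and java.security.MessageDigest.
  public static BigInteger hash(String key) {
      try {
          MessageDigest messageDigest = MessageDigest.getInstance("MD5");
          // An explicit charset keeps the bytes, and so the hash, independent of the platform default.
          // The resulting BigInteger is signed and may be negative.
          return new BigInteger(messageDigest.digest(key.getBytes(StandardCharsets.UTF_8)));
      } catch (Exception e) {
          throw new RuntimeException(e);
      }
  }
```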

The keys are not mapped to nodes, but to partitions. Assuming there are 9 partitions, the table looks like the following. When new nodes are added to the cluster, the mapping of a key to its partition does not change.

| Keys | Hash | Partition (Hash % 9) | Node |
|---|---|---|---|
| Alice | 133299819613694460644197938031451912208 | 0 | 0 |
| Bob | 63479738429015246738359000453022047291 | 1 | 1 |
| Mary | 37724856304035789372490171084843241126 | 5 | 1 |
| Philip | 83980963731216160506671196398339418866 | 2 | 2 |

Mapping partitions to cluster nodes

Partitions need to be mapped to cluster nodes. The mapping also needs to be stored and made accessible to the clients. It's common to use a dedicated Consistent Core; this handles both. The dedicated Consistent Core acts as a coordinator which keeps track of all nodes in the cluster and maps partitions to nodes. It also stores the mapping in a fault tolerant way by using a Replicated Log. The master cluster in YugabyteDB or controller implementation in Kafka are both good examples of this.

Peer-to-peer systems like Akka or Hazelcast also need a particular cluster node to act as a coordinator. They use Emergent Leader as the coordinator.

Systems like Kubernetes use a generic Consistent Core like etcd. They need to elect one of the cluster nodes to play the role of coordinator.

Tracking Cluster Membership

Each cluster node will register itself with the Consistent Core. It also periodically sends a HeartBeat to allow the Consistent Core to detect node failures.

class KVStore…

```
  public void start() {
      socketListener.start();
      requestHandler.start();
      // Every 200 ms, send a HeartBeat to the Consistent Core.
      scheduler.scheduleAtFixedRate(()->{
      }, 200, 200, TimeUnit.MILLISECONDS);
  }
```

The coordinator handles the registration and then stores member information.

class ClusterCoordinator…

```  ReplicatedLog replicatedLog;
Membership membership = new Membership();
TimeoutBasedFailureDetector failureDetector = new TimeoutBasedFailureDetector(Duration.ofMillis(TIMEOUT_MILLIS));

private void handleRegisterClusterNodeRequest(Message message) {
logger.info("Registering node " + message.from);
CompletableFuture completableFuture = registerClusterNode(message.from);
completableFuture.whenComplete((response, error) -> {
logger.info("Sending register response to node " + message.from);
});
}

}```

When a registration is committed in the Replicated Log, the membership will be updated.

class ClusterCoordinator…

```  private void applyRegisterClusterNodeEntry(RegisterClusterNodeCommand command) {
}```

class ClusterCoordinator…

```  private void updateMembership(InetAddressAndPort address) {
}```

The coordinator maintains a list of all nodes that are part of the cluster:

class Membership…

```  public class Membership {
List<Member> liveMembers = new ArrayList<>();
List<Member> failedMembers = new ArrayList<>();

}```

class Member…

```  public class Member implements Comparable<Member> {
MemberStatus status;```

The coordinator will detect cluster node failures using a mechanism similar to Lease. If a cluster node stops sending the heartbeat, the node will be marked as failed.

class ClusterCoordinator…

```  @Override
1000,
1000,
TimeUnit.MILLISECONDS);
failureDetector.start();
}

private void checkMembership() {
List<Member> failedMembers = getFailedMembers();
if (!failedMembers.isEmpty()) {
replicatedLog.propose(new MemberFailedCommand(failedMembers));
}
}

private List<Member> getFailedMembers() {
List<Member> liveMembers = membership.getLiveMembers();
return liveMembers.stream()
.collect(Collectors.toList());

}```
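
The timeout check at the heart of this failure detection can be sketched in isolation. This is a simplified, single-threaded illustration and not the `TimeoutBasedFailureDetector` used above: the class name `HeartbeatTrackerSketch` and its methods are assumptions, and the current time is passed in explicitly (in practice it would come from something like `System.nanoTime()`) so that the behaviour is easy to reason about.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HeartbeatTrackerSketch {
    private final long timeoutNanos;
    // Last time (in nanoseconds) a heartbeat was seen from each node.
    private final Map<String, Long> lastHeartbeatNanos = new TreeMap<>();

    HeartbeatTrackerSketch(Duration timeout) {
        this.timeoutNanos = timeout.toNanos();
    }

    // Called whenever a heartbeat arrives from a node.
    void heartbeatReceived(String nodeId, long nowNanos) {
        lastHeartbeatNanos.put(nodeId, nowNanos);
    }

    // Returns the nodes whose last heartbeat is older than the timeout.
    List<String> failedNodes(long nowNanos) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatNanos.entrySet()) {
            if (nowNanos - entry.getValue() > timeoutNanos) {
                failed.add(entry.getKey());
            }
        }
        return failed;
    }
}
```

A coordinator could call `failedNodes` on a fixed schedule and propose a membership change to the Replicated Log whenever the returned list is not empty.
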
An example scenario

Consider three data servers: athens, byzantium and cyrene. With 9 partitions, the coordinator spreads the partitions across these three servers and records the assignment in a partition table.

The client can then use the partition table to map a given key to a particular cluster node.

Now a new cluster node - 'ephesus' - is added to the cluster. The admin triggers a reassignment and the coordinator checks which nodes are underloaded by checking the partition table. It figures out that ephesus is the node which is underloaded, and decides to allocate partition 7 to it, moving it from athens. The coordinator stores the migrations and then sends the request to athens to move partition 7 to ephesus. Once the migration is complete, athens lets the coordinator know. The coordinator then updates the partition table.

Assigning Partitions To Cluster Nodes

The coordinator assigns partitions to the cluster nodes that are known at that point in time. If the assignment is triggered every time a new cluster node is added, it might map partitions too early, before the cluster reaches a stable state. This is why the coordinator should be configured to wait until the cluster reaches a minimum size.

The first time the partition assignment is done, it can simply be done in a round robin fashion.

class ClusterCoordinator…

```  CompletableFuture assignPartitionsToClusterNodes() {
if (!minimumClusterSizeReached()) {
return CompletableFuture.failedFuture(new NotEnoughClusterNodesException(MINIMUM_CLUSTER_SIZE));
}
return initializePartitionAssignment();
}

private boolean minimumClusterSizeReached() {
return membership.getLiveMembers().size() >= MINIMUM_CLUSTER_SIZE;
}```
```  private CompletableFuture initializePartitionAssignment() {
partitionAssignmentStatus = PartitionAssignmentStatus.IN_PROGRESS;
PartitionTable partitionTable = arrangePartitions();
return replicatedLog.propose(new PartitiontableCommand(partitionTable));
}

public PartitionTable arrangePartitions() {
PartitionTable partitionTable = new PartitionTable();
List<Member> liveMembers = membership.getLiveMembers();
for (int partitionId = 1; partitionId <= noOfPartitions; partitionId++) {
int index = partitionId % liveMembers.size();
Member member = liveMembers.get(index);
}
return partitionTable;
}```
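
As a standalone illustration of the round-robin assignment, the following sketch builds a partition table as a plain map from partition id to node name. It mirrors the `partitionId % liveMembers.size()` indexing used above; the class name `RoundRobinAssignmentSketch`, the `assign` method and the use of strings for nodes are assumptions made to keep the example self-contained.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RoundRobinAssignmentSketch {
    // Returns partitionId -> node, cycling through the live nodes in order.
    static Map<Integer, String> assign(int noOfPartitions, List<String> liveNodes) {
        if (liveNodes.isEmpty()) {
            throw new IllegalArgumentException("at least one live node is required");
        }
        Map<Integer, String> partitionTable = new TreeMap<>();
        for (int partitionId = 1; partitionId <= noOfPartitions; partitionId++) {
            int index = partitionId % liveNodes.size();
            partitionTable.put(partitionId, liveNodes.get(index));
        }
        return partitionTable;
    }

    public static void main(String[] args) {
        System.out.println(assign(9, List.of("athens", "byzantium", "cyrene")));
    }
}
```

With 9 partitions and three nodes, each node ends up hosting three partitions. Which node gets which partition depends on the order of the live-node list.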

The Replicated Log makes the partition table persistent.

class ClusterCoordinator…

```  PartitionTable partitionTable;
PartitionAssignmentStatus partitionAssignmentStatus = PartitionAssignmentStatus.UNASSIGNED;

private void applyPartitionTableCommand(PartitiontableCommand command) {
this.partitionTable = command.partitionTable;
partitionAssignmentStatus = PartitionAssignmentStatus.ASSIGNED;
sendMessagesToMembers(partitionTable);
}
}```

Once the partition assignment is persisted, the coordinator sends messages to all cluster nodes to tell each node which partitions it now owns.

class ClusterCoordinator…

```  List<Integer> pendingPartitionAssignments = new ArrayList<>();

private void sendMessagesToMembers(PartitionTable partitionTable) {
Map<Integer, PartitionInfo> partitionsTobeHosted = partitionTable.getPartitionsTobeHosted();
partitionsTobeHosted.forEach((partitionId, partitionInfo) -> {
HostPartitionMessage message = new HostPartitionMessage(requestNumber++, this.listenAddress, partitionId);
logger.info("Sending host partition message to " + partitionInfo.hostedOn + " partitionId=" + partitionId);
scheduler.execute(new RetryableTask(partitionInfo.hostedOn, network, this, partitionId, message));
});
}```

The coordinator will keep retrying until its message has been delivered successfully.

```  static class RetryableTask implements Runnable {
Network network;
ClusterCoordinator coordinator;
Integer partitionId;
int attempt;
private Message message;

this.network = network;
this.coordinator = coordinator;
this.partitionId = partitionId;
this.message = message;
}

@Override
public void run() {
attempt++;
try {
//stop trying if the node is failed.
return;
}
logger.info("Sending " + message + " to=" + address);
} catch (Exception e) {
logger.error("Error trying to send ");
scheduleWithBackOff();
}
}

private void scheduleWithBackOff() {
scheduler.schedule(this, getBackOffDelay(attempt), TimeUnit.MILLISECONDS);
}

private long getBackOffDelay(int attempt) {
long baseDelay = (long) Math.pow(2, attempt);
long jitter = randomJitter();
return baseDelay + jitter;
}

private long randomJitter() {
// A fixed seed would yield the same jitter on every call; draw a fresh value in [0, 50) instead.
return java.util.concurrent.ThreadLocalRandom.current().nextLong(50);
}

}```
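
The backoff calculation can also be looked at on its own. The sketch below is one possible way to compute an exponential delay with jitter; it is not the author's implementation. The cap of 10 seconds (`MAX_BASE_DELAY_MILLIS`) and the class name `BackoffSketch` are assumptions, added so that the delay stays bounded when a node is unreachable for a long time.

```java
import java.util.Random;

class BackoffSketch {
    static final long MAX_BASE_DELAY_MILLIS = 10_000; // assumed cap, not taken from the text above
    static final int MAX_JITTER_MILLIS = 50;

    // Exponential base delay (2^attempt ms, capped) plus a random jitter in [0, 50) ms.
    static long backOffDelayMillis(int attempt, Random random) {
        // Clamp the exponent so the shift cannot overflow for large attempt counts.
        int exponent = Math.min(Math.max(attempt, 0), 30);
        long baseDelay = Math.min(1L << exponent, MAX_BASE_DELAY_MILLIS);
        long jitter = random.nextInt(MAX_JITTER_MILLIS);
        return baseDelay + jitter;
    }
}
```

The jitter spreads out retries from tasks that failed at the same moment, so they do not all hit the target node again at the same instant.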

When a cluster node receives the request to create the partition, it creates one with the given partition id. If we imagine this happening within a simple key-value store, its implementation will look something like this:

class KVStore…

```
  Map<Integer, Partition> allPartitions = new ConcurrentHashMap<>();

  private void handleHostPartitionMessage(Message message) {
      Integer partitionId = ((HostPartitionMessage) message).getPartitionId();
      // Create an empty partition with the requested id and start hosting it.
      allPartitions.put(partitionId, new Partition(partitionId));
  }
```

class Partition…

```  SortedMap<String, String> kv = new TreeMap<>();
private Integer partitionId;```

Once the coordinator receives the message that the partition has been successfully created, it persists it in the replicated log and updates the partition status to be online.

class ClusterCoordinator…

```  private void handleHostPartitionAck(Message message) {
int partitionId = ((HostPartitionAcks) message).getPartitionId();
pendingPartitionAssignments.remove(Integer.valueOf(partitionId));
logger.info("Received host partition ack from " + message.from + " partitionId=" + partitionId + " pending=" + pendingPartitionAssignments);
CompletableFuture future = replicatedLog.propose(new UpdatePartitionStatusCommand(partitionId, PartitionStatus.ONLINE));
future.join();
}```

Once the High-Water Mark is reached, and the record is applied, the partition’s status will be updated.

class ClusterCoordinator…

```  private void updateParitionStatus(UpdatePartitionStatusCommand command) {
removePendingRequest(command.partitionId);
logger.info("Changing status for " + command.partitionId + " to " + command.status);
logger.info(partitionTable.toString());
}```
Client Interface

Considering again the example of a simple key-value store, a client that needs to store or get a value for a particular key can do so by following these steps:

• The client applies the hash function to the key and finds the relevant partition based on the total number of partitions.
• The client gets the partition table from the coordinator and finds the cluster node that is hosting the partition. The client also periodically refreshes the partition table.

Clients fetching a partition table from the coordinator can quickly lead to bottlenecks, especially if all requests are being served by a single coordinator leader. That is why it is common practice to keep metadata available on all cluster nodes. The coordinator can either push metadata to cluster nodes, or cluster nodes can pull it from the coordinator. Clients can then connect with any cluster node to refresh the metadata.

This is generally implemented inside the client library provided by the key-value store, or by client request handling (which happens on the cluster nodes).
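
The periodic refresh of the partition table on the client side might look like the sketch below. It assumes the table can be fetched through a `Supplier` (standing in for a request to any cluster node) and that a fixed refresh interval is acceptable; `CachedPartitionTableSketch` and its members are hypothetical names. The current time is passed in to keep the example deterministic.

```java
import java.util.Map;
import java.util.function.Supplier;

class CachedPartitionTableSketch {
    private final Supplier<Map<Integer, String>> fetchTable; // stands in for a request to a cluster node
    private final long refreshIntervalMillis;
    private Map<Integer, String> cached;
    private long lastFetchMillis;

    CachedPartitionTableSketch(Supplier<Map<Integer, String>> fetchTable, long refreshIntervalMillis) {
        this.fetchTable = fetchTable;
        this.refreshIntervalMillis = refreshIntervalMillis;
    }

    // Returns the node for a partition, refetching the table when the cache is empty or stale.
    synchronized String nodeFor(int partitionId, long nowMillis) {
        if (cached == null || nowMillis - lastFetchMillis >= refreshIntervalMillis) {
            cached = fetchTable.get();
            lastFetchMillis = nowMillis;
        }
        return cached.get(partitionId);
    }
}
```

Between refreshes the client can route requests without contacting the coordinator at all, at the cost of occasionally using a stale table after a partition has moved.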

class Client…

```  public void put(String key, String value) throws IOException {
Integer partitionId = findPartition(key, noOfPartitions);
}

PartitionInfo partitionInfo = partitionTable.getPartition(partitionId);
}

private void sendPutMessage(Integer partitionId, InetAddressAndPort address, String key, String value) throws IOException {
PartitionPutMessage partitionPutMessage = new PartitionPutMessage(partitionId, key, value);
socketClient.blockingSend(new RequestOrResponse(RequestId.PartitionPutKV.getId(),
JsonSerDes.serialize(partitionPutMessage)));
}```
```  public String get(String key) throws IOException {
Integer partitionId = findPartition(key, noOfPartitions);
}

PartitionGetMessage partitionGetMessage = new PartitionGetMessage(partitionId, key);
RequestOrResponse response = socketClient.blockingSend(new RequestOrResponse(RequestId.PartitionGetKV.getId(), JsonSerDes.serialize(partitionGetMessage)));
PartitionGetResponseMessage partitionGetResponseMessage = JsonSerDes.deserialize(response.getMessageBodyJson(), PartitionGetResponseMessage.class);
return partitionGetResponseMessage.getValue();
}```
Moving partitions to newly added members

When new nodes are added to a cluster, some partitions can be moved to the new nodes. This can be done automatically once a new cluster node is added. But it can involve a lot of data being moved across the cluster nodes, which is why an administrator will typically trigger the repartitioning. One simple method is to calculate the average number of partitions each node should host and then move the surplus partitions to the new node. For example, if the number of partitions is 30 and there are three existing nodes in the cluster, each node hosts 10 partitions. If a new node is added, the average per node is about 7. The coordinator will therefore try to move three partitions from each cluster node to the new one.

class ClusterCoordinator…

```  List<Migration> pendingMigrations = new ArrayList<>();

boolean reassignPartitions() {
if (partitionAssignmentInProgress()) {
logger.info("Partition assignment in progress");
return false;
}
List<Migration> migrations = repartition(this.partitionTable);
CompletableFuture proposalFuture = replicatedLog.propose(new MigratePartitionsCommand(migrations));
proposalFuture.join();
return true;
}```
```public List<Migration> repartition(PartitionTable partitionTable) {
int averagePartitionsPerNode = getAveragePartitionsPerNode();
List<Member> liveMembers = membership.getLiveMembers();

return migrations;
}

List<Migration> migrations = new ArrayList<>();
var toMove = partitions.subList(averagePartitionsPerNode, partitions.getSize());
ArrayDeque<Integer> moveQ = new ArrayDeque<Integer>(toMove.partitionList());
while (!moveQ.isEmpty() && nodeWithLeastPartitions(underloadedNodes, averagePartitionsPerNode).isPresent()) {
}
if (!moveQ.isEmpty()) {
}
}
return migrations;
}

int getAveragePartitionsPerNode() {
return noOfPartitions / membership.getLiveMembers().size();
}```
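
The averaging approach can be sketched end to end as follows. This is one possible reading of it, under stated assumptions: nodes are plain strings, every node above the integer average offers its surplus partitions, and a partition is only moved while some node is still below the average. Partitions left over after that stay with their current owner, so in the 30-partition example the new node ends up with 7 partitions rather than 9. `RepartitionSketch`, `Move` and `leastLoadedBelow` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

class RepartitionSketch {
    // A planned move of one partition between two nodes.
    static final class Move {
        final int partitionId;
        final String from;
        final String to;

        Move(int partitionId, String from, String to) {
            this.partitionId = partitionId;
            this.from = from;
            this.to = to;
        }
    }

    // partitionsByNode: node -> partitions it hosts (a newly added node maps to an empty list).
    static List<Move> repartition(Map<String, List<Integer>> partitionsByNode, int noOfPartitions) {
        int average = noOfPartitions / partitionsByNode.size();
        // Partition counts per node, updated as moves are planned.
        Map<String, Integer> counts = new TreeMap<>();
        partitionsByNode.forEach((node, parts) -> counts.put(node, parts.size()));

        List<Move> moves = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : new TreeMap<>(partitionsByNode).entrySet()) {
            List<Integer> parts = entry.getValue();
            if (parts.size() <= average) {
                continue; // nothing to give away
            }
            Deque<Integer> surplus = new ArrayDeque<>(parts.subList(average, parts.size()));
            while (!surplus.isEmpty()) {
                Optional<String> target = leastLoadedBelow(counts, average);
                if (!target.isPresent()) {
                    break; // no node is below the average; the rest stays where it is
                }
                moves.add(new Move(surplus.poll(), entry.getKey(), target.get()));
                counts.merge(target.get(), 1, Integer::sum);
                counts.merge(entry.getKey(), -1, Integer::sum);
            }
        }
        return moves;
    }

    // Picks the node with the fewest partitions among those still below the average.
    static Optional<String> leastLoadedBelow(Map<String, Integer> counts, int average) {
        return counts.entrySet().stream()
                .filter(e -> e.getValue() < average)
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }
}
```

A stricter balance (for example 8, 8, 7 and 7 partitions) would need a different stopping rule; the integer average is used here only because it matches the description above.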

The coordinator will persist the computed migrations in the replicated log and then send requests to move partitions across the cluster nodes.

```private void applyMigratePartitionCommand(MigratePartitionsCommand command) {
logger.info("Handling partition migrations " + command.migrations);
for (Migration migration : command.migrations) {
RequestPartitionMigrationMessage message = new RequestPartitionMigrationMessage(requestNumber++, this.listenAddress, migration);
scheduler.execute(new RetryableTask(migration.fromMember, network, this, migration.getPartitionId(), message));
}
}
}```

When a cluster node receives a request to migrate, it will mark the partition as migrating. This stops any further modifications to the partition. It will then send the entire partition data to the target node.

class KVStore…

```  private void handleRequestPartitionMigrationMessage(RequestPartitionMigrationMessage message) {
Migration migration = message.getMigration();
Integer partitionId = migration.getPartitionId();
if (!allPartitions.containsKey(partitionId)) {
return;// The partition is not available with this node.
}
Partition partition = allPartitions.get(partitionId);
partition.setMigrating();
network.send(toServer, new MovePartitionMessage(requestNumber++, this.listenAddress, toServer, partition));
}```
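
How a partition might refuse modifications once it is marked as migrating is sketched below. The class `MigratingPartitionSketch` is a hypothetical stand-in for `Partition`. The text above only says that modifications stop; the sketch assumes that reads are still served and that a rejected write is signalled with an exception.

```java
import java.util.SortedMap;
import java.util.TreeMap;

class MigratingPartitionSketch {
    private final SortedMap<String, String> kv = new TreeMap<>();
    private boolean migrating;

    // Once set, the contents are frozen so they can be copied to the target node consistently.
    synchronized void setMigrating() {
        migrating = true;
    }

    // Writes are rejected while the partition is being migrated.
    synchronized void put(String key, String value) {
        if (migrating) {
            throw new IllegalStateException("partition is migrating; retry against the new owner");
        }
        kv.put(key, value);
    }

    // Reads are still served from the frozen data (an assumption of this sketch).
    synchronized String get(String key) {
        return kv.get(key);
    }
}
```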

The cluster node that receives the request will add the new partition to itself and return an acknowledgement.

class KVStore…

```  private void handleMovePartition(Message message) {
MovePartitionMessage movePartitionMessage = (MovePartitionMessage) message;
Partition partition = movePartitionMessage.getPartition();
allPartitions.put(partition.getId(), partition);
new Migration(movePartitionMessage.getMigrateFrom(), movePartitionMessage.getMigrateTo(),  partition.getId())));
}```

The cluster node that previously owned the partition will then send the migration complete message to the cluster coordinator.

class KVStore…

```  private void handlePartitionMovementCompleteMessage(PartitionMovementComplete message) {
allPartitions.remove(message.getMigration().getPartitionId());
message.getMigration()));
}```

The cluster coordinator will then mark the migration as complete. The change will be stored in the replicated log.

class ClusterCoordinator…

```  private void handleMigrationCompleteMessage(MigrationCompleteMessage message) {
MigrationCompleteMessage migrationCompleteMessage = message;
CompletableFuture propose = replicatedLog.propose(new MigrationCompletedCommand(message.getMigration()));
propose.join();
}```

class ClusterCoordinator…

```  private void applyMigrationCompleted(MigrationCompletedCommand command) {
pendingMigrations.remove(command.getMigration());
logger.info("Completed migration " + command.getMigration());
logger.info("pendingMigrations = " + pendingMigrations);
partitionTable.migrationCompleted(command.getMigration());
}```

class PartitionTable…

```  public void migrationCompleted(Migration migration) {