Fixed Partitions
Keep the number of partitions fixed to keep the mapping of data to partitions unchanged when the size of the cluster changes.
23 August 2022
Problem
To split data across a set of cluster nodes, each data item needs to be mapped to one of them. There are two requirements for this mapping.
- The distribution should be uniform
- It should be possible to know which cluster node stores a particular data item, without making a request to all the nodes
Consider a key-value store, which is a good proxy for many storage systems. Both requirements can be fulfilled by hashing the key and using the modulo operation to map it to a cluster node. So for a three-node cluster, we can map the keys Alice, Bob, Mary and Philip like this:
Keys | Hash | Node Index (Hash % 3) |
---|---|---|
Alice | 133299819613694460644197938031451912208 | 0 |
Bob | 63479738429015246738359000453022047291 | 1 |
Mary | 37724856304035789372490171084843241126 | 2 |
Philip | 83980963731216160506671196398339418866 | 2 |
However, this method creates a problem when the cluster size changes. If two more nodes are added to the cluster, we will have five nodes. The mapping will then look like this:
Keys | Hash | Node Index (Hash % 5) |
---|---|---|
Alice | 133299819613694460644197938031451912208 | 3 |
Bob | 63479738429015246738359000453022047291 | 1 |
Mary | 37724856304035789372490171084843241126 | 1 |
Philip | 83980963731216160506671196398339418866 | 1 |
The mapping of almost all the keys changes. Even when only a few new cluster nodes are added, most of the data needs to be moved. When the data size is large, this is undesirable.
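To make the problem concrete, here is a minimal sketch of this naive mapping (illustrative only, not part of the article's codebase); it assumes the platform-independent HashingUtil shown later. Changing nodeCount from 3 to 5 changes the result for most keys:

class NaiveMapping…

// Maps a key directly to a node: works until the number of nodes changes.
static int nodeIndexFor(String key, int nodeCount) {
    BigInteger hash = HashingUtil.hash(key);
    return hash.mod(BigInteger.valueOf(nodeCount)).intValue();
}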
Solution
Map data to logical partitions. Logical partitions are then mapped to the cluster nodes. Even if cluster nodes are added or removed, the mapping of data to partitions doesn't change. The cluster is launched with a preconfigured number of partitions, say 1024 for the sake of this example. This number does not change when new nodes are added to the cluster, so the way data is mapped to partitions using the hash of the key remains the same.
It's important that partitions are evenly distributed across cluster nodes. When partitions are moved to new nodes, the move should be relatively quick, with only a small portion of the data being moved. Once configured, the number of partitions won't change; this means it should allow enough room for future growth in data volume.
So the number of partitions selected should be significantly higher than the number of cluster nodes. For example, Akka suggests having ten times as many shards as nodes. Partitioning in Apache Ignite defaults to 1024 partitions, and Hazelcast defaults to 271 for clusters smaller than 100 nodes.
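As an illustration, the snippets below fix the partition count for an Apache Ignite cache and a Hazelcast cluster; the APIs are recalled from memory and may differ across versions:

// Apache Ignite: the affinity function is created with a fixed number of partitions (1024 here).
CacheConfiguration<String, String> cacheCfg = new CacheConfiguration<>("kv-cache");
cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 1024));

// Hazelcast: the partition count is a cluster-wide property (default 271).
Config hazelcastConfig = new Config();
hazelcastConfig.setProperty("hazelcast.partition.count", "271");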
Data storage or retrieval is then a two-step process:
- First, you find the partition for the given data item
- Then you find the cluster node where the partition is stored
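A minimal sketch of this two-step lookup, assuming a fixed noOfPartitions, a partitionTable fetched from the coordinator, and the MD5-based HashingUtil shown in the next section:

// Step 1: the key always maps to the same partition, because noOfPartitions never changes.
int findPartition(String key, int noOfPartitions) {
    return HashingUtil.hash(key).mod(BigInteger.valueOf(noOfPartitions)).intValue();
}

// Step 2: the partition table tells us which cluster node currently hosts that partition.
InetAddressAndPort getNodeAddressFor(int partitionId) {
    return partitionTable.getPartition(partitionId).getAddress();
}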
To balance data across the cluster nodes when new ones are added, some of the partitions can be moved to the new nodes.
Choosing the hash function
It's critical to choose a hashing method that gives the same hash values independent of the platform and runtime. For example, programming languages like Java provide a hash for every object, but that hash value can depend on the JVM runtime, so two different JVMs could give different hashes for the same key. To avoid this, hashing algorithms like MD5 or Murmur hash are used.
class HashingUtil…
public static BigInteger hash(String key) {
    try {
        MessageDigest messageDigest = MessageDigest.getInstance("MD5");
        return new BigInteger(messageDigest.digest(key.getBytes()));
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
The keys are not mapped to nodes, but to partitions. With 9 partitions, the table looks like the following. When new nodes are added to the cluster, the mapping of a key to its partition does not change.
Keys | Hash | Partition (Hash % 9) | Node |
---|---|---|---|
Alice | 133299819613694460644197938031451912208 | 0 | 0 |
Bob | 63479738429015246738359000453022047291 | 1 | 1 |
Mary | 37724856304035789372490171084843241126 | 5 | 1 |
Philip | 83980963731216160506671196398339418866 | 2 | 2 |
Mapping partitions to cluster nodes
Partitions need to be mapped to cluster nodes. The mapping also needs to be stored and made accessible to the clients. It's common to use a dedicated Consistent Core; this handles both. The dedicated Consistent Core acts as a coordinator which keeps track of all nodes in the cluster and maps partitions to nodes. It also stores the mapping in a fault-tolerant way by using a Replicated Log. The master cluster in YugabyteDB or the controller implementation in Kafka are both good examples of this.
Peer-to-peer systems like Akka or Hazelcast also need a particular cluster node to act as a coordinator. They use Emergent Leader as the coordinator.
Systems like Kubernetes use a generic Consistent Core like etcd. They need to elect one of the cluster nodes to play the role of coordinator.
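The mapping the coordinator maintains can be as simple as a table from partition id to the node hosting it, along with the partition's status. A minimal sketch, reusing the PartitionTable and PartitionInfo types that appear later in this article (the field names here are assumptions):

public class PartitionTable {
    // partition id -> which node hosts the partition and whether it is assigned, online or migrating
    Map<Integer, PartitionInfo> partitionEntries = new ConcurrentHashMap<>();

    public void addPartition(int partitionId, PartitionInfo partitionInfo) {
        partitionEntries.put(partitionId, partitionInfo);
    }

    public PartitionInfo getPartition(int partitionId) {
        return partitionEntries.get(partitionId);
    }
}

public class PartitionInfo {
    Integer partitionId;
    InetAddressAndPort hostedOn;
    PartitionStatus status;
}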
Tracking Cluster Membership

Each cluster node registers itself with the Consistent Core. It also periodically sends a HeartBeat to allow the Consistent Core to detect node failures.
class KVStore…
public void start() {
    socketListener.start();
    requestHandler.start();
    network.sendAndReceive(coordLeader,
            new RegisterClusterNodeRequest(generateMessageId(), listenAddress));
    scheduler.scheduleAtFixedRate(() -> {
        network.send(coordLeader, new HeartbeatMessage(generateMessageId(), listenAddress));
    }, 200, 200, TimeUnit.MILLISECONDS);
}
The coordinator handles the registration and then stores member information.
class ClusterCoordinator…
ReplicatedLog replicatedLog;
Membership membership = new Membership();
TimeoutBasedFailureDetector failureDetector =
        new TimeoutBasedFailureDetector(Duration.ofMillis(TIMEOUT_MILLIS));

private void handleRegisterClusterNodeRequest(Message message) {
    logger.info("Registering node " + message.from);
    CompletableFuture completableFuture = registerClusterNode(message.from);
    completableFuture.whenComplete((response, error) -> {
        logger.info("Sending register response to node " + message.from);
        network.send(message.from,
                new RegisterClusterNodeResponse(message.messageId, listenAddress));
    });
}

public CompletableFuture registerClusterNode(InetAddressAndPort address) {
    return replicatedLog.propose(new RegisterClusterNodeCommand(address));
}
When a registration is committed in the Replicated Log, the membership will be updated.
class ClusterCoordinator…
private void applyRegisterClusterNodeEntry(RegisterClusterNodeCommand command) {
    updateMembership(command.memberAddress);
}
class ClusterCoordinator…
private void updateMembership(InetAddressAndPort address) {
    membership = membership.addNewMember(address);
    failureDetector.heartBeatReceived(address);
}
The coordinator maintains a list of all nodes that are part of the cluster:
class Membership…
public class Membership {
    List<Member> liveMembers = new ArrayList<>();
    List<Member> failedMembers = new ArrayList<>();

    public boolean isFailed(InetAddressAndPort address) {
        return failedMembers.stream().anyMatch(m -> m.address.equals(address));
    }
class Member…
public class Member implements Comparable<Member> {
    InetAddressAndPort address;
    MemberStatus status;
The coordinator will detect cluster node failures using a mechanism similar to Lease. If a cluster node stops sending the heartbeat, the node will be marked as failed.
class ClusterCoordinator…
@Override
public void onBecomingLeader() {
    scheduledTask = executor.scheduleWithFixedDelay(this::checkMembership,
            1000, 1000, TimeUnit.MILLISECONDS);
    failureDetector.start();
}

private void checkMembership() {
    List<Member> failedMembers = getFailedMembers();
    if (!failedMembers.isEmpty()) {
        replicatedLog.propose(new MemberFailedCommand(failedMembers));
    }
}

private List<Member> getFailedMembers() {
    List<Member> liveMembers = membership.getLiveMembers();
    return liveMembers.stream()
            .filter(m -> failureDetector.isMonitoring(m.getAddress())
                    && !failureDetector.isAlive(m.getAddress()))
            .collect(Collectors.toList());
}
An example scenario
Consider three data servers: athens, byzantium and cyrene, with 9 partitions spread across them. The client can use the partition table to map a given key to a particular cluster node.

Now a new cluster node, ephesus, is added to the cluster. The admin triggers a reassignment, and the coordinator checks which nodes are underloaded by looking at the partition table. It figures out that ephesus is underloaded, and decides to allocate partition 7 to it, moving it from athens. The coordinator stores the migrations and then sends a request to athens to move partition 7 to ephesus. Once the migration is complete, athens lets the coordinator know, and the coordinator then updates the partition table.
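With a round-robin assignment of the 9 partitions, one possible layout before and after the move of partition 7 looks like this (the exact assignment is illustrative):

Node | Partitions (before) | Partitions (after) |
---|---|---|
athens | 1, 4, 7 | 1, 4 |
byzantium | 2, 5, 8 | 2, 5, 8 |
cyrene | 3, 6, 9 | 3, 6, 9 |
ephesus | - | 7 |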
Assigning Partitions To Cluster Nodes
The coordinator assigns partitions to the cluster nodes known at that point in time. If assignment is triggered every time a new cluster node is added, it might map partitions too early, before the cluster reaches a stable state. This is why the coordinator should be configured to wait until the cluster reaches a minimum size.
The first time the partition assignment is done, it can simply be done in a round robin fashion.
class ClusterCoordinator…
CompletableFuture assignPartitionsToClusterNodes() {
    if (!minimumClusterSizeReached()) {
        return CompletableFuture.failedFuture(
                new NotEnoughClusterNodesException(MINIMUM_CLUSTER_SIZE));
    }
    return initializePartitionAssignment();
}

private boolean minimumClusterSizeReached() {
    return membership.getLiveMembers().size() >= MINIMUM_CLUSTER_SIZE;
}
private CompletableFuture initializePartitionAssignment() {
    partitionAssignmentStatus = PartitionAssignmentStatus.IN_PROGRESS;
    PartitionTable partitionTable = arrangePartitions();
    return replicatedLog.propose(new PartitiontableCommand(partitionTable));
}

public PartitionTable arrangePartitions() {
    PartitionTable partitionTable = new PartitionTable();
    List<Member> liveMembers = membership.getLiveMembers();
    for (int partitionId = 1; partitionId <= noOfPartitions; partitionId++) {
        int index = partitionId % liveMembers.size();
        Member member = liveMembers.get(index);
        partitionTable.addPartition(partitionId,
                new PartitionInfo(partitionId, member.getAddress(), PartitionStatus.ASSIGNED));
    }
    return partitionTable;
}
The replicated log makes the partition table persistent.
class ClusterCoordinator…
PartitionTable partitionTable;
PartitionAssignmentStatus partitionAssignmentStatus = PartitionAssignmentStatus.UNASSIGNED;

private void applyPartitionTableCommand(PartitiontableCommand command) {
    this.partitionTable = command.partitionTable;
    partitionAssignmentStatus = PartitionAssignmentStatus.ASSIGNED;
    if (isLeader()) {
        sendMessagesToMembers(partitionTable);
    }
}
Once the partition assignment is persisted, the coordinator sends messages to all cluster nodes to tell each node which partitions it now owns.
class ClusterCoordinator…
List<Integer> pendingPartitionAssignments = new ArrayList<>();

private void sendMessagesToMembers(PartitionTable partitionTable) {
    Map<Integer, PartitionInfo> partitionsTobeHosted = partitionTable.getPartitionsTobeHosted();
    partitionsTobeHosted.forEach((partitionId, partitionInfo) -> {
        pendingPartitionAssignments.add(partitionId);
        HostPartitionMessage message =
                new HostPartitionMessage(requestNumber++, this.listenAddress, partitionId);
        logger.info("Sending host partition message to " + partitionInfo.hostedOn
                + " partitionId=" + partitionId);
        scheduler.execute(new RetryableTask(partitionInfo.hostedOn, network, this, partitionId, message));
    });
}
The coordinator will keep retrying until its message is delivered successfully.
class RetryableTask…
static class RetryableTask implements Runnable {
    Logger logger = LogManager.getLogger(RetryableTask.class);
    InetAddressAndPort address;
    Network network;
    ClusterCoordinator coordinator;
    Integer partitionId;
    int attempt;
    private Message message;
    private final Random random = new Random();

    public RetryableTask(InetAddressAndPort address, Network network, ClusterCoordinator coordinator,
                         Integer partitionId, Message message) {
        this.address = address;
        this.network = network;
        this.coordinator = coordinator;
        this.partitionId = partitionId;
        this.message = message;
    }

    @Override
    public void run() {
        attempt++;
        try {
            //stop trying if the node has failed.
            if (coordinator.isSuspected(address)) {
                return;
            }
            logger.info("Sending " + message + " to=" + address);
            network.send(address, message);
        } catch (Exception e) {
            logger.error("Error trying to send ", e);
            scheduleWithBackOff();
        }
    }

    private void scheduleWithBackOff() {
        scheduler.schedule(this, getBackOffDelay(attempt), TimeUnit.MILLISECONDS);
    }

    private long getBackOffDelay(int attempt) {
        long baseDelay = (long) Math.pow(2, attempt);
        long jitter = randomJitter();
        return baseDelay + jitter;
    }

    //random jitter of up to 50ms, so that retries do not happen in lockstep
    private long randomJitter() {
        return random.nextInt(50);
    }
}
When a cluster node receives the request to create the partition, it creates one with the given partition id. If we imagine this happening within a simple key-value store, its implementation will look something like this:
class KVStore…
Map<Integer, Partition> allPartitions = new ConcurrentHashMap<>();

private void handleHostPartitionMessage(Message message) {
    Integer partitionId = ((HostPartitionMessage) message).getPartitionId();
    addPartitions(partitionId);
    logger.info("Adding partition " + partitionId + " to " + listenAddress);
    network.send(message.from,
            new HostPartitionAcks(message.messageId, this.listenAddress, partitionId));
}

public void addPartitions(Integer partitionId) {
    allPartitions.put(partitionId, new Partition(partitionId));
}
class Partition…
SortedMap<String, String> kv = new TreeMap<>();
private Integer partitionId;
Once the coordinator receives the message that the partition has been successfully created, it persists it in the replicated log and updates the partition status to be online.
class ClusterCoordinator…
private void handleHostPartitionAck(Message message) {
    int partitionId = ((HostPartitionAcks) message).getPartitionId();
    pendingPartitionAssignments.remove(Integer.valueOf(partitionId));
    logger.info("Received host partition ack from " + message.from
            + " partitionId=" + partitionId + " pending=" + pendingPartitionAssignments);
    CompletableFuture future = replicatedLog.propose(
            new UpdatePartitionStatusCommand(partitionId, PartitionStatus.ONLINE));
    future.join();
}
Once the High-Water Mark is reached, and the record is applied, the partition’s status will be updated.
class ClusterCoordinator…
private void updateParitionStatus(UpdatePartitionStatusCommand command) {
    removePendingRequest(command.partitionId);
    logger.info("Changing status for " + command.partitionId + " to " + command.status);
    logger.info(partitionTable.toString());
    partitionTable.updateStatus(command.partitionId, command.status);
}
Client Interface
Consider again the example of a simple key-value store. If a client needs to store or get a value for a particular key, it can do so by following these steps:
- The client applies the hash function to the key and finds the relevant partition based on the total number of partitions.
- The client gets the partition table from the coordinator and finds the cluster node that is hosting the partition. The client also periodically refreshes the partition table.
Clients fetching a partition table from the coordinator can quickly lead to bottlenecks, especially if all requests are being served by a single coordinator leader. That is why it is common practice to keep metadata available on all cluster nodes. The coordinator can either push metadata to cluster nodes, or cluster nodes can pull it from the coordinator. Clients can then connect with any cluster node to refresh the metadata.
This is generally implemented inside the client library provided by the key-value store, or by the client request handling that happens on the cluster nodes.
class Client…
public void put(String key, String value) throws IOException {
    Integer partitionId = findPartition(key, noOfPartitions);
    InetAddressAndPort nodeAddress = getNodeAddressFor(partitionId);
    sendPutMessage(partitionId, nodeAddress, key, value);
}

private InetAddressAndPort getNodeAddressFor(Integer partitionId) {
    PartitionInfo partitionInfo = partitionTable.getPartition(partitionId);
    InetAddressAndPort nodeAddress = partitionInfo.getAddress();
    return nodeAddress;
}

private void sendPutMessage(Integer partitionId, InetAddressAndPort address, String key, String value) throws IOException {
    PartitionPutMessage partitionPutMessage = new PartitionPutMessage(partitionId, key, value);
    SocketClient socketClient = new SocketClient(address);
    socketClient.blockingSend(new RequestOrResponse(RequestId.PartitionPutKV.getId(),
            JsonSerDes.serialize(partitionPutMessage)));
}
public String get(String key) throws IOException {
    Integer partitionId = findPartition(key, noOfPartitions);
    InetAddressAndPort nodeAddress = getNodeAddressFor(partitionId);
    return sendGetMessage(partitionId, key, nodeAddress);
}

private String sendGetMessage(Integer partitionId, String key, InetAddressAndPort address) throws IOException {
    PartitionGetMessage partitionGetMessage = new PartitionGetMessage(partitionId, key);
    SocketClient socketClient = new SocketClient(address);
    RequestOrResponse response = socketClient.blockingSend(new RequestOrResponse(RequestId.PartitionGetKV.getId(),
            JsonSerDes.serialize(partitionGetMessage)));
    PartitionGetResponseMessage partitionGetResponseMessage =
            JsonSerDes.deserialize(response.getMessageBodyJson(), PartitionGetResponseMessage.class);
    return partitionGetResponseMessage.getValue();
}
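The periodic refresh of the partition table could be sketched as follows; coordinatorAddress, scheduler, the GetPartitionTable request id and the refresh interval are illustrative assumptions, not part of the article's protocol:

// Refresh the cached partition table from the coordinator (or any cluster node holding the metadata).
private void schedulePartitionTableRefresh() {
    scheduler.scheduleAtFixedRate(this::refreshPartitionTable, 60, 60, TimeUnit.SECONDS);
}

private void refreshPartitionTable() {
    try {
        SocketClient socketClient = new SocketClient(coordinatorAddress);
        RequestOrResponse response = socketClient.blockingSend(
                new RequestOrResponse(RequestId.GetPartitionTable.getId(), new byte[0]));
        this.partitionTable = JsonSerDes.deserialize(response.getMessageBodyJson(), PartitionTable.class);
    } catch (IOException e) {
        // Keep using the cached partition table and retry on the next scheduled run.
        logger.warn("Could not refresh partition table from " + coordinatorAddress, e);
    }
}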
Moving partitions to newly added members
When new nodes are added to a cluster, some partitions can be moved to the new nodes. This can be done automatically once a new cluster node is added. But it can involve a lot of data being moved across the cluster nodes, which is why an administrator will typically trigger the repartitioning. One simple method to do this is to calculate the average number of partitions each node should host and then move the additional partitions to the new node. For example, if the number of partitions is 30 and there are three existing nodes in the cluster, each node should host 10 partitions. If a new node is added, the average per node is about 7. The coordinator will therefore try to move three partitions from each cluster node to the new one.
class ClusterCoordinator…
List<Migration> pendingMigrations = new ArrayList<>();

boolean reassignPartitions() {
    if (partitionAssignmentInProgress()) {
        logger.info("Partition assignment in progress");
        return false;
    }
    List<Migration> migrations = repartition(this.partitionTable);
    CompletableFuture proposalFuture = replicatedLog.propose(new MigratePartitionsCommand(migrations));
    proposalFuture.join();
    return true;
}
public List<Migration> repartition(PartitionTable partitionTable) {
    int averagePartitionsPerNode = getAveragePartitionsPerNode();
    List<Member> liveMembers = membership.getLiveMembers();
    var overloadedNodes = partitionTable.getOverloadedNodes(averagePartitionsPerNode, liveMembers);
    var underloadedNodes = partitionTable.getUnderloadedNodes(averagePartitionsPerNode, liveMembers);
    var migrations = tryMovingPartitionsToUnderLoadedMembers(averagePartitionsPerNode, overloadedNodes, underloadedNodes);
    return migrations;
}

private List<Migration> tryMovingPartitionsToUnderLoadedMembers(int averagePartitionsPerNode,
                                                                Map<InetAddressAndPort, PartitionList> overloadedNodes,
                                                                Map<InetAddressAndPort, PartitionList> underloadedNodes) {
    List<Migration> migrations = new ArrayList<>();
    for (InetAddressAndPort member : overloadedNodes.keySet()) {
        var partitions = overloadedNodes.get(member);
        var toMove = partitions.subList(averagePartitionsPerNode, partitions.getSize());
        overloadedNodes.put(member, partitions.subList(0, averagePartitionsPerNode));
        ArrayDeque<Integer> moveQ = new ArrayDeque<Integer>(toMove.partitionList());
        while (!moveQ.isEmpty() && nodeWithLeastPartitions(underloadedNodes, averagePartitionsPerNode).isPresent()) {
            assignToNodesWithLeastPartitions(migrations, member, moveQ, underloadedNodes, averagePartitionsPerNode);
        }
        if (!moveQ.isEmpty()) {
            overloadedNodes.get(member).addAll(moveQ);
        }
    }
    return migrations;
}

int getAveragePartitionsPerNode() {
    return noOfPartitions / membership.getLiveMembers().size();
}
The coordinator will persist the computed migrations in the replicated log and then send requests to move partitions across the cluster nodes.
class ClusterCoordinator…

private void applyMigratePartitionCommand(MigratePartitionsCommand command) {
    logger.info("Handling partition migrations " + command.migrations);
    for (Migration migration : command.migrations) {
        RequestPartitionMigrationMessage message =
                new RequestPartitionMigrationMessage(requestNumber++, this.listenAddress, migration);
        pendingMigrations.add(migration);
        if (isLeader()) {
            scheduler.execute(new RetryableTask(migration.fromMember, network, this, migration.getPartitionId(), message));
        }
    }
}
When a cluster node receives a request to migrate, it will mark the partition as migrating. This stops any further modifications to the partition. It will then send the entire partition data to the target node.
class KVStore…
private void handleRequestPartitionMigrationMessage(RequestPartitionMigrationMessage message) {
    Migration migration = message.getMigration();
    Integer partitionId = migration.getPartitionId();
    InetAddressAndPort toServer = migration.getToMember();
    if (!allPartitions.containsKey(partitionId)) {
        return; // The partition is not available with this node.
    }
    Partition partition = allPartitions.get(partitionId);
    partition.setMigrating();
    network.send(toServer,
            new MovePartitionMessage(requestNumber++, this.listenAddress, toServer, partition));
}
The cluster node that receives the request will add the new partition to itself and return an acknowledgement.
class KVStore…
private void handleMovePartition(Message message) {
    MovePartitionMessage movePartitionMessage = (MovePartitionMessage) message;
    Partition partition = movePartitionMessage.getPartition();
    allPartitions.put(partition.getId(), partition);
    network.send(message.from,
            new PartitionMovementComplete(message.messageId, listenAddress,
                    new Migration(movePartitionMessage.getMigrateFrom(), movePartitionMessage.getMigrateTo(), partition.getId())));
}
The cluster node that previously owned the partition then sends the migration-complete message to the cluster coordinator.
class KVStore…
private void handlePartitionMovementCompleteMessage(PartitionMovementComplete message) {
    allPartitions.remove(message.getMigration().getPartitionId());
    network.send(coordLeader,
            new MigrationCompleteMessage(requestNumber++, listenAddress, message.getMigration()));
}
The cluster coordinator will then mark the migration as complete. The change will be stored in the replicated log.
class ClusterCoordinator…
private void handleMigrationCompleteMessage(MigrationCompleteMessage message) {
    CompletableFuture propose = replicatedLog.propose(new MigrationCompletedCommand(message.getMigration()));
    propose.join();
}
class ClusterCoordinator…
private void applyMigrationCompleted(MigrationCompletedCommand command) {
    pendingMigrations.remove(command.getMigration());
    logger.info("Completed migration " + command.getMigration());
    logger.info("pendingMigrations = " + pendingMigrations);
    partitionTable.migrationCompleted(command.getMigration());
}
class PartitionTable…
public void migrationCompleted(Migration migration) {
    this.addPartition(migration.partitionId,
            new PartitionInfo(migration.partitionId, migration.toMember, ClusterCoordinator.PartitionStatus.ONLINE));
}
Examples
In Kafka, each topic is created with a fixed number of partitions.
Shard allocation in Akka uses a fixed number of configured shards. The guideline is to have ten times as many shards as cluster nodes.
In-memory data grids like Apache Ignite and Hazelcast have a fixed, configurable number of partitions for their caches.