This pattern is part of Patterns of Distributed Systems

Key-Range Partitions

Partition data in sorted key ranges to efficiently handle range queries.

25 August 2022


To split data across a set of cluster nodes, each data item needs to be mapped to one. If users want to query a range of keys, specifying only the start and end key, all partitions will need to be queried in order for the values to be acquired. Querying every partition for a single request is far from optimal.

If we take a key-value store as an example, we can store the author names using hash-based mapping (as used in Fixed Partitions).

Keys | Hash | Partition (Hash % No. of Partitions (9)) | Node

If a user wants to get values for a range of names, say those beginning with the letters 'a' to 'f', there's no way to know which partitions to fetch data from when the hash of the key is used to map keys to partitions. All partitions need to be queried to get the required values.
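The effect can be seen in a small sketch. It assumes Java's `String.hashCode` as the hash function and nine partitions, as in the example above; `partitionFor` and `partitionsTouched` are hypothetical helpers introduced only for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

final class HashScatterSketch {
    // Hypothetical helper: maps a key to one of `partitions` buckets by its hash.
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    // Collects every partition that holds at least one of the given keys.
    static Set<Integer> partitionsTouched(Iterable<String> keys, int partitions) {
        Set<Integer> touched = new TreeSet<>();
        for (String key : keys) {
            touched.add(partitionFor(key, partitions));
        }
        return touched;
    }
}
```

With this hash, the adjacent single-letter keys "a", "b" and "c" map to partitions 7, 8 and 0, so even a tiny key range is spread over unrelated partitions.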


Create logical partitions for key ranges in sorted order. The partitions can then be mapped to cluster nodes. To query a range of data, the client can get all partitions that contain keys from a given range and query only those specific partitions to get the values required.

Predefining key ranges

If we already know the whole key space and distribution of keys, the ranges for partitions can be specified upfront.

Let's return to our simple key value store with string keys and values. In this example we are storing author names and their books. If we know the author name distribution upfront, we can then define partition splits at specific letters - let's say, in this instance, 'b' and 'd'.

The start and end of the entire key range need to be marked explicitly. We can use an empty string to mark both the lowest and the highest key. The ranges will be created like this:

Key Range | Description
["", "b") | Covers all the names starting from a to b, excluding b
["b", "d") | Covers all the names starting from b to d, excluding d
["d", "") | Covers everything else

The range is represented by a start key and an end key:

class Range…

  private String startKey;
  private String endKey;

The cluster coordinator creates ranges from the specified split points. The partitions will then be assigned to cluster nodes.

class ClusterCoordinator…

  PartitionTable createPartitionTableFor(List<String> splits) {
      List<Range> ranges = createRangesFromSplitPoints(splits);
      return arrangePartitions(ranges, membership.getLiveMembers());
  }

  List<Range> createRangesFromSplitPoints(List<String> splits) {
      List<Range> ranges = new ArrayList<>();
      String startKey = Range.MIN_KEY;
      for (String split : splits) {
          String endKey = split;
          ranges.add(new Range(startKey, endKey));
          startKey = split;
      }
      ranges.add(new Range(startKey, Range.MAX_KEY));
      return ranges;
  }

  PartitionTable arrangePartitions(List<Range> ranges, List<Member> liveMembers) {
      PartitionTable partitionTable = new PartitionTable();
      for (int i = 0; i < ranges.size(); i++) {
          //simple round-robin assignment.
          Member member = liveMembers.get(i % liveMembers.size());
          int partitionId = newPartitionId();
          Range range = ranges.get(i);
          PartitionInfo partitionInfo = new PartitionInfo(partitionId, member.getAddress(), PartitionStatus.ASSIGNED, range);
          partitionTable.addPartition(partitionId, partitionInfo);
      }
      return partitionTable;
  }
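The range construction above relies on the split points arriving sorted and without duplicates. As a hedged sketch, assuming nothing upstream guarantees that, the input can be normalised first. `SplitPointsSketch` and `rangesFrom` are hypothetical names, and each range is shown as a plain `{startKey, endKey}` pair rather than the `Range` class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class SplitPointsSketch {
    static final String MIN_KEY = ""; // empty string marks the lowest key
    static final String MAX_KEY = ""; // and also the highest key

    // Builds contiguous {startKey, endKey} pairs; start is inclusive, end is exclusive.
    static List<String[]> rangesFrom(List<String> splits) {
        List<String[]> ranges = new ArrayList<>();
        String start = MIN_KEY;
        // TreeSet sorts the split points and drops duplicates.
        for (String split : new TreeSet<>(splits)) {
            if (split.isEmpty()) {
                continue; // the empty string is reserved as the boundary marker
            }
            ranges.add(new String[] {start, split});
            start = split;
        }
        ranges.add(new String[] {start, MAX_KEY});
        return ranges;
    }
}
```

Calling `rangesFrom` with the split points "d", "b", "b" yields the same three ranges as the table above: ["", "b"), ["b", "d") and ["d", "").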

The Consistent Core, acting as a cluster coordinator, stores the mapping in a fault-tolerant way by using a Replicated Log. The implementation is similar to the one explained in the Fixed Partitions pattern.

Client Interface

If a client needs to store or get a value for a particular key in a key-value store, it needs to follow these steps:

class Client…

  public List<String> getValuesInRange(Range range) throws IOException {
      PartitionTable partitionTable = getPartitionTable();
      List<PartitionInfo> partitionsInRange = partitionTable.getPartitionsInRange(range);
      List<String> values = new ArrayList<>();
      for (PartitionInfo partitionInfo : partitionsInRange) {
          List<String> partitionValues = sendGetRangeMessage(partitionInfo.getPartitionId(), range, partitionInfo.getAddress());
          //collect the values returned by each partition.
          values.addAll(partitionValues);
      }
      return values;
  }

  private PartitionTable getPartitionTable() throws IOException {
      GetPartitionTableResponse response = sendGetPartitionTableRequest(coordinatorAddress);
      return response.getPartitionTable();
  }

  private List<String> sendGetRangeMessage(int partitionId, Range range, InetAddressAndPort address) throws IOException {
      GetAllInRangeRequest partitionGetMessage = new GetAllInRangeRequest(partitionId, range);
      GetAllInRangeResponse response = sendGetRangeRequest(address, partitionGetMessage);
      return response.getValues();
  }

class PartitionTable…

  public List<PartitionInfo> getPartitionsInRange(Range range) {
      List<PartitionInfo> allPartitions = getAllPartitions();
      List<PartitionInfo> partitionsInRange = allPartitions.stream()
              .filter(p -> p.getRange().isOverlapping(range))
              .collect(Collectors.toList());
      return partitionsInRange;
  }

class Range…

  public boolean isOverlapping(Range range) {
      //half-open ranges overlap when one contains the other's start key; the end key is exclusive, so it is not checked.
      return this.contains(range.startKey) || range.contains(this.startKey);
  }

  public boolean contains(String key) {
      return key.compareTo(startKey) >= 0 &&
              (endKey.equals(Range.MAX_KEY) || endKey.compareTo(key) > 0);
  }

class Partition…

  public List<String> getAllInRange(Range range) {
      return kv.subMap(range.getStartKey(), range.getEndKey()).values().stream().toList();
  }
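One detail to watch when the empty string doubles as the highest key: `java.util.TreeMap.subMap` throws an `IllegalArgumentException` when the start key is greater than the end key, which is what happens for a non-empty start key paired with an empty end key. The following is a minimal sketch of a range scan that treats the empty end key as "no upper bound". It assumes the partition's data sits in a `TreeMap`; `RangeScanSketch` and `valuesInRange` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

final class RangeScanSketch {
    static final String MAX_KEY = ""; // same open-ended marker the text uses

    // Values whose keys fall in [startKey, endKey); an empty endKey means no upper bound.
    static List<String> valuesInRange(TreeMap<String, String> kv, String startKey, String endKey) {
        SortedMap<String, String> view = endKey.equals(MAX_KEY)
                ? kv.tailMap(startKey)          // startKey inclusive, no upper bound
                : kv.subMap(startKey, endKey);  // startKey inclusive, endKey exclusive
        return new ArrayList<>(view.values());
    }
}
```

An empty start key needs no special case, because the empty string sorts before every other string.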

Storing a value

To store a key-value pair, the client needs to find the right partition for the given key. Once the partition is found, the request is sent to the cluster node hosting that partition.

class Client…

  public void put(String key, String value) throws IOException {
      PartitionInfo partition = findPartition(key);
      sendPutMessage(partition.getPartitionId(), partition.getAddress(), key, value);
  }

  private PartitionInfo findPartition(String key) {
      return partitionTable.getPartitionFor(key);
  }
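The listing does not show how `getPartitionFor` locates the partition. One possible approach, offered only as a sketch, is to index partitions by start key in a sorted map and pick the greatest start key that is not larger than the key being stored. This assumes the ranges are contiguous and cover the whole key space; `PartitionLookupSketch` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class PartitionLookupSketch {
    // Maps each partition's start key to its partition id; "" is the lowest start key.
    private final TreeMap<String, Integer> partitionIdByStartKey = new TreeMap<>();

    void addPartition(String startKey, int partitionId) {
        partitionIdByStartKey.put(startKey, partitionId);
    }

    // The owning partition is the one with the greatest start key <= key.
    int partitionFor(String key) {
        Map.Entry<String, Integer> entry = partitionIdByStartKey.floorEntry(key);
        if (entry == null) {
            throw new IllegalStateException("no partition covers key: " + key);
        }
        return entry.getValue();
    }
}
```

Because start keys are inclusive, a key equal to a split point such as "b" belongs to the partition that starts at "b".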

An example scenario

Let's explore this with another example. Consider three data servers: athens, byzantium and cyrene. The partition splits are defined as "b" and "d". The three ranges will be created like this:

Key Range | Description
["", "b") | Covers all the names starting from a to b, excluding b
["b", "d") | Covers all the names starting from b to d, excluding d
["d", "") | Covers everything else

The coordinator then creates three partitions for these ranges and maps them to the cluster nodes.

If a client now wants to get all the values for names starting with "a" and "c", it gets all the partitions which have key ranges containing keys starting with "a" and "c". It then sends requests to only those partitions to get the values.
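The routing in this scenario can be sketched with a sorted map from partition start key to node name. The assignment of ranges to athens, byzantium and cyrene in that order is an assumption (the text does not say which node holds which range), and `RangeRoutingSketch`, `nodesFor` and `exampleTable` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class RangeRoutingSketch {
    // Nodes whose partitions overlap [from, to); an empty `to` means no upper bound.
    static List<String> nodesFor(TreeMap<String, String> nodeByStartKey, String from, String to) {
        // The partition containing `from` has the greatest start key <= from.
        String first = nodeByStartKey.floorKey(from);
        NavigableMap<String, String> hit = to.isEmpty()
                ? nodeByStartKey.tailMap(first, true)
                : nodeByStartKey.subMap(first, true, to, false);
        return new ArrayList<>(hit.values());
    }

    // Assumed layout: one partition per node, in split order.
    static TreeMap<String, String> exampleTable() {
        TreeMap<String, String> table = new TreeMap<>();
        table.put("", "athens");     // ["", "b")
        table.put("b", "byzantium"); // ["b", "d")
        table.put("d", "cyrene");    // ["d", "")
        return table;
    }
}
```

For the range from "a" to "c", `nodesFor` returns athens and byzantium, so cyrene is never contacted.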

Auto splitting ranges

Often it is difficult to know suitable split points upfront. In these instances, we can implement auto-splitting.

Here, the coordinator creates only one partition, with a key range that covers the entire key space.

class ClusterCoordinator…

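  private CompletableFuture initializeRangePartitionAssignment(List<String> splits) {
      partitionAssignmentStatus = PartitionAssignmentStatus.IN_PROGRESS;
      //with no split points, start with a single partition covering the whole key space.
      PartitionTable partitionTable = splits.isEmpty() ?
              createPartitionTableWithOneRange() : createPartitionTableFor(splits);
      return replicatedLog.propose(new PartitiontableCommand(partitionTable));
  }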

  public PartitionTable createPartitionTableWithOneRange() {
      PartitionTable partitionTable = new PartitionTable();
      List<Member> liveMembers = membership.getLiveMembers();
      Member member = liveMembers.get(0);
      Range firstRange = new Range(Range.MIN_KEY, Range.MAX_KEY);
      int partitionId = newPartitionId();
      partitionTable.addPartition(partitionId, new PartitionInfo(partitionId, member.getAddress(), PartitionStatus.ASSIGNED, firstRange));
      return partitionTable;
  }

Each partition can be configured with a fixed maximum size. A background task then runs on each cluster node to track the size of the partitions. When a partition reaches its maximum size, it's split into two partitions, each one being approximately half the size of the original.

class KVStore…

  public void scheduleSplitCheck() {
      //the receiver is missing from the source listing; `scheduler` is assumed to be a ScheduledExecutorService.
      scheduler.scheduleAtFixedRate(() -> {
          splitCheck();
      }, 1000, 1000, TimeUnit.MILLISECONDS);
  }

  public void splitCheck() {
      for (Integer partitionId : allPartitions.keySet()) {
          splitCheck(allPartitions.get(partitionId));
      }
  }

  int MAX_PARTITION_SIZE = 1000;
  public void splitCheck(Partition partition) {
      String middleKey = partition.getMiddleKeyIfSizeCrossed(MAX_PARTITION_SIZE);
      if (!middleKey.isEmpty()) {
          logger.info("Partition " + partition.getId() + " reached size " + partition.size() + ". Triggering split");
          network.send(coordLeader, new SplitTriggerMessage(partition.getId(), middleKey, requestNumber++, listenAddress));
      }
  }

Calculating partition size and finding the middle key

Getting the size of a partition and finding its middle key depends on the storage engine being used. A simple approach is to scan through the entire partition to calculate its size. TiKV initially used this approach. To be able to split the partition, the key situated at the midpoint needs to be found as well. To avoid scanning through the partition twice, a simple implementation can return the middle key if the size is more than the configured maximum.

class Partition…

  public String getMiddleKeyIfSizeCrossed(int partitionMaxSize) {
      int kvSize = 0;
      for (String key : kv.keySet()) {
          kvSize += key.length() + kv.get(key).length();
          if (kvSize >= partitionMaxSize / 2) {
              return key;
          }
      }
      return "";
  }
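The listing returns as soon as the running size reaches half of the limit. A variant that reports a middle key only once the whole partition has reached the limit, still in a single pass, might look like the sketch below. `MiddleKeySketch` and `middleKeyIfSizeCrossed` are hypothetical names, and size is measured as key length plus value length, as in the listing.

```java
import java.util.Map;
import java.util.SortedMap;

final class MiddleKeySketch {
    // Returns an approximate middle key if the total size is at least maxSize, else "".
    static String middleKeyIfSizeCrossed(SortedMap<String, String> kv, int maxSize) {
        int total = 0;
        String candidate = null;
        for (Map.Entry<String, String> e : kv.entrySet()) {
            total += e.getKey().length() + e.getValue().length();
            // Remember the first key at which the running size reaches half the limit.
            if (candidate == null && total >= maxSize / 2) {
                candidate = e.getKey();
            }
        }
        // Only report the candidate when the whole partition has crossed the limit.
        return (candidate != null && total >= maxSize) ? candidate : "";
    }
}
```

The candidate marks half of the configured limit rather than half of the actual data, so for a partition far above the limit the two halves can be uneven; the result is only an approximation.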

The coordinator, on handling the split trigger message, updates the key range metadata for the original partition and creates new partition metadata for the split range.

class ClusterCoordinator…

  private void handleSplitTriggerMessage(SplitTriggerMessage message) {
      logger.info("Handling SplitTriggerMessage " + message.getPartitionId() + " split key " + message.getSplitKey());
      splitPartition(message.getPartitionId(), message.getSplitKey());
  }

  public CompletableFuture splitPartition(int partitionId, String splitKey) {
      logger.info("Splitting partition " + partitionId + " at key " + splitKey);
      PartitionInfo parentPartition = partitionTable.getPartition(partitionId);
      Range originalRange = parentPartition.getRange();
      List<Range> splits = originalRange.split(splitKey);
      Range shrunkOriginalRange = splits.get(0);
      Range newRange = splits.get(1);
      return replicatedLog.propose(new SplitPartitionCommand(partitionId, splitKey, shrunkOriginalRange, newRange));
  }

After the partition metadata is stored successfully, the coordinator sends a message to the cluster node hosting the parent partition, asking it to split the parent partition's data.

class ClusterCoordinator…

  private void applySplitPartitionCommand(SplitPartitionCommand command) {
      PartitionInfo originalPartition = partitionTable.getPartition(command.getOriginalPartitionId());
      Range originalRange = originalPartition.getRange();
      if (!originalRange.coveredBy(command.getUpdatedRange().getStartKey(), command.getNewRange().getEndKey())) {
          logger.error("The original range start and end keys "+ originalRange + " do not match split ranges");
          //do not apply a split whose ranges do not match the original range.
          return;
      }

      PartitionInfo newPartitionInfo = new PartitionInfo(newPartitionId(), originalPartition.getAddress(), PartitionStatus.ASSIGNED, command.getNewRange());
      partitionTable.addPartition(newPartitionInfo.getPartitionId(), newPartitionInfo);

      //send requests to cluster nodes if this is the leader node.
      if (isLeader()) {
          var message = new SplitPartitionMessage(command.getOriginalPartitionId(), command.getSplitKey(), newPartitionInfo, requestNumber++, listenAddress);
          scheduler.execute(new RetryableTask(originalPartition.getAddress(), network, this, originalPartition.getPartitionId(), message));
      }
  }

class Range…

  public boolean coveredBy(String startKey, String endKey) {
      return getStartKey().equals(startKey)
              && getEndKey().equals(endKey);
  }

The cluster node splits the original partition and creates a new partition. The data from the original partition is then copied to the new partition. The node then responds to the coordinator to report that the split is complete.

class KVStore…

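  private void handleSplitPartitionMessage(SplitPartitionMessage splitPartitionMessage) {
      //the local splitPartition(...) call and the send target are missing from the source listing; coordLeader is assumed.
      network.send(coordLeader,
              new SplitPartitionResponseMessage(splitPartitionMessage.getPartitionId(),
                      splitPartitionMessage.messageId, listenAddress));
  }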

  private void splitPartition(int parentPartitionId, String splitKey, int newPartitionId) {
      Partition partition = allPartitions.get(parentPartitionId);
      Partition splitPartition = partition.splitAt(splitKey, newPartitionId);
      logger.info("Adding new partition " + splitPartition.getId() + " for range " + splitPartition.getRange());
      allPartitions.put(splitPartition.getId(), splitPartition);
  }

class Partition…

  public Partition splitAt(String splitKey, int newPartitionId) {
      List<Range> splits = this.range.split(splitKey);
      Range shrunkOriginalRange = splits.get(0);
      Range splitRange = splits.get(1);

      //an open-ended start needs headMap, because MIN_KEY is only a marker.
      SortedMap<String, String> partition1Kv =
              range.getStartKey().equals(Range.MIN_KEY)
                      ? kv.headMap(splitKey)
                      : kv.subMap(range.getStartKey(), splitKey);

      //an open-ended end needs tailMap, because MAX_KEY is only a marker.
      SortedMap<String, String> partition2Kv =
              range.getEndKey().equals(Range.MAX_KEY)
                      ? kv.tailMap(splitKey)
                      : kv.subMap(splitKey, range.getEndKey());

      this.kv = partition1Kv;
      this.range = shrunkOriginalRange;

      return new Partition(newPartitionId, partition2Kv, splitRange);
  }

class Range…

  public List<Range> split(String splitKey) {
      return Arrays.asList(new Range(startKey, splitKey), new Range(splitKey, endKey));
  }
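If the partition data lives in a `java.util.TreeMap`, the maps returned by `headMap`, `tailMap` and `subMap` are views backed by the original map, so both halves would keep sharing one underlying structure. The sketch below shows one way to give the new partition its own copy, in line with the description that data is copied to the new partition. `SplitDataSketch` and `splitOff` are hypothetical names.

```java
import java.util.TreeMap;

final class SplitDataSketch {
    // Moves every entry with key >= splitKey out of `kv` into a new map and returns it.
    static TreeMap<String, String> splitOff(TreeMap<String, String> kv, String splitKey) {
        // tailMap is a live view of kv, so copy it before clearing.
        TreeMap<String, String> upperHalf = new TreeMap<>(kv.tailMap(splitKey));
        // Clearing the view removes the moved entries from the original map.
        kv.tailMap(splitKey).clear();
        return upperHalf;
    }
}
```

After the call, the original map holds only keys below the split key, and later writes to either map no longer affect the other.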

Once the coordinator receives the message, it marks the partitions as online.

class ClusterCoordinator…

  private void handleSplitPartitionResponse(SplitPartitionResponseMessage message) {
      replicatedLog.propose(new UpdatePartitionStatusCommand(message.getPartitionId(), PartitionStatus.ONLINE));
  }

One issue with modifying existing partitions is that clients cannot safely cache partition metadata; without some safeguard, a client would always need to fetch the latest partition metadata before sending any request to a cluster node. Data stores therefore use a Generation Clock for partitions, which is updated every time a partition is split. Any client request carrying an older generation number is rejected. The client can then reload the partition table from the coordinator and retry the request. This ensures that clients holding older metadata don't get wrong results. YugabyteDB chooses to create two separate new partitions and marks the original as explained in their Automatic table splitting design.
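The generation check can be sketched as follows. This is a minimal illustration, assuming each partition carries an integer generation that starts at 1 and is incremented on every split; `GenerationCheckSketch` and its methods are hypothetical and not taken from any particular data store.

```java
import java.util.HashMap;
import java.util.Map;

final class GenerationCheckSketch {
    // Server side: current generation per partition id.
    private final Map<Integer, Integer> generationByPartition = new HashMap<>();

    void register(int partitionId) {
        generationByPartition.put(partitionId, 1);
    }

    // Every split bumps the generation of the partition that was split.
    void onSplit(int partitionId) {
        generationByPartition.merge(partitionId, 1, Integer::sum);
    }

    // Returns false for unknown partitions and for requests built from older metadata;
    // the client is then expected to reload the partition table and retry.
    boolean accept(int partitionId, int clientGeneration) {
        Integer current = generationByPartition.get(partitionId);
        return current != null && clientGeneration >= current;
    }
}
```

A client that cached generation 1 is rejected after the first split and succeeds once it retries with generation 2.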

Example Scenario

Consider an example where the cluster node athens holds partition P1 covering the entire key range. The maximum partition size is configured to be 10 bytes. The split check detects that the size has grown beyond 10 and finds the approximate middle key to be bob. The node then sends a message to the cluster coordinator, asking it to create metadata for the split partition. Once the coordinator has successfully created this metadata, it asks athens to split partition P1 and passes it the partitionId from the metadata. athens can then shrink P1 and create a new partition, copying the data from P1 to the new partition. After the partition has been successfully created, athens sends a confirmation to the coordinator. The coordinator then marks the new partition as online.

Load based splitting

With auto-splitting, we only ever begin with one range. This means all client requests go to a single server even if there are other nodes in the cluster. All requests will continue to go to the server hosting that single range until the range is split and moved to other servers. This is why parameters such as the total number of requests, or CPU and memory usage, are sometimes also used to trigger a partition split. Modern databases like CockroachDB and YugabyteDB support load-based splitting. More details can be found in their documentation at [cockroach-load-splitting] and [yb-load-splitting].
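As a rough illustration of a request-count trigger, the sketch below counts requests per partition within a measurement window and flags a partition once it exceeds a threshold. The threshold, the windowing scheme and all names here are assumptions made for this example; they do not describe how CockroachDB or YugabyteDB implement load-based splitting.

```java
import java.util.HashMap;
import java.util.Map;

final class LoadSplitSketch {
    private final int maxRequestsPerWindow; // hypothetical threshold
    private final Map<Integer, Integer> requestsInWindow = new HashMap<>();

    LoadSplitSketch(int maxRequestsPerWindow) {
        this.maxRequestsPerWindow = maxRequestsPerWindow;
    }

    // Called for every request served by the given partition.
    void recordRequest(int partitionId) {
        requestsInWindow.merge(partitionId, 1, Integer::sum);
    }

    // True once the partition has served more requests than the threshold in this window.
    boolean shouldSplit(int partitionId) {
        return requestsInWindow.getOrDefault(partitionId, 0) > maxRequestsPerWindow;
    }

    // Called by a periodic task at the end of each measurement window.
    void resetWindow() {
        requestsInWindow.clear();
    }
}
```

The class is not thread-safe as written; a real node would need synchronisation or concurrent counters, and would still need a split key, for instance the middle key found by the size scan.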


Databases like [hbase], CockroachDB, YugabyteDB and TiKV support range partitioning.

Significant Revisions

25 August 2022: Placeholder date