Version Vector

Maintain a list of counters, one per cluster node, to detect concurrent updates

29 June 2021

Unmesh Joshi

Problem

If multiple servers allow the same key to be updated, it's important to detect when the values are concurrently updated across a set of replicas.

Solution

Each key-value pair is associated with a version vector that maintains a counter for each cluster node.

In essence, a version vector is a set of counters, one for each node. A version vector for three nodes (blue, green, black) would look something like [blue: 43, green: 54, black: 12]. Each time a node has an internal update, it updates its own counter, so an update in the green node would change the vector to [blue: 43, green: 55, black: 12]. Whenever two nodes communicate, they synchronize their vector stamps, allowing them to detect any simultaneous updates.

A typical version vector implementation is as follows:

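class VersionVector…

  private final TreeMap<String, Long> versions;

  public VersionVector() {
      this(new TreeMap<>());
  }

  public VersionVector(TreeMap<String, Long> versions) {
      this.versions = versions;
  }

  //Returns a new vector with the counter for nodeId incremented; a missing counter starts at 1.
  //The vector itself is never mutated, so the same instance can be shared between replicas.
  public VersionVector increment(String nodeId) {
      TreeMap<String, Long> updated = new TreeMap<>(this.versions);
      updated.merge(nodeId, 1L, Long::sum);
      return new VersionVector(updated);
  }

  public TreeMap<String, Long> getVersions() {
      return versions;
  }

  //Value equality, so identical vectors read from different replicas compare as equal.
  @Override
  public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof VersionVector)) return false;
      return versions.equals(((VersionVector) o).versions);
  }

  @Override
  public int hashCode() {
      return versions.hashCode();
  }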
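The class above shows only how a node bumps its own counter. The synchronization step mentioned earlier, where two nodes exchange vector stamps, is commonly done by taking the element-wise maximum of the two vectors. The following is a minimal sketch on plain `TreeMap`s rather than the `VersionVector` class; the names `VectorSync`, `increment` and `merge` are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper working on plain maps of nodeId -> counter.
final class VectorSync {

    // Returns a copy of the vector with nodeId's counter incremented (a missing counter starts at 1).
    static TreeMap<String, Long> increment(Map<String, Long> vector, String nodeId) {
        TreeMap<String, Long> copy = new TreeMap<>(vector);
        copy.merge(nodeId, 1L, Long::sum);
        return copy;
    }

    // Synchronizes two vectors: for every node, keep the larger counter.
    // A node missing from one side simply contributes its counter from the other side.
    static TreeMap<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
        TreeMap<String, Long> merged = new TreeMap<>(a);
        b.forEach((node, counter) -> merged.merge(node, counter, Long::max));
        return merged;
    }
}
```

Using the vectors from the example above: if green moves to [blue: 43, green: 55, black: 12] while blue independently moves to [blue: 44, green: 54, black: 12], merging the two gives [blue: 44, green: 55, black: 12].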

Each value stored on the server is associated with a version vector:

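class VersionedValue…

  public class VersionedValue {
      String value;
      VersionVector versionVector;
  
      public VersionedValue(String value, VersionVector versionVector) {
          this.value = value;
          this.versionVector = versionVector;
      }

      //true if this value's version is equal to, or a successor of, the other value's version.
      public boolean descendsVersion(VersionedValue other) {
          return VersionVector.compare(other.versionVector, versionVector) == Ordering.Before;
      }

      public boolean sameVersion(VersionedValue other) {
          return versionVector.equals(other.versionVector);
      }
  
      @Override
      public boolean equals(Object o) {
          if (this == o) return true;
          if (o == null || getClass() != o.getClass()) return false;
          VersionedValue that = (VersionedValue) o;
          //java.util.Objects
          return Objects.equals(value, that.value) && Objects.equals(versionVector, that.versionVector);
      }
  
      @Override
      public int hashCode() {
          return Objects.hash(value, versionVector);
      }
  }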

Comparing version vectors

Version vectors are compared by comparing the version number for each node, treating a missing entry as 0. A version vector is considered higher than another if its version number for every cluster node is greater than or equal to the other vector's, and at least one is strictly greater; in the reverse case it is lower. If each vector has at least one version number higher than the other's (which includes each having an entry for a node the other has never seen), they are considered concurrent.

Here are some example comparisons:

{blue:2, green:1} is greater than {blue:1, green:1}
{blue:2, green:1} is concurrent with {blue:1, green:2}
{blue:1, green:1, red: 1} is greater than {blue:1, green:1}
{blue:1, green:1, red: 1} is concurrent with {blue:1, green:1, pink: 1}
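Before looking at the Voldemort-derived code, it may help to see the rule on its own. This is a minimal sketch, not the implementation used in this article: it represents a vector as a plain `Map`, treats a missing counter as 0, and reports equal vectors separately. The class `VectorOrder` and its `Result` enum are hypothetical names.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Restatement of the comparison rule on plain maps of nodeId -> counter.
final class VectorOrder {
    enum Result { EQUAL, BEFORE, AFTER, CONCURRENT }

    static Result compare(Map<String, Long> v1, Map<String, Long> v2) {
        // Look at every node known to either vector; a missing counter counts as 0.
        Set<String> nodes = new HashSet<>(v1.keySet());
        nodes.addAll(v2.keySet());
        boolean v1Bigger = false;
        boolean v2Bigger = false;
        for (String node : nodes) {
            long a = v1.getOrDefault(node, 0L);
            long b = v2.getOrDefault(node, 0L);
            if (a > b) v1Bigger = true;
            if (b > a) v2Bigger = true;
        }
        if (v1Bigger && v2Bigger) return Result.CONCURRENT;
        if (v1Bigger) return Result.AFTER;
        if (v2Bigger) return Result.BEFORE;
        return Result.EQUAL;
    }
}
```

Applied to the four examples above, it returns AFTER, CONCURRENT, AFTER and CONCURRENT respectively.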

The comparison is implemented as follows:

public enum Ordering {
    Before,
    After,
    Concurrent
}

class VersionVector…

  //This follows the Voldemort implementation of VectorClock comparison, adapted to VersionVector.
  //https://github.com/voldemort/voldemort/blob/master/src/java/voldemort/versioning/VectorClockUtils.java
  public static Ordering compare(VersionVector v1, VersionVector v2) {
      if(v1 == null || v2 == null)
          throw new IllegalArgumentException("Can't compare null vector clocks!");
      // We do two checks: v1 <= v2 and v2 <= v1. If both hold, the clocks are equal.
      boolean v1Bigger = false;
      boolean v2Bigger = false;

      SortedSet<String> v1Nodes = v1.getVersions().navigableKeySet();
      SortedSet<String> v2Nodes = v2.getVersions().navigableKeySet();
      SortedSet<String> commonNodes = getCommonNodes(v1Nodes, v2Nodes);
      // if v1 has more nodes than common nodes
      // v1 has clocks that v2 does not
      if(v1Nodes.size() > commonNodes.size()) {
          v1Bigger = true;
      }
      // if v2 has more nodes than common nodes
      // v2 has clocks that v1 does not
      if(v2Nodes.size() > commonNodes.size()) {
          v2Bigger = true;
      }
      // compare the common parts
      for(String nodeId: commonNodes) {
          // no need to compare more
          if(v1Bigger && v2Bigger) {
              break;
          }
          long v1Version = v1.getVersions().get(nodeId);
          long v2Version = v2.getVersions().get(nodeId);
          if(v1Version > v2Version) {
              v1Bigger = true;
          } else if(v1Version < v2Version) {
              v2Bigger = true;
          }
      }

      /*
       * This is the case where they are equal. Consciously return BEFORE, so
       * that we throw back an ObsoleteVersionException for online
       * writes with the same clock.
       */
      if(!v1Bigger && !v2Bigger)
          return Ordering.Before;
      /* This is the case where v1 is a successor clock to v2 */
      else if(v1Bigger && !v2Bigger)
          return Ordering.After;
      /* This is the case where v2 is a successor clock to v1 */
      else if(!v1Bigger && v2Bigger)
          return Ordering.Before;
      /* This is the case where both clocks are parallel to one another */
      else
          return Ordering.Concurrent;
  }

  private static SortedSet<String> getCommonNodes(SortedSet<String> v1Nodes, SortedSet<String> v2Nodes) {
      // get clocks (nodeIds) that both v1 and v2 have
      SortedSet<String> commonNodes = new TreeSet<>(v1Nodes);
      commonNodes.retainAll(v2Nodes);
      return commonNodes;
  }


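  public Ordering compareTo(VersionVector other) {
      return compare(this, other);
  }

  //true if this vector is equal to, or a successor of, the other vector
  public boolean descends(VersionVector other) {
      return other.compareTo(this) == Ordering.Before;
  }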

Using version vector in a key value store

A version vector can be used in a key-value store as follows. Each key maps to a list of versioned values, because several concurrent values can exist for the same key.

class VersionVectorKVStore…

  public class VersionVectorKVStore {
      Map<String, List<VersionedValue>> kv = new HashMap<>();

When a client wants to store a value, it first reads the latest known version for the given key. It then picks the cluster node to store the value on, based on the key. While storing the value, the client passes back the version it read. The request flow is shown in the following diagram. There are two servers named blue and green. For the key "name", blue is the primary server.

In the leaderless replication scheme, the client or a coordinator node picks the node to write data to based on the key. The version vector is updated based on the primary cluster node that the key maps to. A value with the same version vector is copied to the other cluster nodes for replication. If the cluster node mapping to the key is not available, the next node is chosen. The version vector is incremented only by the first cluster node the value is saved to; all the other nodes save a copy of the data. The code for incrementing the version vector in databases like [voldemort] looks like this:

class ClusterClient…

  public void put(String key, String value, VersionVector existingVersion) {
      List<Integer> allReplicas = findReplicas(key);
      int nodeIndex = 0;
      List<Exception> failures = new ArrayList<>();
      VersionedValue valueWrittenToPrimary = null;
      for (; nodeIndex < allReplicas.size(); nodeIndex++) {
          try {
              //allReplicas holds indexes into clusterNodes.
              ClusterNode node = clusterNodes.get(allReplicas.get(nodeIndex));
              //the node which is the primary holder of the key value is responsible for incrementing version number.
              valueWrittenToPrimary = node.putAsPrimary(key, value, existingVersion);
              break;
          } catch (Exception e) {
              //if there is an exception writing the value to the node, try the next replica.
              failures.add(e);
          }
      }

      if (valueWrittenToPrimary == null) {
          throw new NotEnoughNodesAvailable("No node succeeded in writing the value.", failures);
      }

      //Succeeded in writing to the first node; copy the same versioned value to the remaining replicas.
      nodeIndex++;
      for (; nodeIndex < allReplicas.size(); nodeIndex++) {
          ClusterNode node = clusterNodes.get(allReplicas.get(nodeIndex));
          try {
              node.put(key, valueWrittenToPrimary);
          } catch (Exception e) {
              //an unreachable replica misses this write; read repair can bring it up to date later.
              failures.add(e);
          }
      }
  }

The node acting as a primary is the one which increments the version number.

class ClusterNode…

  public VersionedValue putAsPrimary(String key, String value, VersionVector existingVersion) {
      VersionVector newVersion = existingVersion.increment(nodeId);
      VersionedValue versionedValue = new VersionedValue(value, newVersion);
      put(key, versionedValue);
      return versionedValue;
  }

  public void put(String key, VersionedValue value) {
      versionVectorKvStore.put(key, value);
  }

As can be seen in the above code, it is possible for different clients to update the same key on different nodes, for instance when a client cannot reach a specific node. This creates a situation where different nodes have different values which are considered 'concurrent' according to their version vectors.

As shown in the following diagram, both client1 and client2 are trying to write to the key "name". If client1 cannot write to server green, the green server will be missing the value written by client1. When client2 tries to write but fails to connect to server blue, it will write on server green. The version vector for the key "name" will reflect that the servers blue and green have concurrent writes.

Figure 2: Concurrent updates on different replicas
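The Figure 2 scenario can be replayed with plain maps. This is a minimal, single-process sketch that assumes both clients read an empty version before writing; the class `ConcurrentReplicaWrites` and its methods are hypothetical, and no networking or failure detection is modeled.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation: the node that accepts a write as primary increments its own counter.
final class ConcurrentReplicaWrites {
    static TreeMap<String, Long> writeAsPrimary(String nodeId, Map<String, Long> existing) {
        TreeMap<String, Long> next = new TreeMap<>(existing);
        next.merge(nodeId, 1L, Long::sum);
        return next;
    }

    // Concurrent when neither vector is >= the other on every node (missing counters count as 0).
    static boolean concurrent(Map<String, Long> a, Map<String, Long> b) {
        return !dominatesOrEquals(a, b) && !dominatesOrEquals(b, a);
    }

    private static boolean dominatesOrEquals(Map<String, Long> a, Map<String, Long> b) {
        return b.entrySet().stream()
                .allMatch(e -> a.getOrDefault(e.getKey(), 0L) >= e.getValue());
    }
}
```

Client1's write lands only on blue and produces [blue: 1]; client2's write lands only on green and produces [green: 1]. `concurrent` returns true for these two vectors, which is why the store has to keep both values.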

Therefore, version-vector-based storage keeps multiple versions of a key when those versions are concurrent.

class VersionVectorKVStore…

  public void put(String key, VersionedValue newValue) {
      List<VersionedValue> existingValues = kv.get(key);
      if (existingValues == null) {
          existingValues = new ArrayList<>();
      }

      rejectIfOldWrite(key, newValue, existingValues);
      List<VersionedValue> newValues = merge(newValue, existingValues);
      kv.put(key, newValues);
  }

  //Reject newValue if an existing value already has the same or a newer version.
  private void rejectIfOldWrite(String key, VersionedValue newValue, List<VersionedValue> existingValues) {
      for (VersionedValue existingValue : existingValues) {
          if (existingValue.descendsVersion(newValue)) {
              throw new ObsoleteVersionException("Obsolete version for key '" + key
                      + "': " + newValue.versionVector);
          }
      }
  }

  //Merge newValue with the existing values: drop values with an older version than newValue,
  //and preserve values that are concurrent with it (neither before nor after).
  private List<VersionedValue> merge(VersionedValue newValue, List<VersionedValue> existingValues) {
      List<VersionedValue> retainedValues = removeOlderVersions(newValue, existingValues);
      retainedValues.add(newValue);
      return retainedValues;
  }

  private List<VersionedValue> removeOlderVersions(VersionedValue newValue, List<VersionedValue> existingValues) {
      //Collect into an ArrayList: Collectors.toList() does not guarantee a mutable list, and merge() adds to it.
      return existingValues
              .stream()
              .filter(v -> !newValue.descendsVersion(v)) //keep versions which are not directly dominated by newValue.
              .collect(Collectors.toCollection(ArrayList::new));
  }
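To see how `put` keeps siblings, here is a stripped-down, self-contained model of the same logic with a `Map` per version vector. It is a sketch, not the article's class: `SiblingStore`, `Versioned` and `descends` are hypothetical names, and an `IllegalStateException` stands in for `ObsoleteVersionException`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class SiblingStore {
    static final class Versioned {
        final String value;
        final Map<String, Long> vector;

        Versioned(String value, Map<String, Long> vector) {
            this.value = value;
            this.vector = vector;
        }
    }

    private final Map<String, List<Versioned>> kv = new HashMap<>();

    // a equals or succeeds b: every counter in b is <= the matching counter in a (missing = 0).
    static boolean descends(Map<String, Long> a, Map<String, Long> b) {
        return b.entrySet().stream()
                .allMatch(e -> a.getOrDefault(e.getKey(), 0L) >= e.getValue());
    }

    void put(String key, Versioned newValue) {
        List<Versioned> existing = kv.getOrDefault(key, new ArrayList<>());
        // Like rejectIfOldWrite: an equal or newer stored version makes the write obsolete.
        for (Versioned stored : existing) {
            if (descends(stored.vector, newValue.vector)) {
                throw new IllegalStateException("Obsolete version for key '" + key + "'");
            }
        }
        // Like removeOlderVersions + merge: drop dominated values, keep concurrent ones.
        List<Versioned> retained = existing.stream()
                .filter(v -> !descends(newValue.vector, v.vector))
                .collect(Collectors.toCollection(ArrayList::new));
        retained.add(newValue);
        kv.put(key, retained);
    }

    List<String> values(String key) {
        return kv.getOrDefault(key, new ArrayList<>()).stream()
                .map(v -> v.value)
                .collect(Collectors.toList());
    }
}
```

Writing "Alice" with [blue: 1] and then "Bob" with [green: 1] leaves both as siblings. A later write carrying [blue: 2, green: 1], as a client would produce after resolving the conflict, descends both and replaces them, while a write still carrying [blue: 1] is rejected as obsolete.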

If concurrent values are detected while reading from multiple nodes, the client is made aware of the conflict, either through an error or, as in the code below, by receiving all of the concurrent values, so that it can resolve it.

Resolving conflicts

If different replicas return multiple versions, comparing their version vectors identifies the latest values; only versions that are concurrent with each other need to be kept.

class ClusterClient…

  public List<VersionedValue> get(String key) {
      List<Integer> allReplicas = findReplicas(key);

      List<VersionedValue> allValues = new ArrayList<>();
      for (Integer index : allReplicas) {
          ClusterNode clusterNode = clusterNodes.get(index);
          List<VersionedValue> nodeVersions = clusterNode.get(key);

          allValues.addAll(nodeVersions);
      }

      return latestValuesAcrossReplicas(allValues);
  }

  private List<VersionedValue> latestValuesAcrossReplicas(List<VersionedValue> allValues) {
      List<VersionedValue> uniqueValues = removeDuplicates(allValues);
      return retainOnlyLatestValues(uniqueValues);
  }

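  private List<VersionedValue> retainOnlyLatestValues(List<VersionedValue> versionedValues) {
      //Collect every value that some other value strictly succeeds, then drop them all at once.
      //Removing while iterating by index could skip elements and leave stale versions behind.
      List<VersionedValue> predecessors = new ArrayList<>();
      for (VersionedValue v1 : versionedValues) {
          predecessors.addAll(getPredecessors(v1, versionedValues));
      }
      List<VersionedValue> latest = new ArrayList<>(versionedValues);
      latest.removeAll(predecessors);
      return latest;
  }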

  private List<VersionedValue> getPredecessors(VersionedValue v1, List<VersionedValue> versionedValues) {
      List<VersionedValue> predecessors = new ArrayList<>();
      for (VersionedValue v2 : versionedValues) {
          if (!v1.sameVersion(v2) && v1.descendsVersion(v2)) {
              predecessors.add(v2);
          }
      }
      return predecessors;
  }

  private List<VersionedValue> removeDuplicates(List<VersionedValue> allValues) {
      return allValues.stream().distinct().collect(Collectors.toList());
  }

Version vectors can detect concurrent updates, but they cannot decide which of the concurrent values is correct. So it's important to allow clients to provide application-specific conflict resolvers. A conflict resolver can be provided by the client while reading a value.

public interface ConflictResolver {
    VersionedValue resolve(List<VersionedValue> values);
}

class ClusterClient…

  public VersionedValue getResolvedValue(String key, ConflictResolver resolver) {
      List<VersionedValue> versionedValues = get(key);
      return resolver.resolve(versionedValues);
  }

For example, [riak] allows applications to provide their own conflict resolvers.
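As an illustration of an application-specific resolver, the sketch below merges siblings whose values are comma-separated item lists, in the spirit of a shopping cart. It is hypothetical and self-contained, using its own `Versioned` type instead of `VersionedValue`; adapting it to the `ConflictResolver` interface would mean replacing the map with a `VersionVector`. The merged vector is the element-wise maximum of the siblings' vectors, so that it equals or succeeds each of them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical union resolver; not the API of any real database.
final class CartUnionResolver {
    static final class Versioned {
        final String value;
        final Map<String, Long> vector;

        Versioned(String value, Map<String, Long> vector) {
            this.value = value;
            this.vector = vector;
        }
    }

    static Versioned resolve(List<Versioned> siblings) {
        TreeSet<String> items = new TreeSet<>();
        TreeMap<String, Long> vector = new TreeMap<>();
        for (Versioned sibling : siblings) {
            if (!sibling.value.isEmpty()) {
                items.addAll(Arrays.asList(sibling.value.split(",")));
            }
            // Element-wise maximum, so the merged vector equals or succeeds every sibling's vector.
            sibling.vector.forEach((node, counter) -> vector.merge(node, counter, Long::max));
        }
        return new Versioned(String.join(",", items), vector);
    }
}
```

When the resolved value is written back through `put`, the primary increments its own counter on top of the merged vector, so the new version strictly succeeds all siblings and replaces them. A union never loses an added item, but an item removed on one replica can reappear after merging, which is a known drawback of this style of resolution.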

Last Write Wins (LWW) Conflict Resolution

While version vectors allow detection of concurrent writes across a set of servers, they do not by themselves help clients figure out which value to choose in case of conflicts. The burden of resolution is on the client. Sometimes clients prefer the key-value store to resolve conflicts based on timestamps. While there are known issues with timestamps across servers, the simplicity of this approach makes it a preferred choice for many clients, even with the risk of losing some updates because of clock differences between servers. This approach relies fully on services like NTP being well configured and working across the cluster. Databases like [riak] and [voldemort] allow users to select the 'last write wins' conflict resolution strategy.

To support LWW conflict resolution, a timestamp is stored with each value when it is written.

class TimestampedVersionedValue…

  class TimestampedVersionedValue {
      String value;
      VersionVector versionVector;
      long timestamp;
  
      public TimestampedVersionedValue(String value, VersionVector versionVector, long timestamp) {
          this.value = value;
          this.versionVector = versionVector;
          this.timestamp = timestamp;
      }

While reading the value, the client can use the timestamp to pick the latest value. The version vector is completely ignored in this case.

class ClusterClient…

  public Optional<TimestampedVersionedValue> getWithLWW(List<TimestampedVersionedValue> values) {
      return values.stream().max(Comparator.comparingLong(v -> v.timestamp));
  }
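The comparator above orders values by timestamp only, and `Stream.max` gives no guarantee about which of two equal elements it returns, so two writes with the same timestamp could lead different clients to pick different winners. A minimal sketch that adds a tie-breaker on the value itself, so every client that sees the same set of values picks the same one; the tie-break rule and the names `LastWriteWins` and `pick` are assumptions, not what [riak] or [voldemort] do.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class LastWriteWins {
    static final class Timestamped {
        final String value;
        final long timestamp;

        Timestamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Highest timestamp wins; on a tie, the lexicographically larger value wins.
    private static final Comparator<Timestamped> ORDER =
            Comparator.comparingLong((Timestamped v) -> v.timestamp)
                    .thenComparing(v -> v.value);

    static Optional<Timestamped> pick(List<Timestamped> values) {
        return values.stream().max(ORDER);
    }
}
```

Any deterministic total order works for the tie-break; the important property is that it does not depend on the order in which replicas answered.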

Read repair

While allowing any cluster node to accept write requests improves availability, it's important that eventually all of the replicas have the same data. One common way to repair replicas is to do it when the client reads the data.

When the conflicts are resolved, it's also possible to detect which nodes have older versions. The nodes with older versions can be sent the latest versions as part of handling the client's read request. This is called read repair.

Consider the scenario shown in the following diagram. Two nodes, blue and green, have values for the key "name". The green node has the latest version, with version vector [blue: 1, green: 1]. When the values are read from both replicas, blue and green, they are compared to find out which node is missing the latest version, and a put request with the latest version is sent to that cluster node.

Figure 3: Read repair
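The comparison step of read repair can be sketched as follows. This is a hypothetical, single-process model: each replica is represented by the list of versions it returned for the key, and instead of sending put requests the `repairs` function returns, per replica, the latest versions that replica is missing. Names such as `ReadRepair` and `repairs` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class ReadRepair {
    static final class Versioned {
        final String value;
        final Map<String, Long> vector;

        Versioned(String value, Map<String, Long> vector) {
            this.value = value;
            this.vector = vector;
        }
    }

    // a equals or succeeds b: every counter in b is <= the matching counter in a (missing = 0).
    static boolean descends(Map<String, Long> a, Map<String, Long> b) {
        return b.entrySet().stream()
                .allMatch(e -> a.getOrDefault(e.getKey(), 0L) >= e.getValue());
    }

    // Latest versions: those not strictly succeeded by any other version, without duplicates.
    static List<Versioned> latest(List<Versioned> all) {
        List<Versioned> result = new ArrayList<>();
        for (Versioned v : all) {
            boolean superseded = all.stream().anyMatch(
                    o -> !o.vector.equals(v.vector) && descends(o.vector, v.vector));
            boolean alreadyKept = result.stream().anyMatch(r -> r.vector.equals(v.vector));
            if (!superseded && !alreadyKept) {
                result.add(v);
            }
        }
        return result;
    }

    // For each replica, the latest versions it does not hold yet, i.e. the repair writes to send.
    static Map<String, List<Versioned>> repairs(Map<String, List<Versioned>> replicas) {
        List<Versioned> all = replicas.values().stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
        List<Versioned> latestVersions = latest(all);
        Map<String, List<Versioned>> toSend = new TreeMap<>();
        replicas.forEach((replica, held) -> {
            List<Versioned> missing = latestVersions.stream()
                    .filter(l -> held.stream().noneMatch(h -> h.vector.equals(l.vector)))
                    .collect(Collectors.toList());
            if (!missing.isEmpty()) {
                toSend.put(replica, missing);
            }
        });
        return toSend;
    }
}
```

For the Figure 3 scenario, with blue holding [blue: 1] and green holding [blue: 1, green: 1], the result has a single entry: blue should be sent green's version.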

Allowing concurrent updates on the same cluster node

There is a possibility of two clients writing concurrently to the same node. In the default implementation shown above, the second write will be rejected. The basic implementation with the version number per cluster node is not enough in this case.

Consider the following scenario. With two clients trying to update the same key, the second client will get an exception, as the version it passes in its put request is stale.

Figure 4: Two clients concurrently updating the same key

Databases like [riak] give clients the flexibility to allow these kinds of concurrent writes, for clients that prefer not to get error responses.

Using Client IDs instead of Server IDs

If each cluster client has a unique ID, the client ID can be used instead of the server ID. A version number is stored per client ID. Every time a client writes a value, it first reads the existing version, increments the number associated with its client ID and writes it to the server.

class ClusterClient…

  private VersionedValue putWithClientId(String clientId, int nodeIndex, String key, String value, VersionVector version) {
      ClusterNode node = clusterNodes.get(nodeIndex);
      VersionVector newVersion = version.increment(clientId);
      VersionedValue versionedValue = new VersionedValue(value, newVersion);
      node.put(key, versionedValue);
      return versionedValue;
  }

Because each client increments its own counter, concurrent writes create sibling values on the servers, but concurrent writes never fail.

With client IDs, the scenario above, which gave an error to the second client, works as follows:

Figure 5: Two clients concurrently updating the same key
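The difference between server IDs and client IDs can be shown with plain maps. In this sketch, which assumes both clients read the same empty version and write to the same node, `wouldBeRejected` mirrors the `rejectIfOldWrite` check shown earlier; `ClientIdVectors` and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

final class ClientIdVectors {
    // Returns a copy with the counter for id incremented; id may be a server ID or a client ID.
    static TreeMap<String, Long> increment(Map<String, Long> vector, String id) {
        TreeMap<String, Long> next = new TreeMap<>(vector);
        next.merge(id, 1L, Long::sum);
        return next;
    }

    // a equals or succeeds b: every counter in b is <= the matching counter in a (missing = 0).
    static boolean descends(Map<String, Long> a, Map<String, Long> b) {
        return b.entrySet().stream()
                .allMatch(e -> a.getOrDefault(e.getKey(), 0L) >= e.getValue());
    }

    // Mirrors rejectIfOldWrite: the incoming write is obsolete if the stored version equals or succeeds it.
    static boolean wouldBeRejected(Map<String, Long> stored, Map<String, Long> incoming) {
        return descends(stored, incoming);
    }
}
```

With the server ID, both writes produce [blue: 1], so the second one is rejected because equal versions are treated as obsolete. With client IDs, the writes produce [client1: 1] and [client2: 1]; neither descends the other, so the store keeps both as siblings. The vector now gains an entry for every client that writes the key.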

Dotted version vectors

One of the major problems with client-ID-based version vectors is that the size of the version vector grows with the number of clients. This causes cluster nodes to accumulate too many concurrent values for a given key over time, a problem called sibling explosion. To solve this issue while still using cluster-node-based version vectors, [riak] uses a variant of version vector called the dotted version vector.

Examples

[voldemort] uses version vectors in the way described here. It also allows timestamp-based last-write-wins conflict resolution.

[riak] started by using client-ID-based version vectors, but moved to cluster-node-based version vectors and eventually to dotted version vectors. Riak also supports last-write-wins conflict resolution based on the system timestamp.

[cassandra] does not use version vectors. It supports only last-write-wins conflict resolution based on the system timestamp.

Significant Revisions