This pattern is part of Patterns of Distributed Systems

Versioned Value

Store every update to a value with a new version, to allow reading historical values.

22 June 2021

Problem

In a distributed system, nodes need to be able to tell which value for a key is the most recent. Sometimes they also need to know past values so that they can react properly to changes in a value.

Solution

Store a version number with each value. The version number is incremented for every update. This allows every update to be converted to a new write without blocking a read. Clients can read historical values at a specific version number.

Consider a simple example of a replicated key-value store. The leader of the cluster handles all writes to the key-value store. It saves the write requests in a Write-Ahead Log, which is replicated using Leader and Followers. The leader applies entries from the Write-Ahead Log up to the High-Water Mark to the key-value store. This is a standard replication method called state machine replication. Most data systems backed by a consensus algorithm like Raft are implemented this way. In this case, the key-value store keeps an integer version counter, which it increments every time a key-value write command from the Write-Ahead Log is applied. It then constructs the new key with the incremented version counter. This way no existing value is updated; every write request appends a new value to the backing store.

class ReplicatedKVStore…

  int version = 0;
  MVCCStore mvccStore = new MVCCStore();

  @Override
  public CompletableFuture<Response> put(String key, String value) {
      return replicatedLog.propose(new SetValueCommand(key, value));
  }

  private Response applySetValueCommand(SetValueCommand setValueCommand) {
      getLogger().info("Setting key value " + setValueCommand);
      version = version + 1;
      mvccStore.put(new VersionedKey(setValueCommand.getKey(), version), setValueCommand.getValue());
      Response response = Response.success(version);
      return response;
  }
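
The link between log order and version numbers can be sketched in isolation. The following is a minimal single-threaded sketch, not the replicated store above: `LogApplier`, `append`, `applyUpTo` and the in-memory list standing in for the Write-Ahead Log are hypothetical names introduced for illustration, and the high-water mark is assumed to be the count of committed entries. It applies only committed entries and assigns one fresh version per applied write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: an in-memory list stands in for the replicated Write-Ahead Log.
class LogApplier {
    private final List<String[]> log = new ArrayList<>();               // each entry is {key, value}
    private final TreeMap<Integer, String[]> applied = new TreeMap<>(); // version -> {key, value}
    private int lastApplied = 0; // number of log entries applied so far
    private int version = 0;     // incremented once per applied write

    void append(String key, String value) {
        log.add(new String[] {key, value});
    }

    // Applies the not-yet-applied entries up to highWaterMark, in log order.
    // Returns the version of the last applied write.
    int applyUpTo(int highWaterMark) {
        int limit = Math.min(highWaterMark, log.size());
        while (lastApplied < limit) {
            String[] entry = log.get(lastApplied);
            version = version + 1;
            applied.put(version, entry); // never overwrites: each write gets a fresh version
            lastApplied++;
        }
        return version;
    }

    String keyAt(int atVersion) {
        String[] entry = applied.get(atVersion);
        return entry == null ? null : entry[0];
    }

    String valueAt(int atVersion) {
        String[] entry = applied.get(atVersion);
        return entry == null ? null : entry[1];
    }
}
```

Because the version is derived only from the position of a write in the applied log, any node that applies the same entries in the same order arrives at the same version for each write.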

Ordering Of Versioned Keys

Because quickly navigating to the best matching version is an important implementation concern, the versioned keys are arranged to form a natural ordering by using the version number as a suffix to the key. This maintains an order that fits well with the underlying data structure. For example, if a key has versions 1 and 2, the entry for version 1 (key1) is ordered before the entry for version 2 (key2).

To store the versioned key values, a data structure that allows quick navigation to the nearest matching version, such as a skip list, is used. In Java the MVCC storage can be built as follows:

class MVCCStore…

  public class MVCCStore {
      NavigableMap<VersionedKey, String> kv = new ConcurrentSkipListMap<>();
  
      public void put(VersionedKey key, String value) {
          kv.put(key, value);
  
      }

To work with the navigable map, the versioned key is implemented as follows. It implements Comparable to give the keys a natural ordering: first by key, then by version.

class VersionedKey…

  public class VersionedKey implements Comparable<VersionedKey> {
      private String key;
      private long version;
  
      public VersionedKey(String key, long version) {
          this.key = key;
          this.version = version;
      }
  
      public String getKey() {
          return key;
      }
  
      public long getVersion() {
          return version;
      }
  
      @Override
      public int compareTo(VersionedKey other) {
          int keyCompare = this.key.compareTo(other.key);
          if (keyCompare != 0) {
              return keyCompare;
          }
          return Long.compare(this.version, other.version);
      }
  }
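
To see why the version is compared as a number rather than as part of the key string, the ordering can be tried out on a few keys. This is a small sketch under assumptions: `KeyAtVersion` is a hypothetical, cut-down stand-in for the VersionedKey class, and `KeyOrderingDemo` and the `key@version` string form exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for VersionedKey: ordered by key, then by numeric version.
class KeyAtVersion implements Comparable<KeyAtVersion> {
    final String key;
    final long version;

    KeyAtVersion(String key, long version) {
        this.key = key;
        this.version = version;
    }

    @Override
    public int compareTo(KeyAtVersion other) {
        int keyCompare = key.compareTo(other.key);
        return keyCompare != 0 ? keyCompare : Long.compare(version, other.version);
    }

    @Override
    public String toString() {
        return key + "@" + version;
    }
}

class KeyOrderingDemo {
    // Sorted as (key, version) pairs: versions of one key appear in numeric order.
    static List<String> sortedAsPairs() {
        TreeSet<KeyAtVersion> keys = new TreeSet<>();
        keys.add(new KeyAtVersion("title", 10));
        keys.add(new KeyAtVersion("title", 2));
        keys.add(new KeyAtVersion("author", 7));
        List<String> result = new ArrayList<>();
        for (KeyAtVersion k : keys) {
            result.add(k.toString());
        }
        return result; // [author@7, title@2, title@10]
    }

    // Sorted as plain strings: "title@10" sorts before "title@2" because '1' < '2'.
    static List<String> sortedAsStrings() {
        TreeSet<String> keys = new TreeSet<>(Arrays.asList("title@10", "title@2", "author@7"));
        return new ArrayList<>(keys); // [author@7, title@10, title@2]
    }
}
```

With the pair comparison, all versions of a key are adjacent and ascending, which is what makes a floor lookup land on the nearest earlier version.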

This implementation allows getting values for a specific version using the navigable map API.

class MVCCStore…

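  public Optional<String> get(final String key, final int readAt) {
      // floorEntry returns the greatest entry <= (key, readAt), which may belong to a different key
      Map.Entry<VersionedKey, String> entry = kv.floorEntry(new VersionedKey(key, readAt));
      if (entry == null || !entry.getKey().getKey().equals(key)) {
          return Optional.empty();
      }
      return Optional.of(entry.getValue());
  }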

Consider an example where there are four versions of a key stored at version numbers 1, 2, 3 and 5. Depending on the version used by clients to read values, the nearest matching version of the key is returned.

Figure 1: Reading at a specific version
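
The example with versions 1, 2, 3 and 5 can be worked through in code. This is a minimal sketch that assumes a single key, so a plain `TreeMap` keyed by version stands in for the composite versioned key; `SingleKeyHistory` and its method names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch: the version history of one key, ordered by version.
class SingleKeyHistory {
    private final TreeMap<Integer, String> valueByVersion = new TreeMap<>();

    void put(int version, String value) {
        valueByVersion.put(version, value);
    }

    // Returns the value written at the greatest version <= readAt, if there is one.
    Optional<String> readAt(int readAt) {
        Map.Entry<Integer, String> entry = valueByVersion.floorEntry(readAt);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
```

With values stored at versions 1, 2, 3 and 5, a read at version 4 returns the value written at version 3, a read at version 5 or later returns the value written at version 5, and a read at version 0 finds nothing.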

The version at which the specific key value is stored is returned to the client. The client can then use this version to read the values. The overall working is as follows.

Figure 2: Put Request Handling

Figure 3: Reading at a specific version

Reading multiple versions

Sometimes clients need to get all the versions from a given version number. For example, in State Watch the client needs to get all the events from a specific version.

The cluster node can maintain an additional index structure that records all the versions for a key.

class IndexedMVCCStore…

  public class IndexedMVCCStore {
      NavigableMap<String, List<Integer>> keyVersionIndex = new TreeMap<>();
      NavigableMap<VersionedKey, String> kv = new TreeMap<>();
  
      ReadWriteLock rwLock = new ReentrantReadWriteLock();
      int version = 0;
  
      public int put(String key, String value) {
          rwLock.writeLock().lock();
          try {
              version = version + 1;
              kv.put(new VersionedKey(key, version), value);
  
              updateVersionIndex(key, version);
  
              return version;
          } finally {
              rwLock.writeLock().unlock();
          }
      }
  
      private void updateVersionIndex(String key, int newVersion) {
          List<Integer> versions = getVersions(key);
          versions.add(newVersion);
          keyVersionIndex.put(key, versions);
      }
  
      private List<Integer> getVersions(String key) {
          List<Integer> versions = keyVersionIndex.get(key);
          if (versions == null) {
              versions = new ArrayList<>();
              keyVersionIndex.put(key, versions);
          }
          return versions;
      }

Then a client API can be provided to read values from a specific version or for a version range.

class IndexedMVCCStore…

  public List<String> getRange(String key, final int fromRevision, int toRevision) {
      rwLock.readLock().lock();
      try {
          List<Integer> versions = keyVersionIndex.get(key);
          if (versions == null || versions.isEmpty()) {
              return new ArrayList<>(); // the key has never been written
          }
          Integer maxRevisionForKey = versions.stream().max(Integer::compareTo).get();
          Integer revisionToRead = maxRevisionForKey > toRevision ? toRevision : maxRevisionForKey;
          if (fromRevision > revisionToRead) {
              return new ArrayList<>(); // nothing stored in the requested range
          }
          // Both bounds are inclusive: read versions fromRevision..revisionToRead of this key.
          SortedMap<VersionedKey, String> versionMap = kv.subMap(new VersionedKey(key, fromRevision), true, new VersionedKey(key, revisionToRead), true);
          getLogger().info("Available version keys " + versionMap + ". Reading@" + fromRevision + ":" + toRevision);
          return new ArrayList<>(versionMap.values());
      } finally {
          rwLock.readLock().unlock();
      }
  }

Care must be taken to use appropriate locking while updating and reading from the index.

An alternative implementation is to save a list of all the versioned values with the key, as used in Gossip Dissemination to avoid unnecessary state exchange.
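
A list-per-key layout could look like the following. This is a minimal sketch under assumptions, not the Gossip Dissemination implementation: `VersionListStore`, `VersionedValue` and `valuesAfter` are hypothetical names, and a single store-wide counter is assumed for versions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: each key maps to the list of all its versioned values.
class VersionListStore {
    static final class VersionedValue {
        final long version;
        final String value;

        VersionedValue(long version, String value) {
            this.version = version;
            this.value = value;
        }
    }

    private final Map<String, List<VersionedValue>> valuesByKey = new HashMap<>();
    private long version = 0;

    // Appends a new versioned value; earlier values for the key are kept.
    synchronized long put(String key, String value) {
        version = version + 1;
        valuesByKey.computeIfAbsent(key, k -> new ArrayList<>())
                   .add(new VersionedValue(version, value));
        return version;
    }

    // Returns the values of the key written after the given version, oldest first.
    synchronized List<String> valuesAfter(String key, long afterVersion) {
        List<VersionedValue> all = valuesByKey.getOrDefault(key, Collections.emptyList());
        List<String> result = new ArrayList<>();
        for (VersionedValue versioned : all) {
            if (versioned.version > afterVersion) {
                result.add(versioned.value);
            }
        }
        return result;
    }
}
```

A caller that already knows the state up to some version can ask only for what came after it, which is the kind of saving the gossip use case relies on.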

MVCC and Transaction Isolation

Databases use Versioned Value to implement [mvcc] and [transaction-isolation].

Concurrency control is about how locking is used when multiple concurrent requests access the same data. When locks are used to synchronize access, all other requests are blocked until the request holding the lock completes and releases it. With Versioned Value, every write request adds a new record. This allows the use of non-blocking data structures to store the values.

Transaction isolation levels, such as Snapshot Isolation, can be naturally implemented as well. When a client starts reading at a particular version, it's guaranteed to get the same value every time it reads from the database, even if there are concurrent write transactions which commit a different value between multiple read requests.

Figure 4: Reading snapshot
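
A snapshot read can be sketched as a reader that remembers the version that was current when it started and resolves every read against that version. The code below is an illustrative sketch only: `SnapshotStore`, `Snapshot` and the per-key skip list layout are assumptions made for this example, and it assumes one writer at a time, which fits writes being applied in log order.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: readers pin a version, writers only ever add new versions.
class SnapshotStore {
    private final Map<String, ConcurrentSkipListMap<Long, String>> historyByKey = new ConcurrentHashMap<>();
    private volatile long latestVersion = 0;

    // Assumption: a single writer at a time (enforced here with synchronized).
    synchronized long put(String key, String value) {
        long newVersion = latestVersion + 1;
        historyByKey.computeIfAbsent(key, k -> new ConcurrentSkipListMap<>()).put(newVersion, value);
        latestVersion = newVersion; // publish the version only after the value is stored
        return newVersion;
    }

    // Readers take no lock: a snapshot is just the latest published version.
    Snapshot snapshot() {
        return new Snapshot(latestVersion);
    }

    class Snapshot {
        private final long readVersion;

        Snapshot(long readVersion) {
            this.readVersion = readVersion;
        }

        // Always resolves against readVersion, so later writes are invisible.
        Optional<String> get(String key) {
            ConcurrentSkipListMap<Long, String> history = historyByKey.get(key);
            if (history == null) {
                return Optional.empty();
            }
            Map.Entry<Long, String> entry = history.floorEntry(readVersion);
            return entry == null ? Optional.empty() : Optional.of(entry.getValue());
        }
    }
}
```

A snapshot taken before a write keeps returning the older value, while a snapshot taken afterwards sees the new one. Neither reader blocks the writer.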

Using RocksDB-like storage engines

It is very common to use [rocksdb] or similar embedded storage engines as a storage backend for data stores. For example, [etcd] uses [boltdb], and CockroachDB earlier used [rocksdb] and now uses a Go clone of RocksDB called [pebble].

These storage engines provide an implementation suitable for storing versioned values. They internally use skip lists in the same way as described in the section above and rely on the ordering of keys. They also allow a custom comparator to be supplied for ordering keys.

class VersionedKeyComparator…

  public class VersionedKeyComparator extends Comparator {
      public VersionedKeyComparator() {
          super(new ComparatorOptions());
      }
  
      @Override
      public String name() {
          return "VersionedKeyComparator";
      }
  
      @Override
      public int compare(Slice s1, Slice s2) {
          VersionedKey key1 = VersionedKey.deserialize(ByteBuffer.wrap(s1.data()));
          VersionedKey key2 = VersionedKey.deserialize(ByteBuffer.wrap(s2.data()));
          return key1.compareTo(key2);
      }
  }
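
The comparator relies on the versioned key being convertible to and from bytes, which the snippets here do not show. One possible byte layout is sketched below as an assumption: `VersionedKeyCodec`, `Decoded` and the layout itself (a 4-byte key length, the UTF-8 key bytes, then an 8-byte version) are hypothetical and not taken from the original implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec: [int keyLength][UTF-8 key bytes][long version].
class VersionedKeyCodec {
    static final class Decoded {
        final String key;
        final long version;

        Decoded(String key, long version) {
            this.key = key;
            this.version = version;
        }
    }

    static byte[] serialize(String key, long version) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + keyBytes.length + Long.BYTES);
        buffer.putInt(keyBytes.length); // length prefix so the key can be read back
        buffer.put(keyBytes);
        buffer.putLong(version);
        return buffer.array();
    }

    static Decoded deserialize(ByteBuffer buffer) {
        int keyLength = buffer.getInt();
        byte[] keyBytes = new byte[keyLength];
        buffer.get(keyBytes);
        long version = buffer.getLong();
        return new Decoded(new String(keyBytes, StandardCharsets.UTF_8), version);
    }
}
```

With a length prefix like this, plain bytewise ordering does not match the key-then-version ordering (the key "b" would sort before "aa" because its length is smaller), which is one reason to compare the deserialized keys in a custom comparator.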

The implementation using [rocksdb] can be done as follows:

class RocksDBMvccStore…

  private final RocksDB db;

  public RocksDBMvccStore(File cacheDir) throws RocksDBException {
      Options options = new Options();
      options.setKeepLogFileNum(30);
      options.setCreateIfMissing(true);
      options.setLogFileTimeToRoll(TimeUnit.DAYS.toSeconds(1));
      options.setComparator(new VersionedKeyComparator());
      db = RocksDB.open(options, cacheDir.getPath());
  }

  public void put(String key, int version, String value) throws RocksDBException {
      VersionedKey versionKey = new VersionedKey(key, version);
      db.put(versionKey.serialize(), value.getBytes());
  }

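  public String get(String key, int readAtVersion) {
      // try-with-resources releases the native iterator
      try (RocksIterator rocksIterator = db.newIterator()) {
          rocksIterator.seekForPrev(new VersionedKey(key, readAtVersion).serialize());
          if (!rocksIterator.isValid()) {
              return null; // no entry at or before the requested version
          }
          VersionedKey found = VersionedKey.deserialize(ByteBuffer.wrap(rocksIterator.key()));
          // seekForPrev can land on a different key; treat that as "not found"
          return found.getKey().equals(key) ? new String(rocksIterator.value()) : null;
      }
  }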

Examples

[etcd3] uses an MVCC backend with a single integer representing a version.

MongoDB and CockroachDB use an MVCC backend with a hybrid logical clock.

Significant Revisions

22 June 2021: Published