Hybrid Clock

Use a combination of system timestamp and logical timestamp to have versions as date-time, which can be ordered

24 June 2021

Unmesh Joshi

Problem

When Lamport Clock is used as a version in Versioned Value, clients do not know the actual date-time when the particular versions are stored. It's useful for clients to access versions using date-time like 01-01-2020 instead of using integers like 1, 2, 3.

Solution

Hybrid Logical Clock provides a way to have a version which is monotonically increasing just like a simple integer, but also has relation with the actual date time. Hybrid clocks are used in practice by databases like mongodb or cockroachdb.

A Hybrid Logical Clock is implemented as follows:

class HybridClock…

  public class HybridClock {
      private final SystemClock systemClock;
      private HybridTimestamp latestTime;
      public HybridClock(SystemClock systemClock) {
          this.systemClock = systemClock;
          this.latestTime = new HybridTimestamp(systemClock.currentTimeMillis(), 0);
      }

It maintains the latest time as an instance of the hybrid timestamp, which is constructed by using system time and an integer counter.

class HybridTimestamp…

  public class HybridTimestamp implements Comparable<HybridTimestamp> {
      private final long wallClockTime;
      private final int ticks;
  
      public HybridTimestamp(long systemTime, int ticks) {
          this.wallClockTime = systemTime;
          this.ticks = ticks;
      }
  
      public static HybridTimestamp fromSystemTime(long systemTime) {
          return new HybridTimestamp(systemTime, -1); //initializing with -1 so that addTicks resets it to 0
      }
  
      public HybridTimestamp max(HybridTimestamp other) {
          if (this.getWallClockTime() == other.getWallClockTime()) {
              return this.getTicks() > other.getTicks()? this:other;
          }
          return this.getWallClockTime() > other.getWallClockTime()?this:other;
      }
  
      public long getWallClockTime() {
          return wallClockTime;
      }
  
      public HybridTimestamp addTicks(int ticks) {
          return new HybridTimestamp(wallClockTime, this.ticks + ticks);
      }
  
      public int getTicks() {
          return ticks;
      }
  
      @Override
      public int compareTo(HybridTimestamp other) {
          if (this.wallClockTime == other.wallClockTime) {
              return Integer.compare(this.ticks, other.ticks);
          }
          return Long.compare(this.wallClockTime, other.wallClockTime);
      }

Hybrid clocks can be used exactly the same way as the Lamport Clock versions. Every server holds an instance of a hybrid clock.

class Server…

  HybridClockMVCCStore mvccStore;
  HybridClock clock;

  public Server(HybridClockMVCCStore mvccStore) {
      this.clock = new HybridClock(new SystemClock());
      this.mvccStore = mvccStore;
  }

Every time a value is written, a hybrid timestamp is associated with it. The trick is to check if the system time value is going back in time, if so increment another number representing a logical part of the component to reflect clock progress.

class HybridClock…

  public synchronized HybridTimestamp now() {
      long currentTimeMillis = systemClock.currentTimeMillis();
      if (latestTime.getWallClockTime() >= currentTimeMillis) {
           latestTime = latestTime.addTicks(1);
      } else {
          latestTime = new HybridTimestamp(currentTimeMillis, 0);
      }
      return latestTime;
  }

Every write request that a server receives from the client carries a timestamp. The receiving server compares its own timestamp to that of the request and sets it's own timestamp to the higher of the two

class Server…

  public HybridTimestamp write(String key, String value, HybridTimestamp requestTimestamp) {
      //update own clock to reflect causality
      HybridTimestamp writeAtTimestamp = clock.tick(requestTimestamp);
      mvccStore.put(key, writeAtTimestamp, value);
      return writeAtTimestamp;
  }

class HybridClock…

  public synchronized HybridTimestamp tick(HybridTimestamp requestTime) {
      long nowMillis = systemClock.currentTimeMillis();
      //set ticks to -1, so that, if this is the max, the next addTicks reset it to zero.
      HybridTimestamp now = HybridTimestamp.fromSystemTime(nowMillis);
      latestTime = max(now, requestTime, latestTime);
      latestTime = latestTime.addTicks(1);
      return latestTime;
  }

  private HybridTimestamp max(HybridTimestamp ...times) {
      HybridTimestamp maxTime = times[0];
      for (int i = 1; i < times.length; i++) {
          maxTime = maxTime.max(times[i]);
      }
      return maxTime;
  }

The timestamp used for writing the value is returned to the client. The requesting client updates its own timestamp and then uses this timestamp to issue any further writes.

class Client…

  HybridClock clock = new HybridClock(new SystemClock());
  public void write() {
      HybridTimestamp server1WrittenAt = server1.write("key1", "value1", clock.now());
      clock.tick(server1WrittenAt);

      HybridTimestamp server2WrittenAt = server2.write("key2", "value2", clock.now());

      assertTrue(server2WrittenAt.compareTo(server1WrittenAt) > 0);
  }

Multiversion storage with Hybrid Clock

A hybrid timestamp can be used as a version when the value is stored in a key value store. The values are stored as discussed in Versioned Value.

class HybridClockReplicatedKVStore…

  private Response applySetValueCommand(VersionedSetValueCommand setValueCommand) {
      mvccStore.put(setValueCommand.getKey(), setValueCommand.timestamp, setValueCommand.value);
      Response response = Response.success(setValueCommand.timestamp);
      return response;
  }

class HybridClockMVCCStore…

  ConcurrentSkipListMap<HybridClockKey, String> kv = new ConcurrentSkipListMap<>();

  public void put(String key, HybridTimestamp version, String value) {
      kv.put(new HybridClockKey(key, version), value);
  }

class HybridClockKey…

  public class HybridClockKey implements Comparable<HybridClockKey> {
      private String key;
      private HybridTimestamp version;
  
      public HybridClockKey(String key, HybridTimestamp version) {
          this.key = key;
          this.version = version;
      }
  
      public String getKey() {
          return key;
      }
  
      public HybridTimestamp getVersion() {
          return version;
      }
  
      @Override
      public int compareTo(HybridClockKey o) {
          int keyCompare = this.key.compareTo(o.key);
          if (keyCompare == 0) {
              return this.version.compareTo(o.version);
          }
          return keyCompare;
      }

The values are read exactly as discussed in the Ordering of the versioned keys. The versioned keys are arranged in such a way as to form a natural ordering by using hybrid timestamps as a suffix to the key. This implementation enables us to get values for a specific version using the navigable map API.

class HybridClockMVCCStore…

  public Optional<String> get(String key, HybridTimestamp atTimestamp) {
      Map.Entry<HybridClockKey, String> versionKeys = kv.floorEntry(new HybridClockKey(key, atTimestamp));
      getLogger().info("Available version keys " + versionKeys + ". Reading@" + versionKeys);
      return (versionKeys == null)? Optional.empty(): Optional.of(versionKeys.getValue());
  }

Converting hybrid timestamp to date time

A hybrid clock can be converted into an actual timestamp value by merging the system timestamp and the logical clicks number. As discussed in the [hybrid-clock] paper, the first 48 bits of the system time can be preserved, and lower order 16 bits can be replaced by the logical counter.

class HybridTimestamp…

  
      public LocalDateTime toDateTime() {
          return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis()), ZoneOffset.UTC);
      }
  
      public long epochMillis() {
          //Compact timestamp as discussed in https://cse.buffalo.edu/tech-reports/2014-04.pdf.
          return (wallClockTime >> 16 << 16) | (ticks << 48 >> 48);
      }

Assigning timestamp to distributed transactions

Databases like [mongodb] and [cockroachdb] use a Hybrid Clock to maintain causality with distributed transactions. With distributed transactions, it's important to note that, all the values stored as part of the transaction should be stored at the same timestamp across the servers when the transaction commits. The requesting server might know about a higher timestamp in the later write requests. So the requesting server communicates with all the participating servers about the highest timestamp it received when the transaction commits. This fits very well with the standard [two-phase-commit] protocol for implementing transactions.

Following is the example of how the highest timestamp is determined at transaction commit. Assume that there are three servers. Server Blue stores names and Server Green stores titles. There is a separate server which acts as a coordinator. As can be seen, each server has a different local clock value. This can be a single integer or hybrid clock. The server acting as a coordinator starts writing to server Blue with the clock value known to it, which is 1. But Blue's clock is at 2, so it increments that and writes the value at timestamp 3. Timestamp 3 is returned to the coordinator in the response. For all subsequent requests to other servers, the coordinator uses timestamp of 3. Server Green, receiving the timestamp value 3 in the request, but it's clock is at 4. So it picks up the highest value, which is 4. Increments it and writes the value at 5 and returns timestamp 5 to the coordinator. When the transaction commits, the coordinator uses the highest timestamp it received to commit the transaction. All the values updated in the transaction will be stored at this highest timestamp.

Figure 1: Propagating commit timestamp across servers

A very simplified code for timestamp handling with transactions looks like this:

class TransactionCoordinator…

  public Transaction beginTransaction() {
      return new Transaction(UUID.randomUUID().toString());
  }

  public void putTransactionally() {
      Transaction txn = beginTransaction();
      HybridTimestamp coordinatorTime = new HybridTimestamp(1);
      HybridTimestamp server1WriteTime
              = server1.write("name", "Alice", coordinatorTime, txn);

      HybridTimestamp server2WriteTime = server2.write("title", "Microservices", server1WriteTime, txn);

      HybridTimestamp commitTimestamp = server1WriteTime.max(server2WriteTime);
      commit(txn, commitTimestamp);
  }

  private void commit(Transaction txn, HybridTimestamp commitTimestamp) {
      server1.commitTxn("name", commitTimestamp, txn);
      server2.commitTxn("title", commitTimestamp, txn);
  }

Transaction implementation can also use the prepare phase of the two phase commit protocol to learn about the highest timestamp used by each participating server.

Examples

[mongodb] uses hybrid timestamp to maintain versions in its MVCC storage.

[cockroachdb] uses hybrid timestamp to maintain causality with distributed transactions.

Significant Revisions