This article is a draft.
Please do not share or link to this URL until I remove this notice

Patterns of Distributed Systems

10 February 2020

Distributed Systems - An implementation perspective

Today's enterprise architecture is full of platforms and frameworks which are distributed by nature. A sample list of the frameworks and platforms used in a typical enterprise architecture today looks something like the following:

Type of platform/framework       Example
Databases                        Cassandra, HBase, Riak
Message Brokers                  Kafka, Pulsar
Infrastructure                   Kubernetes, Mesos, Zookeeper, etcd, Consul
In Memory Data/Compute Grids     Hazelcast, Pivotal Gemfire
Stateful Microservices           Akka Actors, Axon
File Systems                     HDFS, Ceph

All these are 'distributed' by nature. What does it mean for a system to be distributed? There are two aspects:
  • They run on multiple servers. The number of servers in a cluster can vary from as few as three to a few thousand.
  • They manage data. So these are inherently 'stateful' systems.

There are several ways in which things can go wrong when multiple servers are involved in storing data. All the systems mentioned above need to solve those problems, and their implementations have some recurring solutions to them. Understanding these solutions in their general form helps in understanding the implementation of the broad spectrum of these systems, and can also serve as good guidance when new systems need to be built. Enter patterns.


Patterns, a concept introduced by Christopher Alexander, are widely accepted in the software community as a way to document design constructs which are used to build software systems. Patterns provide a good structured way of looking at a problem space, with solutions which have been seen multiple times and are proven. An interesting way to use patterns is to link several of them together in the form of a pattern sequence or pattern language, which gives some guidance for implementing a ‘whole’ or complete system. Looking at distributed systems as a series of patterns is a useful way to gain insights into their implementation.

Problems and Their Recurring Solutions

There are several things which can go wrong when data is stored on multiple servers.

Process crashes

Processes can crash at any time, due to either hardware faults or software faults. There are numerous ways in which a process can crash.

  • It can be taken down for routine maintenance by system administrators.
  • It can be killed while doing some file IO because the disk is full and the exception is not properly handled.
  • In cloud environments it can get even trickier, as some unrelated events can bring down the servers on which our instances run.

The bottom line is that if the processes are responsible for storing data, they must be designed to give a durability guarantee for the data stored on the servers. Even if a process crashes abruptly, it should preserve all the data for which it has notified the user that it's stored successfully. Depending on the access patterns, different storage engines have different storage structures, ranging from a simple hash map to a sophisticated graph storage. Because flushing data to the disk is one of the most time-consuming operations, not every insert or update to the storage can be flushed to disk. So most databases have in-memory storage structures which are only periodically flushed to disk. This poses a risk of losing all the data that has not yet been flushed if the process abruptly crashes.

A technique called Write-Ahead Log is used to tackle this situation. Servers store each state change as a command in an append-only file on a hard disk. Appending to a file is generally a very fast operation, so it can be done without impacting performance. A single log, which is appended sequentially, is used to store each update. At server startup, the log can be replayed to build the in-memory state again.
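The log-then-apply idea can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: the class names `WriteAheadLog` and `KVStore`, the one-JSON-command-per-line file format, and the choice to call `fsync` on every append are hypothetical, and real systems typically batch writes and add checksums.

```python
import json
import os
import tempfile


class WriteAheadLog:
    """Append-only log of state changes (hypothetical, simplified)."""

    def __init__(self, path):
        self.path = path
        # Append mode creates the file if it does not exist yet.
        self.file = open(path, "a", encoding="utf-8")

    def append(self, command):
        # One JSON-encoded command per line, appended sequentially.
        self.file.write(json.dumps(command) + "\n")
        self.file.flush()
        # Ask the OS to push the bytes to disk before acknowledging.
        os.fsync(self.file.fileno())

    def replay(self):
        # Read every command back in the order it was written.
        with open(self.path, "r", encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

    def close(self):
        self.file.close()


class KVStore:
    """In-memory key-value store that logs each update before applying it."""

    def __init__(self, wal_path):
        self.data = {}
        self.wal = WriteAheadLog(wal_path)
        # At startup, rebuild the in-memory state from the log.
        for command in self.wal.replay():
            self._apply(command)

    def _apply(self, command):
        self.data[command["key"]] = command["value"]

    def put(self, key, value):
        command = {"key": key, "value": value}
        self.wal.append(command)  # log first ...
        self._apply(command)      # ... then update memory

    def get(self, key):
        return self.data.get(key)

    def close(self):
        self.wal.close()


def demo():
    """Write a few values, 'restart' the store, and read them back."""
    path = os.path.join(tempfile.mkdtemp(), "wal.log")
    store = KVStore(path)
    store.put("title", "Microservices")
    store.put("author", "Martin")
    store.put("title", "Patterns")
    store.close()  # stands in for a crash followed by a restart

    restarted = KVStore(path)  # in-memory state is rebuilt by replay
    result = (restarted.get("title"), restarted.get("author"))
    restarted.close()
    return result
```

Calling `demo()` creates a second `KVStore` over the same file, which stands in for a server restart: the in-memory dictionary starts empty and is repopulated purely from the log.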

This gives a durability guarantee. The data will not get lost even if the server abruptly crashes, and then restarts. But clients will not be able to get or store any data till the server is back up. So we lack availability in the case of server failure.

One of the obvious solutions is to store the data on multiple servers. So we can replicate the write ahead log on multiple servers.

When multiple servers are involved, there are a lot more failure scenarios which need to be considered.

Network delays

In the TCP/IP protocol stack, there is no upper bound on the delay in transmitting messages across a network. It can vary based on the load on the network. For example, a 1 Gbps network link can get flooded when a big data job is triggered, filling the network buffers, which can cause arbitrary delays for some messages to reach the servers. In a typical data center, servers are packed together in racks, and there are multiple racks connected by a top-of-the-rack switch. There might be a tree of switches connecting one part of the data center to the other. It is possible, in some cases, that a set of servers can communicate with each other but are disconnected from another set of servers. This situation is called a network partition. One of the fundamental issues with servers communicating over a network, then, is when to say that a particular server has failed.

There are two issues to be tackled here.

  • A particular server can not wait indefinitely to know if another server has crashed.
  • There should not be two sets of servers, each considering the other set to have failed and continuing to serve different sets of clients. This is called the split brain issue.

To tackle the first issue, every server sends a HeartBeat message to other servers at a regular interval. If a heartbeat is missed, the server sending the heartbeat is considered crashed. The heartbeat interval is small enough to make sure that it does not take a lot of time to detect server failure. As we will see below, in the worst case scenario, the server might be up and running, but the cluster as a group can move ahead considering the server to have failed. This makes sure that services provided to clients are not interrupted.
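A rough sketch of the receiving side of heartbeats is shown below. The class name `HeartbeatMonitor`, the timeout value, and passing the current time in as a plain number (rather than reading a real clock) are all assumptions made to keep the example small and deterministic.

```python
class HeartbeatMonitor:
    """Tracks the last heartbeat seen from each server (hypothetical sketch)."""

    def __init__(self, timeout_seconds):
        # How long a server may stay silent before it is suspected.
        self.timeout_seconds = timeout_seconds
        self.last_seen = {}

    def record_heartbeat(self, server_id, now):
        # Remember when this server was last heard from.
        self.last_seen[server_id] = now

    def suspected_failed(self, now):
        # A server is suspected once its silence exceeds the timeout.
        return sorted(
            server_id
            for server_id, seen in self.last_seen.items()
            if now - seen > self.timeout_seconds
        )


monitor = HeartbeatMonitor(timeout_seconds=3.0)
monitor.record_heartbeat("s1", now=0.0)
monitor.record_heartbeat("s2", now=0.0)
monitor.record_heartbeat("s1", now=2.0)  # s2 stays silent
suspects = monitor.suspected_failed(now=4.0)
```

Note that the monitor can only suspect a failure: a server whose heartbeats are merely delayed looks exactly the same as one that has crashed.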

The second issue is that of the split brain. The biggest problem with the split brain is that if two sets of servers accept updates independently, different clients can get and set different data, and once the split brain is resolved, it's impossible to resolve conflicts automatically.

To take care of the split brain issue, it has to be ensured that two sets of servers which are disconnected from each other are not able to make progress independently. To ensure this, every action a server takes is considered successful only if a majority of the servers can confirm it. If servers can not get a majority, they will not be able to provide the required services, and some group of clients might not receive the service, but servers in the cluster will always be in a consistent state. The number of servers making up the majority is called a Quorum. How to decide on the quorum? That is decided based on the number of failures the cluster can tolerate. So if we have a cluster of five nodes, we need a quorum of three, and the cluster can tolerate two failures. In general, if we want to tolerate f failures we need a cluster size of 2f + 1.
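The arithmetic above can be written down as a few small helpers. The function names are hypothetical and chosen only for this sketch.

```python
def quorum_size(cluster_size):
    """Smallest number of servers that forms a majority."""
    return cluster_size // 2 + 1


def tolerated_failures(cluster_size):
    """Number of servers that can fail while a majority remains."""
    return (cluster_size - 1) // 2


def cluster_size_for(failures):
    """Cluster size needed to tolerate the given number of failures."""
    return 2 * failures + 1


def has_quorum(reachable_servers, cluster_size):
    """True if this group of servers is large enough to make progress."""
    return reachable_servers >= quorum_size(cluster_size)
```

For a five-node cluster split by a network partition into groups of three and two, `has_quorum(3, 5)` is true and `has_quorum(2, 5)` is false, so only one side can continue to accept updates. Two disjoint groups can never both hold a majority, which is what rules out split brain.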

Quorum makes sure that we have enough copies of data to survive some server failures. But it is not enough to give strong consistency guarantees to clients. Let's say a client initiates a write operation on the quorum, but the write operation succeeds only on one server. The other servers in the quorum still have old values. When a client reads the values from the quorum, it might get the latest value, if the server having the latest value is available. But it can very well get an old value if, just when the client starts reading the value, the server with the latest value is not available.

To avoid such situations, someone needs to track whether the quorum agrees on a particular operation, and only send values to clients which are guaranteed to be available on all the servers. Leader and Followers is used in this situation. One of the servers is elected a leader and the other servers act as followers. The leader controls and coordinates the replication on the followers. The leader now needs to decide which changes should be made visible to the clients. High-Water Mark is used to track the entry in the write ahead log that is known to have successfully replicated to a Quorum of followers. All the entries up to the high-water mark are made visible to the clients. The leader also propagates the high-water mark to the followers. So in case the leader fails and one of the followers becomes the new leader, there are no inconsistencies in what a client sees.
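One way a leader might compute the high-water mark is sketched below. It assumes the leader keeps, for every server in the cluster (itself included), the index of the last log entry known to be stored on that server; the function names and the 1-based log indexes are assumptions of this sketch.

```python
def high_water_mark(last_index_by_server):
    """Highest log index that is stored on a majority of servers.

    last_index_by_server maps a server name to the index of the last
    log entry known to be stored there (the leader counts itself).
    """
    indexes = sorted(last_index_by_server.values(), reverse=True)
    quorum = len(indexes) // 2 + 1
    # The quorum-th largest index is present on at least `quorum` servers.
    return indexes[quorum - 1]


def visible_entries(log, mark):
    """Entries clients may see: everything up to the high-water mark."""
    return log[:mark]  # log indexes are 1-based, list slicing is 0-based


progress = {"leader": 7, "f1": 7, "f2": 5, "f3": 4, "f4": 2}
mark = high_water_mark(progress)
log = ["e1", "e2", "e3", "e4", "e5", "e6", "e7"]
visible = visible_entries(log, mark)
```

In this example entries 6 and 7 exist only on two of the five servers, so they stay hidden from clients until at least one more server confirms them.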

Process Pauses

But this is not all. Even with Quorums and Leader and Followers, there is a tricky problem that needs to be solved. Leader processes can pause arbitrarily. There are a lot of reasons a process can pause. For languages which support garbage collection, there can be a long garbage collection pause. A leader with a long garbage collection pause can be disconnected from the followers, and will continue sending messages to followers after the pause is over. In the meantime, because followers did not receive any heartbeat from the leader, they might have elected a new leader and accepted updates from the clients. If the requests from the old leader are processed as is, they might overwrite some of the updates. So we need a mechanism to detect requests from out-of-date leaders. Generation Clock is used to mark and detect requests from older leaders. The generation is a number which is monotonically increasing.
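The check a follower performs can be sketched as follows. The `Follower` class and its method name are hypothetical, and the sketch assumes that every leader election increments the generation and that each replication request carries the generation of the leader that sent it.

```python
class Follower:
    """Rejects requests from leaders with a stale generation (sketch)."""

    def __init__(self):
        self.generation = 0  # highest generation seen so far
        self.log = []

    def handle_replication(self, leader_generation, entry):
        # A lower generation means the sender was replaced while paused.
        if leader_generation < self.generation:
            return False
        # Remember the newest generation and accept the entry.
        self.generation = leader_generation
        self.log.append(entry)
        return True


follower = Follower()
accepted_old = follower.handle_replication(1, "x=1")    # leader, generation 1
accepted_new = follower.handle_replication(2, "x=2")    # new leader elected
accepted_stale = follower.handle_replication(1, "x=3")  # old leader wakes up
```

The last request is rejected because the follower has already seen generation 2, so the paused leader's late message cannot overwrite the newer update.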

Unsynchronized Clocks and Ordering Events

The problem of distinguishing older leader messages from newer ones is the problem of maintaining the ordering of messages. It might appear that we can use system timestamps to order a set of messages, but we can not. The main reason we can not use system clocks is that system clocks across servers are not guaranteed to be synchronized. A time-of-day clock in a computer is managed by a quartz crystal and measures time based on the oscillations of the crystal.

This mechanism is error prone, as the crystals can oscillate faster or slower, and so different servers can have very different times. The clocks across a set of servers are synchronized by a service called NTP. This service periodically checks a set of global time servers and adjusts the computer clock accordingly. Because this happens over a network, and network delays can vary as discussed in the sections above, the clock synchronization might be delayed because of a network issue. This can cause server clocks to drift away from each other and, after the NTP sync happens, even move back in time. Because of these issues with computer clocks, time of day is generally not used for ordering events. Instead a simple technique called Lamport’s timestamp is used. Generation Clock is an example of that.
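A Lamport timestamp is just a counter that is advanced on every local event and pushed past any larger value seen in an incoming message. The sketch below shows the usual rules; the class and method names are chosen for illustration only.

```python
class LamportClock:
    """Logical clock that orders events without using time of day."""

    def __init__(self):
        self.time = 0

    def tick(self):
        # Local event or message send: advance the counter.
        self.time += 1
        return self.time

    def receive(self, received_time):
        # Message receive: jump past the sender's timestamp if it is ahead.
        self.time = max(self.time, received_time) + 1
        return self.time


a = LamportClock()
b = LamportClock()
t1 = a.tick()        # event on server a
t2 = a.tick()        # a sends a message stamped t2
t3 = b.receive(t2)   # b receives it
t4 = b.tick()        # b replies with a message stamped t4
t5 = a.receive(t4)   # a receives the reply
```

Each receive gets a timestamp greater than the corresponding send, so the timestamps respect cause and effect even if the two servers' wall clocks disagree.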

Putting it all together - An example distributed system

We can see how understanding these patterns helps us build a complete system from the ground up. We will take consensus implementation as an example. Distributed Consensus is a special case of distributed system implementation, which provides the strongest consistency guarantee. Common examples seen in popular enterprise systems are [zookeeper], [etcd] and [consul]. They implement consensus algorithms like [zab] and [raft] to provide replication and strong consistency. There are other popular algorithms to implement consensus: [paxos], which is used in Google's [chubby] locking service, Viewstamped Replication and [virtual-synchrony]. In very simple terms, Consensus refers to a set of servers which agree on stored data, the order in which the data is stored and when to make that data visible to the clients.

Pattern Sequence for implementing consensus

We can put the patterns together to have an implementation like the following. For providing durability guarantees, use Write-Ahead Log. The WAL is divided into multiple segments using Segmented Log. This helps with log cleaning, which is handled by Low-Water Mark. Fault tolerance is provided by replicating the WAL on multiple servers. Replication amongst the servers is managed by using Leader and Followers. Quorum is used to update High-Water Mark to decide which values are visible to clients. All the requests are processed in strict order by using Singular Update Queue. The order is maintained while sending the requests from leaders to followers using Single Socket Channel. Followers know about the availability of the leader through the HeartBeat received from the leader. If the leader is temporarily disconnected from the cluster because of a network partition, it is detected by using Generation Clock.

Significant Revisions

10 February 2020: First in toolchain