Patterns of Distributed Systems

Distributed systems provide a particular challenge to program. They often require us to have multiple copies of data, which need to keep synchronized. Yet we cannot rely on processing nodes working reliably, and network delays can easily lead to inconsistencies. Despite this, many organizations rely on a range of core distributed software handling data storage, messaging, system management, and compute capability. These systems face common problems which they solve with similar solutions. This article recognizes and develops these solutions as patterns, with which we can build up an understanding of how to better understand, communicate and teach distributed system design.

Unmesh has arranged to publish this material with Pearson, as part of Martin Fowler's Signature Series. He is currently working on preparing this material in book form. Once the book is available online, most of this material will be replaced with references to that online book.

07 September 2022

What this is about

For the last several months, I have been conducting workshops on distributed systems at Thoughtworks. One of the key challenges faced while conducting the workshops was how to map the theory of distributed systems to open source code bases such as Kafka or Cassandra, while keeping the discussions generic enough to cover a broad range of solutions. The concept of patterns provided a nice way out.

Pattern structure, by its very nature, allows us to focus on a specific problem, making it very clear why a particular solution is needed. Then the solution description enables us to give a code structure, which is concrete enough to show the actual solution but generic enough to cover a broad range of variations. This patterns technique also allows us to link various patterns together to build a complete system. This gives a nice vocabulary to discuss distributed system implementations.

What follows is a first set of patterns observed in mainstream open source distributed systems. I hope that this set of patterns will be useful to all developers.

Distributed systems - An implementation perspective

Today's enterprise architecture is full of platforms and frameworks which are distributed by nature. If we see the sample list of frameworks and platforms used in typical enterprise architecture today, it will look something like following:

Type of platform/frameworkExample
DatabasesCassandra, HBase, Riak
Message BrokersKafka, Pulsar
InfrastructureKubernetes, Mesos, Zookeeper, etcd, Consul
In Memory Data/Compute GridsHazelcast, Pivotal Gemfire
Stateful MicroservicesAkka Actors, Axon
File SystemsHDFS, Ceph

All these are 'distributed' by nature. What does it mean for a system to be distributed? There are two aspects:

  • They run on multiple servers. The number of servers in a cluster can vary from as few as three servers to a few thousand servers.
  • They manage data. So these are inherently 'stateful' systems.

There are several ways in which things can go wrong when multiple servers are involved in storing data. All the above mentioned systems need to solve those problems. The implementation of these systems have some recurring solutions to these problems. Understanding these solutions in their general form helps in understanding the implementation of the broad spectrum of these systems and can also serve as a good guidance when new systems need to be built. Enter patterns.


Patterns, a concept introduced by Christopher Alexander, is widely accepted in the software community to document design constructs which are used to build software systems. Patterns provide a structured way of looking at a problem space along with the solutions which are seen multiple times and proven. An interesting way to use patterns is the ability to link several patterns together, in a form of pattern sequence or pattern language, which gives some guidance of implementing a ‘whole’ or a complete system. Looking at distributed systems as a series of patterns is a useful way to gain insights into their implementation.

Problems and Their Recurring Solutions.

Several things can go wrong when data is stored on multiple servers.

Process crashes

Processes can crash at any time maybe due to hardware faults or software faults. There are numerous ways in which a process can crash.

  • It can be taken down for routine maintenance by system administrators.
  • It can be killed doing some file IO because the disk is full and the exception is not properly handled.
  • In cloud environments, it can be even trickier, as some unrelated events can bring the servers down.

The bottom line is that if the processes are responsible for storing data, they must be designed to give a durability guarantee for the data stored on the servers. Even if a process crashes abruptly, it should preserve all the data for which it has notified the user that it's stored successfully. Depending on the access patterns, different storage engines have different storage structures, ranging from a simple hash map to a sophisticated graph storage. Because flushing data to the disk is one of the most time consuming operations, not every insert or update to the storage can be flushed to disk. So most databases have in-memory storage structures which are only periodically flushed to disk. This poses a risk of losing all the data if the process abruptly crashes.

A technique called Write-Ahead Log is used to tackle this situation. Servers store each state change as a command in an append-only file on a hard disk. Appending a file is generally a very fast operation, so it can be done without impacting performance. A single log, which is appended sequentially, is used to store each update. At the server startup, the log can be replayed to build in memory state again.

This gives a durability guarantee. The data will not get lost even if the server abruptly crashes and then restarts. But clients will not be able to get or store any data till the server is back up. So we lack availability in the case of server failure.

One of the obvious solutions is to store the data on multiple servers. So we can replicate the write ahead log on multiple servers.

When multiple servers are involved, there are a lot more failure scenarios which need to be considered.

Network delays

In the TCP/IP protocol stack, there is no upper bound on delays caused in transmitting messages across a network. It can vary based on the load on the network. For example, a 1 Gbps network link can get flooded with a big data job that's triggered, filling the network buffers, which can cause arbitrary delay for some messages to reach the servers.

In a typical data center, servers are packed together in racks, and there are multiple racks connected by a top-of-the-rack switch. There might be a tree of switches connecting one part of the data center to the other. It is possible in some cases, that a set of servers can communicate with each other, but are disconnected from another set of servers. This situation is called a network partition. One of the fundamental issues with servers communicating over a network then is how to know a particular server has failed.

There are two problems to be tackled here.

  • A particular server can not wait indefinitely to know if another server has crashed.
  • There should not be two sets of servers, each considering another set to have failed, and therefore continuing to serve different sets of clients. This is called the split brain.

To tackle the first problem, every server sends a HeartBeat message to other servers at a regular interval. If a heartbeat is missed, the server sending the heartbeat is considered crashed. The heartbeat interval is small enough to make sure that it does not take a lot of time to detect server failure. As we will see below, in the worst case scenario, the server might be up and running, but the cluster as a group can move ahead considering the server to be failing. This makes sure that services provided to clients are not interrupted.

The second problem is the split brain. With the split brain, if two sets of servers accept updates independently, different clients can get and set different data, and once the split brain is resolved, it's impossible to resolve conflicts automatically.

To take care of the split brain issue, we must ensure that the two sets of servers, which are disconnected from each other, should not be able to make progress independently. To ensure this, every action the server takes, is considered successful only if the majority of the servers can confirm the action. If servers can not get a majority, they will not be able to provide the required services, and some group of the clients might not be receiving the service, but servers in the cluster will always be in a consistent state. The number of servers making the majority is called a Quorum. How to decide on the quorum? That is decided based on the number of failures the cluster can tolerate. So if we have a cluster of five nodes, we need a quorum of three. In general, if we want to tolerate f failures we need a cluster size of 2f + 1.

Quorum makes sure that we have enough copies of data to survive some server failures. But it is not enough to give strong consistency guarantees to clients. Lets say a client initiates a write operation on the quorum, but the write operation succeeds only on one server. The other servers in the quorum still have old values. When a client reads the values from the quorum, it might get the latest value, if the server having the latest value is available. But it can very well get an old value if, just when the client starts reading the value, the server with the latest value is not available. To avoid such situations, someone needs to track if the quorum agrees on a particular operation and only send values to clients which are guaranteed to be available on all the servers. Leader and Followers is used in this situation. One of the servers is elected a leader and the other servers act as followers. The leader controls and coordinates the replication on the followers. The leader now needs to decide, which changes should be made visible to the clients. A High-Water Mark is used to track the entry in the write ahead log that is known to have successfully replicated to a quorum of followers. All the entries upto the high-water mark are made visible to the clients. The leader also propagates the high-water mark to the followers. So in case the leader fails and one of the followers becomes the new leader, there are no inconsistencies in what a client sees.

Process Pauses

Even with quorums and leader and followers, there is a tricky problem that needs to be solved. Leader processes can pause arbitrarily. There are a lot of reasons a process can pause. For languages which support garbage collection, there can be a long garbage collection pause. A leader with a long garbage collection pause, can be disconnected from the followers, and will continue sending messages to followers after the pause is over. In the meanwhile, because followers did not receive a heartbeat from the leader, they might have elected a new leader and accepted updates from the clients. If the requests from the old leader are processed as is, they might overwrite some of the updates. So we need a mechanism to detect requests from out-of-date leaders. Here Generation Clock is used to mark and detect requests from older leaders. The generation is a number which is monotonically increasing.

Unsynchronized Clocks and Ordering Events

The problem of detecting older leader messages from newer ones is the problem of maintaining ordering of messages. It might appear that we can use system timestamps to order a set of messages, but we can not. The main reason we can not use system clocks is that system clocks across servers are not guaranteed to be synchronized. A time-of-the-day clock in a computer is managed by a quartz crystal and measures time based on the oscillations of the crystal.

This mechanism is error prone, as the crystals can oscillate faster or slower and so different servers can have very different times. The clocks across a set of servers are synchronized by a service called NTP. This service periodically checks a set of global time servers, and adjusts the computer clock accordingly.

Because this happens with communication over a network, and network delays can vary as discussed in the above sections, the clock synchronization might be delayed because of a network issue. This can cause server clocks to drift away from each other, and after the NTP sync happens, even move back in time. Because of these issues with computer clocks, time of day is generally not used for ordering events. Instead a simple technique called Lamport Clock is used. Generation Clock is an example of that. Lamport Clocks are just simple numbers, which are incremented only when some event happens in the system. In a database, the events are about writing and reading the values, so the lamport clock is incremented only when a value is written. The Lamport Clock numbers are also passed in the messages sent to other processes. The receiving process can then select the larger of the two numbers, the one it receives in the message and the one it maintains. This way Lamport Clocks also track happens-before relationship between events across processes which communicate with each other. An example of this is the servers taking part in a transaction. While the Lamport Clock allows ordering of events, it does not have any relation to the time of the day clock. To bridge this gap, a variation called Hybrid Clock is used. The Hybrid Clock uses system time along with a separate number to make sure the value increases monotonically, and can be used the same way as Lamport Clock.

The Lamport Clock allows determining the order of events across a set of communicating servers. But it does not allow detecting concurrent updates to the same value happening across a set of replicas. Version Vector is used to detect conflict across a set of replicas.

The Lamport Clock or Version Vector needs to be associated with the stored values, to detect which values are stored after the other or if there are conflicts. So the servers store the values as Versioned Value.

Putting it all together - Pattern Sequences

We can see how understanding these patterns, helps us build a complete system, from the ground up. We will take consensus implementation as an example.

Fault Tolerant Consensus

Distributed Consensus is a special case of distributed system implementation, which provides the strongest consistency guarantee. Common examples seen in popular enterprise systems include, Zookeeper, etcd and Consul. They implement consensus algorithms such as zab and Raft to provide replication and strong consistency. There are other popular algorithms to implement consensus; multi-paxos which is used in Google's Chubby locking service, view stamp replication and virtual-synchrony. In very simple terms, Consensus refers to a set of servers which agree on stored data, the order in which the data is stored and when to make that data visible to the clients.

Assuming a crash fault model, where it is assumed that cluster nodes stop working and crash when any fault happens, the basic technique to implement consensus for a single value, is implemented as Paxos. Paxos describes a few simple rules to use two phase execution, Quorum and Generation Clock to achieve consensus across a set of cluster nodes even when there are process crashes, network delays and unsynchronized clocks.

When data is replicated across cluster nodes, achieving consensus on a single value is not enough. All the replicas need to reach agreement on all the data. This requires executing Paxos multiple times while maintaining strict order. Replicated Log describes how basic Paxos can be extended to achieve this.

This technique is also known as state machine replication to achieve fault tolerance. In state machine replication, the storage services, like a key value store, are replicated on all the servers, and the user inputs are executed in the same order on each server. The key implementation technique used to achieve this is to replicate Write-Ahead Log on all the servers to have a Replicated Log.

Pattern Sequence for implementing replicated log

We can put the patterns together to implement Replicated Wal as follows.

To provide durability guarantees, you can use the Write-Ahead Log pattern. The Write Ahead Log is divided into multiple segments using Segmented Log. This helps with log cleaning, which is handled by Low-Water Mark. Fault tolerance is provided by replicating the write-ahead log on multiple servers. The replication among the servers is managed using the Leader and Followers pattern and Quorum is used to update the High-Water Mark to decide which values are visible to clients. All the requests are processed in strict order, by using Singular Update Queue. The order is maintained while sending the requests from leaders to followers using Single Socket Channel. To optimize for throughput and latency over a single socket channel, Request Pipeline can be used. Followers know about availability of the leader via the HeartBeat received from the leader. If the leader is temporarily disconnected from the cluster because of network partition, it is detected by using Generation Clock. If all the requests are served only by the leader, it might get overloaded. When the clients are read only and tolerate reading stale values, they can be served by the follower servers. Follower Reads allows handling read requests from follower servers.

Atomic Commit

Consensus algorithms are useful when multiple cluster nodes all store the same data. Often, data size is too big to store and process on a single node. So data is partitioned across a set of nodes using various partitioning schemes such as Fixed Partitions or Key-Range Partitions. To achieve fault tolerance, each partition is also replicated across a few cluster nodes using Replicated Log.

Sometimes data across a set of partitions needs to be stored as one atomic operation. If processes storing a partition crash or if there are network delays or process pauses, it might happen that data is copied on a few partitions and failed on a few. To maintain atomicity, the data needs to be stored and made accessible on all the partitions or none of them. Two Phase Commit is used to guarantee atomicity across a set of partitions. To guarantee atomicity, two-phase-commit often needs to lock the data items involved. This can severely impact throughput, particularly when there are long running read-only operations holding locks. To allow better throughput without using conflicting locks, two-phase-commit implementations often use Versioned Value based storage.

Kubernetes or Kafka Control Plane

Products like Kubernetes or Kafka's architecture are built around a strongly consistent metadata store. We can understand it as a pattern sequence. Consistent Core is used as a strongly consistent, fault tolerant metadata store. Lease is used to implement group membership and failure detection of cluster nodes. Cluster nodes use State Watch to get notified when any cluster node fails or updates its metadata The Consistent Core implementation uses Idempotent Receiver to ignore duplicate requests sent by cluster nodes in case of retries on network failure. The Consistent Core is built with a 'Replicated Wal', which is described as a pattern sequence in the above section.

Logical Timestamp usage

Usage of various types of logical timestamps can also be seen as a pattern sequence. Various products use either a Gossip Dissemination or a Consistent Core for group membership and failure detection of cluster nodes. The data storage uses Versioned Value to be able to determine which values are most recent. If a single server is responsible for updating the values or Leader and Followers is used, then a Lamport Clock can be used as a version, in the Versioned Value. When the timestamp values need to be derived from the time of the day, a Hybrid Clock is used instead of a simple Lamport Clock. If multiple servers are allowed to handle client requests to update the same value, a Version Vector is used to be able to detect concurrent writes on different cluster nodes.

This way, understanding problems and their recurring solutions in their general form, helps in understanding building blocks of a complete system

Next Steps

Distributed systems is a vast topic. The set of patterns covered here is a small part, covering different categories to showcase how a patterns approach can help understand and design distributed systems. I will keep adding to this set to broadly include the following categories of problems solved in any distributed system

  • Group Membership and Failure Detection
  • Partitioning
  • Replication and Consistency
  • Storage
  • Processing


Many thanks to Martin Fowler for helping me throughout and guiding me to think in terms of patterns.

Mushtaq Ahemad helped me with good feedback and a lot of discussions throughout

Rebecca Parsons, Dave Elliman, Samir Seth, Prasanna Pendse, Santosh Mahale, Sarthak Makhija, James Lewis, Chris Ford, Kumar Sankara Iyer, Evan Bottcher,Ian Cartwright, Priyanka Kotwal provided feedback on the earlier drafts.

Professor Indranil Gupta provided feedback on the gossip dissemination pattern.

Dahlia Malkhi helped with questions about google spanner

CockcroachDB team was very responsive in answering questions about their design choices

Thanks to Mikhail Bautin and Karthik Ranganathan from Yugabyte team for answering all the questions about safe time implementation in YugabyteDB

Thanks to Bela Ban, Patrik Nordwall and Lalith Suresh for the feedback on the emergent leader pattern

Thanks to Jojo Swords, Gareth Morgan and Richard Gall for helping with copy editing.

Significant Revisions

07 September 2022: Request Waiting List

06 September 2022: Request Batch

25 August 2022: Key-Range Partitions Placeholder date

23 August 2022: Fixed Partitions

18 August 2022: Emergent Leader

17 August 2022: Clock-Bound Wait Publisheddate

18 January 2022: Two Phase Commit

11 January 2022: Replicated Log

05 January 2022: Paxos

01 July 2021: Follower Reads

29 June 2021: Version Vector Published

24 June 2021: Hybrid Clock Published

23 June 2021: Lamport Clock Published

22 June 2021: Versioned Value Published

17 June 2021: Gossip Dissemination Published

17 June 2021: Patterns of Distributed Systems Started publication of third batch of patterns based on Gossip.

26 January 2021: Idempotent Receiver Published

19 January 2021: State Watch Published

13 January 2021: Lease

05 January 2021: Patterns of Distributed Systems Started publication of second batch with: Consistent Core, Lease, State Watch, and Idempotent Receiver.

05 January 2021: Consistent Core Published

25 August 2020: Singular Update Queue Published

20 August 2020: Request Pipeline Published

19 August 2020: Single Socket Channel Published

18 August 2020: Low-Water Mark Published

13 August 2020: Segmented Log Published

12 August 2020: Write-Ahead Log Published

11 August 2020: Quorum First published

06 August 2020: Leader and Followers

05 August 2020: High-Water Mark

04 August 2020: HeartBeat Initial publication

04 August 2020: Patterns of Distributed Systems Initial publication with Generation Clock and Heartbeat patterns. Some patterns then added over next few weeks

04 August 2020: Generation Clock Initial publication