Patterns of Distributed Systems

Distributed systems pose a particular challenge to program. They often require us to keep multiple copies of data, which need to stay synchronized. Yet we cannot rely on processing nodes working reliably, and network delays can easily lead to inconsistencies. Despite this, many organizations depend on a range of core distributed software for data storage, messaging, system management, and compute capability. These systems face common problems, which they solve with similar solutions. This article captures those solutions as patterns, giving us a vocabulary with which to better understand, communicate, and teach distributed system design.

04 August 2020



Unmesh Joshi

Unmesh Joshi is a Principal Consultant at ThoughtWorks. He is a software architecture enthusiast, who believes that understanding principles of distributed systems is as essential today as understanding web architecture or object oriented programming was in the last decade.


What this is about

For the last several months, I have been conducting workshops on distributed systems at ThoughtWorks. One of the key challenges faced while conducting the workshops was how to map theory of distributed systems to open source code bases like Kafka or Cassandra, whilst keeping the discussions generic enough to cover a broad range of solutions. The concept of patterns provided a nice way out.

Pattern structure, by its very nature, allows us to focus on a specific problem, making it very clear why a particular solution is needed. Then the solution description allows us to give a code structure, which is concrete enough to show the actual solution, but generic enough to cover a broad range of variations. Patterns technique also allows us to link various patterns together to build a complete system. This gives a nice vocabulary to discuss distributed system implementations.

What follows is a first set of patterns observed in mainstream open source distributed systems. I hope this set of patterns will be useful to all developers.

Distributed systems - An implementation perspective

Today's enterprise architecture is full of platforms and frameworks which are distributed by nature. A sample list of the frameworks and platforms used in a typical enterprise architecture today looks something like the following:

Type of platform/framework      Example
Databases                       Cassandra, HBase, Riak
Message Brokers                 Kafka, Pulsar
Infrastructure                  Kubernetes, Mesos, Zookeeper, etcd, Consul
In-Memory Data/Compute Grids    Hazelcast, Pivotal GemFire
Stateful Microservices          Akka Actors, Axon
File Systems                    HDFS, Ceph

All these are 'distributed' by nature. What does it mean for a system to be distributed? There are two aspects:

  • They run on multiple servers. The number of servers in a cluster can vary from as few as three to a few thousand.
  • They manage data. So these are inherently 'stateful' systems.

There are several ways in which things can go wrong when multiple servers are involved in storing data, and all the systems mentioned above need to solve those problems. Their implementations share some recurring solutions. Understanding these solutions in their general form helps in understanding the implementation of this broad spectrum of systems, and can also serve as good guidance when new systems need to be built. Enter patterns.

Patterns

Patterns, a concept introduced by Christopher Alexander, are widely used in the software community to document the design constructs from which software systems are built. Patterns provide a structured way of looking at a problem space along with solutions which have been seen multiple times and proven. An interesting way to use patterns is to link several of them together, in the form of a pattern sequence or pattern language, which gives some guidance on implementing a 'whole', or complete, system. Looking at distributed systems as a series of patterns is a useful way to gain insight into their implementation.

Problems and Their Recurring Solutions

There are several things which can go wrong when data is stored on multiple servers.

Process crashes

Processes can crash at any time, due to either hardware or software faults. There are numerous ways in which a process can crash:

  • It can be taken down for routine maintenance by system administrators.
  • It can be killed doing some file IO because the disk is full and the exception is not properly handled.
  • In cloud environments, it can be even trickier, as some unrelated events can bring the servers down.

The bottom line is that if processes are responsible for storing data, they must be designed to give a durability guarantee for the data stored on the servers. Even if a process crashes abruptly, it should preserve all the data for which it has notified the user of a successful store. Depending on the access patterns, different storage engines have different storage structures, ranging from a simple hash map to sophisticated graph storage. Because flushing data to disk is one of the most time-consuming operations, not every insert or update to the storage can be flushed to disk immediately. So most databases have in-memory storage structures which are only periodically flushed to disk. This poses a risk of losing all that data if the process crashes abruptly.

A technique called Write-Ahead Log is used to tackle this situation. Servers store each state change as a command in an append-only file on a hard disk. Appending to a file is generally a very fast operation, so it can be done without impacting performance. A single log, which is appended to sequentially, is used to store each update. At server startup, the log can be replayed to rebuild the in-memory state.
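As a rough sketch of the idea (the class and method names here are illustrative, not taken from any particular code base), each state change is appended to the log file before it is acknowledged, and the file is replayed at startup to rebuild the in-memory state:

    // A minimal write-ahead log sketch: every state change is appended to a file
    // before it is applied, and the file is replayed at startup to rebuild state.
    // A production implementation would also fsync (e.g. FileChannel.force)
    // before acknowledging a write.
    import java.io.IOException;
    import java.nio.file.*;
    import java.util.*;

    public class WriteAheadLog {
        private final Path logFile;

        public WriteAheadLog(Path logFile) {
            this.logFile = logFile;
        }

        // Append a command such as "SET key value" to the end of the log.
        public void append(String command) throws IOException {
            Files.writeString(logFile, command + System.lineSeparator(),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }

        // Replay the log at startup to rebuild the in-memory key-value state.
        public Map<String, String> replay() throws IOException {
            Map<String, String> kv = new HashMap<>();
            if (!Files.exists(logFile)) return kv;
            for (String line : Files.readAllLines(logFile)) {
                String[] parts = line.split(" ", 3);   // command, key, value
                if (parts.length == 3 && parts[0].equals("SET")) {
                    kv.put(parts[1], parts[2]);
                }
            }
            return kv;
        }
    }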

This gives a durability guarantee. The data will not get lost even if the server crashes abruptly and then restarts. But clients will not be able to get or store any data until the server is back up. So we lack availability in the case of server failure.

One of the obvious solutions is to store the data on multiple servers. So we can replicate the write ahead log on multiple servers.

When multiple servers are involved, there are a lot more failure scenarios which need to be considered.

Network delays

In the TCP/IP protocol stack, there is no upper bound on the delay in transmitting messages across a network. The delay varies with the load on the network. For example, a 1 Gbps network link can get flooded when a big data job is triggered, filling the network buffers and causing arbitrary delays for some messages to reach the servers.

In a typical data center, servers are packed together in racks, and multiple racks are connected by a top-of-rack switch. There might be a tree of switches connecting one part of the data center to another. It is possible, in some cases, that a set of servers can communicate with each other but is disconnected from another set of servers. This situation is called a network partition. One of the fundamental issues with servers communicating over a network, then, is knowing when a particular server has failed.

There are two problems to be tackled here.

  • A particular server cannot wait indefinitely to know whether another server has crashed.
  • There should not be two sets of servers, each considering the other set to have failed, and therefore continuing to serve different sets of clients. This is called split brain.

To tackle the first problem, every server sends a HeartBeat message to the other servers at a regular interval. If a heartbeat is missed, the server sending the heartbeat is considered crashed. The heartbeat interval is small enough to ensure that it does not take long to detect server failure. As we will see below, in the worst case the server might actually be up and running, but the cluster as a group can move ahead considering it failed. This makes sure that services provided to clients are not interrupted.
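A minimal sketch of the mechanism, with illustrative interval, timeout, and class names: the sender schedules a heartbeat at a fixed interval, while the receiver records arrival times and suspects the sender as failed when no heartbeat arrives within a timeout.

    // A minimal heartbeat sketch. Interval, timeout, and names are illustrative.
    import java.util.concurrent.*;

    public class HeartbeatSketch {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        private volatile long lastHeartbeatReceivedAt = System.nanoTime();
        private final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(500);

        // Sending side: send a heartbeat every 100 ms.
        public void startSending(Runnable sendHeartbeat) {
            scheduler.scheduleAtFixedRate(sendHeartbeat, 0, 100, TimeUnit.MILLISECONDS);
        }

        // Receiving side: record the arrival time of each heartbeat.
        public void onHeartbeatReceived() {
            lastHeartbeatReceivedAt = System.nanoTime();
        }

        // Checked periodically: suspect the sender if no heartbeat arrived in time.
        public boolean isSuspected() {
            return System.nanoTime() - lastHeartbeatReceivedAt > timeoutNanos;
        }
    }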

The second problem is split brain: if two sets of servers accept updates independently, different clients can get and set different data, and once the partition heals, it is impossible to resolve the conflicts automatically.

To take care of the split brain issue, we must ensure that two sets of servers which are disconnected from each other cannot make progress independently. To ensure this, every action a server takes is considered successful only if a majority of the servers can confirm it. If servers cannot get a majority, they will not be able to provide the required service, and some group of clients might not be served, but the servers in the cluster will always be in a consistent state. The number of servers making up the majority is called a Quorum. How do we decide on the quorum? It is based on the number of failures the cluster can tolerate: if we want to tolerate f failures, we need a cluster of 2f + 1 servers, giving a quorum of f + 1. So a cluster of five nodes can tolerate two failures and needs a quorum of three.
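To make the arithmetic concrete, here is a minimal sketch (illustrative class and method names, not from any particular code base):

    // Quorum arithmetic: with 2f + 1 servers, a majority of f + 1 survives f failures.
    public class Quorum {
        static int quorumSize(int clusterSize) {
            return clusterSize / 2 + 1;          // majority
        }

        static int toleratedFailures(int clusterSize) {
            return (clusterSize - 1) / 2;        // f, where clusterSize = 2f + 1
        }

        public static void main(String[] args) {
            System.out.println(quorumSize(5));          // 3
            System.out.println(toleratedFailures(5));   // 2
        }
    }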

Quorum makes sure that we have enough copies of data to survive some server failures. But it is not enough to give strong consistency guarantees to clients. Let's say a client initiates a write operation on the quorum, but the write succeeds only on one server. The other servers in the quorum still have old values. When a client reads the values from the quorum, it might get the latest value, if the server having the latest value is available. But it can very well get an old value if, just when the client starts reading, the server with the latest value is not available. To avoid such situations, someone needs to track whether the quorum agrees on a particular operation, and only send clients values which are guaranteed to be available on all the servers. Leader and Followers is used in this situation. One of the servers is elected leader and the other servers act as followers. The leader controls and coordinates the replication on the followers.

The leader now needs to decide which changes should be made visible to the clients. A High-Water Mark is used to track the entry in the write-ahead log that is known to have been successfully replicated to a Quorum of followers. All the entries up to the high-water mark are made visible to the clients. The leader also propagates the high-water mark to the followers, so that if the leader fails and one of the followers becomes the new leader, there are no inconsistencies in what a client sees.
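As a rough sketch of how a leader might compute this (the names and structure are illustrative assumptions, not the article's implementation): the high-water mark is the highest log index that a quorum of servers, including the leader itself, has acknowledged.

    // Sketch of high-water mark calculation on the leader: the high-water mark is
    // the highest log index replicated on a quorum of servers.
    import java.util.*;

    public class HighWaterMark {
        // acknowledgedIndexes maps each server id (leader included) to the highest
        // log index that server has acknowledged.
        static long highWaterMark(Map<String, Long> acknowledgedIndexes) {
            List<Long> indexes = new ArrayList<>(acknowledgedIndexes.values());
            indexes.sort(Comparator.reverseOrder());
            int quorum = acknowledgedIndexes.size() / 2 + 1;
            // The quorum-th highest acknowledged index is present on a majority.
            return indexes.get(quorum - 1);
        }

        public static void main(String[] args) {
            // Leader at index 10, one follower at 10, another lagging at 7:
            // index 10 is on a majority, so entries up to 10 can be made visible.
            System.out.println(highWaterMark(Map.of("leader", 10L, "f1", 10L, "f2", 7L))); // 10
        }
    }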

Process Pauses

But this is not all. Even with Quorums and Leader and Followers, there is a tricky problem that needs to be solved: leader processes can pause arbitrarily. There are many reasons a process can pause. For languages which support garbage collection, there can be a long garbage collection pause. A leader with a long garbage collection pause can be disconnected from the followers, and will continue sending messages to them after the pause is over. In the meantime, because the followers did not receive any heartbeat from the leader, they might have elected a new leader and accepted updates from clients. If requests from the old leader are processed as is, they might overwrite some of those updates. So we need a mechanism to detect requests from out-of-date leaders. A Generation Clock is used to mark and detect requests from older leaders. The generation is a monotonically increasing number.
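A minimal sketch of the check a follower might perform, with illustrative names (requests are assumed to carry the sending leader's generation number):

    // Sketch of a generation clock check on a follower: requests carrying an older
    // generation are rejected, so a paused, stale leader cannot overwrite updates
    // accepted under a newer leader.
    public class GenerationCheck {
        private long currentGeneration = 0;

        // Called for every request from a leader; the request carries the
        // leader's generation number.
        public synchronized boolean accept(long requestGeneration) {
            if (requestGeneration < currentGeneration) {
                return false;                       // stale leader; reject
            }
            currentGeneration = requestGeneration;  // track the highest generation seen
            return true;
        }
    }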

Unsynchronized Clocks and Ordering Events

The problem of distinguishing older leader messages from newer ones is the problem of maintaining an ordering of messages. It might appear that we can use system timestamps to order a set of messages, but we cannot. The main reason is that system clocks across servers are not guaranteed to be synchronized. A computer's time-of-day clock is managed by a quartz crystal and measures time based on the crystal's oscillations.

This mechanism is error prone, as crystals can oscillate faster or slower, so different servers can have very different times. The clocks across a set of servers are synchronized by the NTP service, which periodically checks a set of global time servers and adjusts the computer clock accordingly.

Because this synchronization happens over the network, and network delays can vary as discussed in the sections above, it might be delayed by a network issue. This can cause server clocks to drift away from each other, and, after an NTP sync happens, even to move back in time. Because of these issues with computer clocks, time of day is generally not used for ordering events. Instead, a simple technique called a Lamport timestamp is used; Generation Clock is an example of this.
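A minimal sketch of a Lamport clock, assuming a simple counter per server (illustrative, not tied to any specific code base): the counter is incremented for each local event, attached to outgoing messages, and advanced past any higher timestamp seen on incoming messages.

    // A minimal Lamport clock sketch: a logical counter that orders events without
    // relying on synchronized physical clocks.
    public class LamportClock {
        private long time = 0;

        // Increment for a local event (e.g. a write) and return the new timestamp.
        public synchronized long tick() {
            return ++time;
        }

        // On receiving a message, move the clock past both the local time and the
        // timestamp carried by the message.
        public synchronized long onReceive(long messageTimestamp) {
            time = Math.max(time, messageTimestamp) + 1;
            return time;
        }
    }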

Putting it all together - An example distributed system

We can see how understanding these patterns helps us build a complete system from the ground up. We will take a consensus implementation as an example. Distributed consensus is a special case of distributed system implementation which provides the strongest consistency guarantee. Common examples in popular enterprise systems are Zookeeper, etcd, and Consul. They implement consensus algorithms such as Zab and Raft to provide replication and strong consistency. There are other popular algorithms to implement consensus: Paxos, which is used in Google's Chubby locking service, Viewstamped Replication, and virtual synchrony. In very simple terms, consensus refers to a set of servers agreeing on the stored data, the order in which the data is stored, and when to make that data visible to clients.

Pattern Sequence for implementing consensus

Consensus implementations use state machine replication to achieve fault tolerance. In state machine replication, storage services, like a key-value store, are replicated on all the servers, and the user inputs are executed in the same order on each server. The key implementation technique used to achieve this is to replicate the Write-Ahead Log on all the servers to have a 'Replicated Wal'.
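As an illustrative sketch (the names are assumptions, not from any particular system): each server applies the committed log entries, strictly in log order, to the same deterministic state machine, here a simple key-value store, so that all servers converge to identical state.

    // Sketch of state machine replication: every server applies the committed log
    // entries, in the same order, to a deterministic key-value store, so all
    // servers converge to the same state.
    import java.util.*;

    public class ReplicatedKeyValueStore {
        private final Map<String, String> kv = new HashMap<>();
        private long lastAppliedIndex = 0;

        // Apply committed entries (index -> "key=value") strictly in index order.
        public void applyCommitted(SortedMap<Long, String> committedEntries) {
            for (Map.Entry<Long, String> entry :
                    committedEntries.tailMap(lastAppliedIndex + 1).entrySet()) {
                String[] parts = entry.getValue().split("=", 2);
                kv.put(parts[0], parts[1]);
                lastAppliedIndex = entry.getKey();
            }
        }

        public String get(String key) {
            return kv.get(key);
        }

        public static void main(String[] args) {
            ReplicatedKeyValueStore store = new ReplicatedKeyValueStore();
            store.applyCommitted(new TreeMap<>(
                    Map.of(1L, "title=Microservices", 2L, "title=Distributed Systems")));
            System.out.println(store.get("title"));   // Distributed Systems
        }
    }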

We can put the patterns together to implement Replicated Wal as follows.

For providing durability guarantees, use Write-Ahead Log. The write-ahead log is divided into multiple segments using Segmented Log. This helps with log cleaning, which is handled by Low-Water Mark. Fault tolerance is provided by replicating the write-ahead log on multiple servers. Replication amongst the servers is managed by using Leader and Followers. Quorum is used to update the High-Water Mark, which decides which values are visible to clients. All requests are processed in strict order by using Singular Update Queue. The order is maintained while sending requests from the leader to the followers using Single Socket Channel. To optimize throughput and latency over a single socket channel, Request Pipeline is used. Followers know about the availability of the leader via the HeartBeats received from it. If the leader is temporarily disconnected from the cluster because of a network partition, this is detected by using Generation Clock.

In this way, understanding problems and their recurring solutions in their general form helps in understanding the building blocks of a complete system.

Next Steps

Distributed systems are a vast topic. The set of patterns covered here is a small part, spanning different categories to showcase how a patterns approach can help us understand and design distributed systems. I will keep adding to this set to broadly cover the following categories of problems solved in any distributed system:

  • Group Membership and Failure Detection
  • Partitioning
  • Replication and Consistency
  • Storage
  • Processing

Acknowledgements

Many thanks to Martin Fowler for helping me throughout and guiding me to think in terms of patterns.

Mushtaq Ahemad helped me with good feedback and a lot of discussions throughout.

Rebecca Parsons, Dave Elliman, Samir Seth, Prasanna Pendse, Santosh Mahale, Sarthak Makhija, James Lewis, Kumar Sankara Iyer, Evan Bottcher, Jojo Swords, and Gareth Morgan provided feedback on the earlier drafts.

Significant Revisions

04 August 2020: Initial publication with Generation Clock and Heartbeat patterns