Gossip Dissemination

Use random selection of nodes to pass on information, so that it reaches all the nodes in the cluster without flooding the network.

17 June 2021

Unmesh Joshi

Problem

In a cluster of nodes, each node needs to pass the metadata it has to all the other nodes in the cluster, without depending on shared storage. In a large cluster, if every server communicates with every other server, a lot of network bandwidth can be consumed. Information should reach all the nodes even when some network links are experiencing issues.

Solution

Cluster nodes use gossip-style communication to propagate state updates. At a regular interval, say every second, each node selects another node at random and passes on the information it has.

In large clusters, the following things need to be considered:

  • Put a fixed limit on the number of messages generated per server.
  • The messages should not consume a lot of network bandwidth. There should be an upper bound of, say, a few hundred Kbs, so that the applications' data transfer is not impacted by too many messages across the cluster.
  • The metadata propagation should tolerate network failures and the failure of a few servers. It should reach all the cluster nodes even if a few network links are down or a few servers have failed.

As discussed in the sidebar, Gossip-style communication fulfills all these requirements.
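
To get a feel for how quickly randomly selected targets cover a cluster, the following small simulation counts the rounds needed for one update to reach every node. It is only a sketch under simplifying assumptions: rounds are synchronous, every informed node pushes to `fanout` targets chosen uniformly at random (possibly itself or an already informed node), and no messages are lost. The class and method names are hypothetical and are not part of the implementation shown in this article.

```java
import java.util.BitSet;
import java.util.Random;

// Hypothetical simulation of push-style gossip; not part of the pattern's implementation.
class GossipSpreadSimulation {

    // Returns the number of rounds until every node has the update,
    // or -1 if that does not happen within maxRounds.
    static int roundsToReachAll(int clusterSize, int fanout, int maxRounds, long seed) {
        Random random = new Random(seed);
        BitSet informed = new BitSet(clusterSize);
        informed.set(0); // node 0 starts with the new metadata

        for (int round = 1; round <= maxRounds; round++) {
            // Snapshot the senders, so nodes informed in this round
            // only start gossiping in the next one.
            BitSet senders = (BitSet) informed.clone();
            for (int node = senders.nextSetBit(0); node >= 0; node = senders.nextSetBit(node + 1)) {
                // Each sender generates exactly `fanout` messages per round.
                for (int i = 0; i < fanout; i++) {
                    informed.set(random.nextInt(clusterSize));
                }
            }
            if (informed.cardinality() == clusterSize) {
                return round;
            }
        }
        return -1;
    }
}
```

Calling `GossipSpreadSimulation.roundsToReachAll(1000, 3, 200, 42L)` returns the round in which the last of 1000 nodes was reached. Each node sends at most `fanout` messages per round, which is the fixed per-server limit the first requirement asks for.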

Each cluster node stores the metadata as a list of key-value pairs associated with each node in the cluster, as follows:

class Gossip…

  Map<NodeId, NodeState> clusterMetadata = new HashMap<>();

class NodeState…

  Map<String, VersionedValue> values = new HashMap<>();

At startup, each cluster node adds the metadata about itself, which needs to be propagated to other nodes. Examples of such metadata are the IP address and port the node listens on, the partitions it is responsible for, and so on. The Gossip instance needs to know about at least one other node to start the gossip communication. The well-known cluster node used to initialize the Gossip instance is called a seed node or an introducer. Any node can act as an introducer.

class Gossip…

  public Gossip(InetAddressAndPort listenAddress,
                List<InetAddressAndPort> seedNodes,
                String nodeId) throws IOException {
      this.listenAddress = listenAddress;
      // filter out this node itself in case it is part of the seed nodes
      this.seedNodes = removeSelfAddress(seedNodes);
      this.nodeId = new NodeId(nodeId);
      addLocalState(GossipKeys.ADDRESS, listenAddress.toString());

      this.socketServer = new NIOSocketListener(newGossipRequestConsumer(), listenAddress);
  }

  private void addLocalState(String key, String value) {
      // clusterMetadata is keyed by NodeId, so look up this node's state by its id
      NodeState nodeState = clusterMetadata.get(nodeId);
      if (nodeState == null) {
          nodeState = new NodeState();
          clusterMetadata.put(nodeId, nodeState);
      }
      nodeState.add(key, new VersionedValue(value, incremenetVersion()));
  }

Each cluster node schedules a job to transmit the metadata it has to other nodes at regular intervals.

class Gossip…

  private ScheduledThreadPoolExecutor gossipExecutor = new ScheduledThreadPoolExecutor(1);
  private long gossipIntervalMs = 1000;
  private ScheduledFuture<?> taskFuture;
  public void start() {
      socketServer.start();
      taskFuture = gossipExecutor.scheduleAtFixedRate(()-> doGossip(),
                  gossipIntervalMs,
                  gossipIntervalMs,
                  TimeUnit.MILLISECONDS);
  }

When the scheduled task is invoked, it picks a small set of random nodes from the list of servers in the metadata map. A small constant, called the Gossip fanout, determines how many nodes to pick as gossip targets. If nothing is known yet, it picks a random seed node and sends the metadata map it has to that node.

class Gossip…

  public void doGossip() {
      List<InetAddressAndPort> knownClusterNodes = liveNodes();
      if (knownClusterNodes.isEmpty()) {
          sendGossip(seedNodes, gossipFanout);
      } else {
          sendGossip(knownClusterNodes, gossipFanout);
      }
  }

  private List<InetAddressAndPort> liveNodes() {
      Set<InetAddressAndPort> nodes
              = clusterMetadata.values()
              .stream()
              .map(n -> InetAddressAndPort.parse(n.get(GossipKeys.ADDRESS).getValue()))
              .collect(Collectors.toSet());
      return removeSelfAddress(nodes);
  }
private void sendGossip(List<InetAddressAndPort> knownClusterNodes, int gossipFanout) {
    if (knownClusterNodes.isEmpty()) {
        return;
    }

    for (int i = 0; i < gossipFanout; i++) {
        InetAddressAndPort nodeAddress = pickRandomNode(knownClusterNodes);
        sendGossipTo(nodeAddress);
    }
}

private void sendGossipTo(InetAddressAndPort nodeAddress) {
    try {
        getLogger().info("Sending gossip state to " + nodeAddress);
        SocketClient<RequestOrResponse> socketClient = new SocketClient<>(nodeAddress);
        GossipStateMessage gossipStateMessage
                = new GossipStateMessage(this.clusterMetadata);
        RequestOrResponse request
                = createGossipStateRequest(gossipStateMessage);
        byte[] responseBytes = socketClient.blockingSend(request);
        GossipStateMessage responseState = deserialize(responseBytes);
        merge(responseState.getNodeStates());

    } catch (IOException e) {
        getLogger().error("IO error while sending gossip state to " + nodeAddress, e);
    }
}

private RequestOrResponse createGossipStateRequest(GossipStateMessage gossipStateMessage) {
    return new RequestOrResponse(RequestId.PushPullGossipState.getId(),
            JsonSerDes.serialize(gossipStateMessage), correlationId++);
}
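
The loop in sendGossip picks each target independently, so the same node can be chosen more than once in a single round. If distinct targets are preferred, one possible variation is to shuffle a copy of the known nodes and take the first few. The sketch below assumes a generic node type, and the helper name `pickDistinctTargets` is hypothetical rather than part of the code above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper: choose gossip targets without repeating a node in one round.
class GossipTargetPicker {

    // Returns up to `fanout` distinct nodes chosen at random from knownNodes.
    static <T> List<T> pickDistinctTargets(List<T> knownNodes, int fanout, Random random) {
        List<T> shuffled = new ArrayList<>(knownNodes); // copy, so the caller's list is untouched
        Collections.shuffle(shuffled, random);
        int count = Math.max(0, Math.min(fanout, shuffled.size()));
        return new ArrayList<>(shuffled.subList(0, count));
    }
}
```

When fewer nodes are known than the fanout, this simply returns all of them, which matches the intent of gossiping to whatever is known so far.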

The cluster node receiving the gossip message compares the incoming metadata with its own and determines three things:

  • The values which are in the incoming message but not in this node's state map
  • The values which this node has but the incoming Gossip message does not
  • The values present on both sides, for which the one with the higher version is chosen

It then adds the missing values to its own state map. The values that were missing from the incoming message are returned in the response.

The cluster node sending the Gossip message adds the values it gets from the gossip response to its own state.

class Gossip…

  private void handleGossipRequest(org.distrib.patterns.common.Message<RequestOrResponse> request) {
      GossipStateMessage gossipStateMessage = deserialize(request.getRequest());
      Map<NodeId, NodeState> gossipedState = gossipStateMessage.getNodeStates();
      getLogger().info("Merging state from " + request.getClientSocket());
      merge(gossipedState);

      Map<NodeId, NodeState> diff = delta(this.clusterMetadata, gossipedState);
      GossipStateMessage diffResponse = new GossipStateMessage(diff);
      getLogger().info("Sending diff response " + diff);
      request.getClientSocket().write(new RequestOrResponse(RequestId.PushPullGossipState.getId(),
                      JsonSerDes.serialize(diffResponse),
                      request.getRequest().getCorrelationId()));
  }
public Map<NodeId, NodeState> delta(Map<NodeId, NodeState> fromMap, Map<NodeId, NodeState> toMap) {
    Map<NodeId, NodeState> delta = new HashMap<>();
    for (NodeId key : fromMap.keySet()) {
        if (!toMap.containsKey(key)) {
            delta.put(key, fromMap.get(key));
            continue;
        }
        NodeState fromStates = fromMap.get(key);
        NodeState toStates = toMap.get(key);
        NodeState diffStates = fromStates.diff(toStates);
        if (!diffStates.isEmpty()) {
            delta.put(key, diffStates);
        }
    }
    return delta;
}
public void merge(Map<NodeId, NodeState> otherState) {
    Map<NodeId, NodeState> diff = delta(otherState, this.clusterMetadata);
    for (NodeId diffKey : diff.keySet()) {
        if(!this.clusterMetadata.containsKey(diffKey)) {
            this.clusterMetadata.put(diffKey, diff.get(diffKey));
        } else {
            NodeState stateMap = this.clusterMetadata.get(diffKey);
            stateMap.putAll(diff.get(diffKey));
        }
    }
}
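
The delta and merge methods rely on NodeState.diff() and NodeState.putAll(), which are not shown here. A minimal sketch of how such methods might behave is given below. The `SketchNodeState` and `SketchVersionedValue` classes are hypothetical stand-ins and make an assumption about the semantics: a value counts as "different" when the other side lacks it or holds it at a lower version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a versioned value.
class SketchVersionedValue {
    final String value;
    final int version;

    SketchVersionedValue(String value, int version) {
        this.value = value;
        this.version = version;
    }
}

// Hypothetical stand-in for the per-node state map.
class SketchNodeState {
    final Map<String, SketchVersionedValue> values = new HashMap<>();

    void add(String key, SketchVersionedValue value) {
        values.put(key, value);
    }

    boolean isEmpty() {
        return values.isEmpty();
    }

    // Entries this state has that `other` lacks, or holds at a lower version.
    SketchNodeState diff(SketchNodeState other) {
        SketchNodeState result = new SketchNodeState();
        for (Map.Entry<String, SketchVersionedValue> entry : values.entrySet()) {
            SketchVersionedValue theirs = other.values.get(entry.getKey());
            if (theirs == null || theirs.version < entry.getValue().version) {
                result.add(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    // Copies entries from `other`, keeping whichever side has the higher version.
    void putAll(SketchNodeState other) {
        for (Map.Entry<String, SketchVersionedValue> entry : other.values.entrySet()) {
            values.merge(entry.getKey(), entry.getValue(),
                    (mine, theirs) -> theirs.version > mine.version ? theirs : mine);
        }
    }
}
```

The version check in putAll is a defensive choice in this sketch. When putAll is only ever given the result of diff, as in merge above, every incoming entry is already newer.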

This process happens every second at each cluster node, each time selecting a node at random to exchange state with.

Avoiding unnecessary state exchange

The above code example shows that the complete state of the node is sent in the Gossip message. This is fine for a newly joined node, but once the state is up to date, it is unnecessary to send the complete state. The cluster node only needs to send the state changes since the last gossip. To achieve this, each node maintains a version number which is incremented every time a new metadata entry is added locally.

class Gossip…

  private int gossipStateVersion = 1;


  private int incremenetVersion() {
      return gossipStateVersion++;
  }

Each value in the cluster metadata is maintained with a version number. This is an example of the Versioned Value pattern.

class VersionedValue…

  int version;
  String value;

  public VersionedValue(String value, int version) {
      this.version = version;
      this.value = value;
  }

  public int getVersion() {
      return version;
  }

  public String getValue() {
      return value;
  }

Each Gossip cycle can then exchange states from a specific version.

class Gossip…

  private void sendKnownVersions(InetAddressAndPort gossipTo) throws IOException {
      Map<NodeId, Integer> maxKnownNodeVersions = getMaxKnownNodeVersions();
      RequestOrResponse knownVersionRequest = new RequestOrResponse(RequestId.GossipVersions.getId(),
              JsonSerDes.serialize(new GossipStateVersions(maxKnownNodeVersions)), 0);
      SocketClient<RequestOrResponse> socketClient = new SocketClient<>(gossipTo);
      byte[] knownVersionResponseBytes = socketClient.blockingSend(knownVersionRequest);
  }

  private Map<NodeId, Integer> getMaxKnownNodeVersions() {
      return clusterMetadata.entrySet()
              .stream()
              .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().maxVersion()));
  }

class NodeState…

  public int maxVersion() {
      return values.values().stream().map(v -> v.getVersion()).max(Comparator.naturalOrder()).orElse(0);
  }

The receiving node can then send back only the values whose versions are greater than the ones in the request.

class Gossip…

  Map<NodeId, NodeState> getMissingAndNodeStatesHigherThan(Map<NodeId, Integer> nodeMaxVersions) {
      Map<NodeId, NodeState> delta = new HashMap<>();
      delta.putAll(higherVersionedNodeStates(nodeMaxVersions));
      delta.putAll(missingNodeStates(nodeMaxVersions));
      return delta;
  }

  private Map<NodeId, NodeState> missingNodeStates(Map<NodeId, Integer> nodeMaxVersions) {
      Map<NodeId, NodeState> delta = new HashMap<>();
      List<NodeId> missingKeys = clusterMetadata.keySet().stream().filter(key -> !nodeMaxVersions.containsKey(key)).collect(Collectors.toList());
      for (NodeId missingKey : missingKeys) {
          delta.put(missingKey, clusterMetadata.get(missingKey));
      }
      return delta;
  }

  private Map<NodeId, NodeState> higherVersionedNodeStates(Map<NodeId, Integer> nodeMaxVersions) {
      Map<NodeId, NodeState> delta = new HashMap<>();
      Set<NodeId> keySet = nodeMaxVersions.keySet();
      for (NodeId node : keySet) {
          Integer maxVersion = nodeMaxVersions.get(node);
          NodeState nodeState = clusterMetadata.get(node);
          if (nodeState == null) {
              continue;
          }
          NodeState deltaState = nodeState.statesGreaterThan(maxVersion);
          if (!deltaState.isEmpty()) {
              delta.put(node, deltaState);
          }
      }
      return delta;
  }
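
The statesGreaterThan() method used above is not shown. As a rough sketch of the filtering it implies, the following example keeps only the entries whose version is greater than the version the requester already knows. The `VersionFilterSketch` class and its nested `Versioned` type are hypothetical and simplified to a plain map so that the example is self-contained.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of version-based filtering; names are illustrative only.
class VersionFilterSketch {

    static final class Versioned {
        final String value;
        final int version;

        Versioned(String value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    // Keeps only the entries that are newer than the version the requester has seen.
    static Map<String, Versioned> statesGreaterThan(Map<String, Versioned> values, int knownVersion) {
        return values.entrySet().stream()
                .filter(entry -> entry.getValue().version > knownVersion)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

For a node whose values are at versions 1, 3 and 5, a requester reporting a maximum known version of 3 would receive only the value at version 5.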

The Gossip implementation in [cassandra] optimizes state exchange with a three-way handshake, where the node receiving the gossip message also sends the versions it needs from the sender, along with the metadata it returns. The sender can then immediately respond with the requested metadata. This avoids an extra message that would otherwise have been required.

The Gossip protocol used in [cockroachdb] maintains state for each connected node. For each connection, it maintains the last version sent to that node and the version received from that node. This is so that it can send 'state since the last sent version' and ask for 'state from the last received version'.

Other efficient alternatives can be used as well, such as sending a hash of the entire Map and doing nothing if the hashes match.
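
A hash-based check could look something like the sketch below. It assumes the state has been flattened into a map from a string key (for example, node id plus metadata key) to a string that encodes version and value. That flattening, and the `StateDigestSketch` class itself, are assumptions made for illustration. Keys are sorted before hashing so that two nodes holding the same entries compute the same digest regardless of map iteration order.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: compare a digest of the state before exchanging the state itself.
class StateDigestSketch {

    // Computes a SHA-256 digest over the entries in sorted key order.
    static byte[] digest(Map<String, String> flattenedState) {
        try {
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            for (Map.Entry<String, String> entry : new TreeMap<>(flattenedState).entrySet()) {
                sha256.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
                sha256.update((byte) 0); // separator, so "ab"+"c" differs from "a"+"bc"
                sha256.update(entry.getValue().getBytes(StandardCharsets.UTF_8));
                sha256.update((byte) 0);
            }
            return sha256.digest();
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException("SHA-256 is not available", ex);
        }
    }

    // True when the remote digest differs, meaning a state exchange is worthwhile.
    static boolean needsExchange(Map<String, String> localState, byte[] remoteDigest) {
        return !MessageDigest.isEqual(digest(localState), remoteDigest);
    }
}
```

A matching digest only says that nothing differs. When the digests differ, the nodes still need one of the exchanges described earlier to find out what changed.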

Criteria for node selection to Gossip

Cluster nodes randomly select the node to send the Gossip message to. An example implementation in Java can use java.util.Random as follows:

class Gossip…

  private Random random = new Random();
  private InetAddressAndPort pickRandomNode(List<InetAddressAndPort> knownClusterNodes) {
      int randomNodeIndex = random.nextInt(knownClusterNodes.size());
      InetAddressAndPort gossipTo = knownClusterNodes.get(randomNodeIndex);
      return gossipTo;
  }

Other criteria can be used as well, such as picking the node that has been contacted the least. For example, the Gossip protocol in CockroachDB selects nodes this way.

Network-topology-aware ways of selecting Gossip targets exist as well.

Any of these can be implemented modularly inside the pickRandomNode() method.
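
As an illustration of a non-random criterion, the sketch below always picks the known node that has been gossiped to the fewest times so far. It is a simplified, hypothetical example and does not reproduce the selection logic of any particular product. The `LeastContactedPicker` name and its tie-breaking rule (earliest node in the list wins) are assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical selection strategy: prefer the node contacted the fewest times.
class LeastContactedPicker<T> {
    private final Map<T, Integer> contactCounts = new HashMap<>();

    // Returns the least-contacted known node and records the contact.
    T pick(List<T> knownNodes) {
        if (knownNodes.isEmpty()) {
            throw new IllegalArgumentException("no known nodes to pick from");
        }
        T leastContacted = knownNodes.get(0);
        for (T node : knownNodes) {
            int count = contactCounts.getOrDefault(node, 0);
            if (count < contactCounts.getOrDefault(leastContacted, 0)) {
                leastContacted = node; // strictly fewer contacts, so ties keep the earlier node
            }
        }
        contactCounts.merge(leastContacted, 1, Integer::sum);
        return leastContacted;
    }
}
```

With three known nodes and no prior contacts, successive calls cycle through the nodes in list order, since each pick raises that node's count above the others.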

Group Membership and Failure Detection

Maintaining the list of available nodes in the cluster is one of the most common uses of Gossip protocols. There are two approaches in use.

  • [swim-gossip] uses a separate probing component which continuously probes different nodes in the cluster to detect if they are available. When it detects that a node is alive or dead, that result is propagated to the entire cluster with Gossip communication. The prober randomly selects a node to send the Gossip message to. If the receiving node detects that this is new information, it immediately sends the message to a randomly selected node. This way, the failure of a node, or a newly joined node, is quickly known to the entire cluster.
  • The cluster node can periodically update its own state to reflect its heartbeat. This state is then propagated to the entire cluster through the gossip messages exchanged. Each cluster node can then check whether it has received any update for a particular cluster node within a fixed amount of time, and mark that node as down if it has not. In this case, each cluster node independently determines if a node is up or down.
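
The second, heartbeat-based approach can be sketched roughly as follows. The example assumes that each node gossips a monotonically increasing heartbeat version about itself, and that the observer records the local time at which it last saw that version advance. The `HeartbeatFailureDetectorSketch` class, its method names and the explicit `nowMs` parameter are hypothetical. Passing the time in explicitly just keeps the sketch easy to test.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of heartbeat-based failure detection over gossiped state.
class HeartbeatFailureDetectorSketch {
    private final long timeoutMs;
    private final Map<String, Long> lastHeartbeatVersion = new HashMap<>();
    private final Map<String, Long> lastAdvancedAtMs = new HashMap<>();

    HeartbeatFailureDetectorSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called whenever merged gossip state contains a heartbeat for nodeId.
    void heartbeatSeen(String nodeId, long heartbeatVersion, long nowMs) {
        Long previous = lastHeartbeatVersion.get(nodeId);
        // Only a higher version proves the node was alive more recently;
        // the same version arriving again via another node proves nothing new.
        if (previous == null || heartbeatVersion > previous) {
            lastHeartbeatVersion.put(nodeId, heartbeatVersion);
            lastAdvancedAtMs.put(nodeId, nowMs);
        }
    }

    // A node is considered up if its heartbeat advanced within the timeout.
    boolean isUp(String nodeId, long nowMs) {
        Long advancedAt = lastAdvancedAtMs.get(nodeId);
        return advancedAt != null && nowMs - advancedAt <= timeoutMs;
    }
}
```

Because every node runs its own detector over the state it has received, two nodes can briefly disagree about whether a third node is up. That is the independent determination described above.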

Handling node restarts

Versioned values do not work well if the node crashes or restarts, as all the in-memory state is lost. More importantly, the restarted node can have different values for the same key. For example, the cluster node can start with a different IP address and port, or with a different configuration. A Generation Clock can be used to mark every value with a generation, so that when the metadata state is sent to a random cluster node, the receiving node can detect changes not just by the version number, but also by the generation.

It is useful to note that this mechanism is not necessary for the core Gossip protocol to work. But it's implemented in practice to make sure that the state changes are tracked correctly.
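
One way to picture the combination of generation and version is as a pair that is compared generation first. The sketch below assumes that the generation increases on every restart and that the version counter starts over within each generation. The `GenerationVersionSketch` class is hypothetical and only illustrates the comparison.

```java
// Hypothetical sketch: order state updates by generation first, then by version.
class GenerationVersionSketch implements Comparable<GenerationVersionSketch> {
    final long generation; // assumed to increase every time the node restarts
    final int version;     // assumed to restart from a low number in each generation

    GenerationVersionSketch(long generation, int version) {
        this.generation = generation;
        this.version = version;
    }

    @Override
    public int compareTo(GenerationVersionSketch other) {
        int byGeneration = Long.compare(generation, other.generation);
        return byGeneration != 0 ? byGeneration : Integer.compare(version, other.version);
    }

    // True when this update should replace the other one.
    boolean isNewerThan(GenerationVersionSketch other) {
        return compareTo(other) > 0;
    }
}
```

With this ordering, a value at generation 2, version 1 from a restarted node replaces a value at generation 1, version 50 from before the restart, even though its version number is lower.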

Examples

[cassandra] uses the Gossip protocol for group membership and failure detection of cluster nodes. Metadata for each cluster node, such as the tokens assigned to it, is also transmitted using the Gossip protocol.

[consul] uses the [swim-gossip] protocol for group membership and failure detection of consul agents.

[cockroachdb] uses Gossip protocol to propagate node metadata.

Blockchain implementations such as Hyperledger Fabric use Gossip protocol for group membership and sending ledger metadata.

Significant Revisions