State Watch

Notify clients when specific values change on the server

19 January 2021

Unmesh Joshi

Problem

Clients are interested in changes to the specific values on the server. It's difficult for clients to structure their logic if they need to poll the server continuously to look for changes. If clients open too many connections to the server for watching changes, it can overwhelm the server.

Solution

Allow clients to register their interest with the server for specific state changes. The server notifies the interested clients when state changes happen. The client maintains a Single Socket Channel with the server. The server sends state change notifications on this channel. Clients might be interested in multiple values, but maintaining a connection per watch can overwhelm the server. So clients can use Request Pipeline.

Considering a simple key value store example used in Consistent Core: a client can be interested when a value changes for a particular key or a key is removed. There are two parts to the implementation, a client side implementation and a server side implementation.

Client side implementation

The client accepts the key and the function to be invoked when it gets watch events from the server. The client stores the function object for later invocation. It then sends the request to register the watch to the server.

ConcurrentHashMap<String, Consumer<WatchEvent>> watches = new ConcurrentHashMap<>();

public void watch(String key, Consumer<WatchEvent> consumer) {
    watches.put(key, consumer);
    sendWatchRequest(key);
}

private void sendWatchRequest(String key) {
    requestSendingQueue.submit(new RequestOrResponse(RequestId.WatchRequest.getId(),
            JsonSerDes.serialize(new WatchRequest(key)),
            correlationId.getAndIncrement()));
}

When a watch event is received on the connection, a corresponding consumer is invoked

this.pipelinedConnection = new PipelinedConnection(address, requestTimeoutMs, (r) -> {
    logger.info("Received response on the pipelined connection " + r);
    if (r.getRequestId() == RequestId.WatchRequest.getId()) {
        WatchEvent watchEvent = JsonSerDes.deserialize(r.getMessageBodyJson(), WatchEvent.class);
        Consumer<WatchEvent> watchEventConsumer = getConsumer(watchEvent.getKey());
        watchEventConsumer.accept(watchEvent);
        lastWatchedEventIndex = watchEvent.getIndex(); //capture last watched index, in case of connection failure.
    }
    completeRequestFutures(r);
});

Server side implementation

When the server receives a watch registration request, it keeps the mapping of the pipelined connection on which the request is received, and the keys.

private Map<String, ClientConnection> watches = new HashMap<>();
private Map<ClientConnection, List<String>> connection2WatchKeys = new HashMap<>();

public void watch(String key, ClientConnection clientConnection) {
    logger.info("Setting watch for " + key);
    addWatch(key, clientConnection);
}

private synchronized void addWatch(String key, ClientConnection clientConnection) {
    mapWatchKey2Connection(key, clientConnection);
    watches.put(key, clientConnection);
}

private void mapWatchKey2Connection(String key, ClientConnection clientConnection) {
    List<String> keys = connection2WatchKeys.get(clientConnection);
    if (keys == null) {
        keys = new ArrayList<>();
        connection2WatchKeys.put(clientConnection, keys);
    }
    keys.add(key);
}

The ClientConnection wraps the socket connection to the client. It has the following structure. This structure remains the same for both, the blocking-IO based server and Non-blocking-IO-based server.

public interface ClientConnection {
    void write(RequestOrResponse response);
    void close();
}

There can be multiple watches registered on a single connection. So it is important to store the mapping of connections to the list of watch keys. It is needed when the client connection is closed, to remove all the associated watches as following:

    public void close(ClientConnection connection) {
        removeWatches(connection);
    }

    private synchronized void removeWatches(ClientConnection clientConnection) {
        List<String> watchedKeys = connection2WatchKeys.remove(clientConnection);
        if (watchedKeys == null) {
            return;
        }
        for (String key : watchedKeys) {
            watches.remove(key);
        }
    }

When the specific events like setting a value for key happen on the server, the server notifies all the registered clients by constructing a relevant WatchEvent

private synchronized void notifyWatchers(SetValueCommand setValueCommand, Long entryId) {
    if (!hasWatchesFor(setValueCommand.getKey())) {
        return;
    }
    String watchedKey = setValueCommand.getKey();
    WatchEvent watchEvent = new WatchEvent(watchedKey,
                                setValueCommand.getValue(),
                                EventType.KEY_ADDED, entryId);
    notify(watchEvent, watchedKey);
}

private void notify(WatchEvent watchEvent, String watchedKey) {
    List<ClientConnection> watches = getAllWatchersFor(watchedKey);
    for (ClientConnection pipelinedClientConnection : watches) {
        try {
            String serializedEvent = JsonSerDes.serialize(watchEvent);
            getLogger().trace("Notifying watcher of event "
                    + watchEvent +
                    " from "
                    + server.getServerId());
            pipelinedClientConnection
                    .write(new RequestOrResponse(RequestId.WatchRequest.getId(),
                            serializedEvent));
        } catch (NetworkException e) {
            removeWatches(pipelinedClientConnection); //remove watch if network connection fails.
        }
    }
}

One of the critical things to note is that the state related to watches can be accessed concurrently from client request handling code and from the client connection handling code to close the connection. So all the methods accessing watch state needs to be protected by locks.

Watches on hierarchical storage

Consistent Core mostly supports hierarchical storage. The watches can be set on the parent nodes or prefix of a key. Any changes to the child node triggers the watches set on the parent node. For each event, the Consistent Core walks the path to check if there are watches setup on the parent path and send events to all those watches.

List<ClientConnection> getAllWatchersFor(String key) {
    List<ClientConnection> affectedWatches = new ArrayList<>();
    String[] paths = key.split("/");
    String currentPath = paths[0];
    addWatch(currentPath, affectedWatches);
    for (int i = 1; i < paths.length; i++) {
        currentPath = currentPath + "/" + paths[i];
        addWatch(currentPath, affectedWatches);
    }
    return affectedWatches;
}

private void addWatch(String currentPath, List<ClientConnection> affectedWatches) {
    ClientConnection clientConnection = watches.get(currentPath);
    if (clientConnection != null) {
        affectedWatches.add(clientConnection);
    }
}

This allows a watch to be set up on a key prefix like "servers". Any key created with this prefix like "servers/1", "servers/2" will trigger this watch.

Because the mapping of the function to be invoked is stored with the key prefix, it's important to walk the hierarchy to find the function to be invoked for the received event on the client side as well. An alternative can be to send the path for which the event triggered along with the event, so that the client knows which watch caused the event to be sent.

Handling Connection Failures

The connection between client and server can fail at any time. For some use cases this is problematic as the client might miss certain events when it's disconnected. For example, a cluster controller might be interested in knowing if some nodes have failed, which is indicated by events for removal of some keys. The client needs to tell the server about the last event it received. The client sends the last received event number when it resets the watch again. The server is expected to send all the events it has recorded from that event number onwards.

In the Consistent Core client, it can be done when the client re-establishes the connection to the leader.

private void connectToLeader(List<InetAddressAndPort> servers) {
    while (isDisconnected()) {
        logger.info("Trying to connect to next server");
        waitForPossibleLeaderElection();
        establishConnectionToLeader(servers);
    }
    setWatchesOnNewLeader();
}

private void setWatchesOnNewLeader() {
    for (String watchKey : watches.keySet()) {
        sendWatchResetRequest(watchKey);
    }
}

private void sendWatchResetRequest(String key) {
    pipelinedConnection.send(new RequestOrResponse(RequestId.SetWatchRequest.getId(),
            JsonSerDes.serialize(new SetWatchRequest(key, lastWatchedEventIndex)), correlationId.getAndIncrement()));
}

The server numbers every event that occurs. For example, if the server is the Consistent Core, it stores all the state changes in a strict order and every change is numbered with the log index as discussed in Write-Ahead Log, It is then possible for clients to ask for events starting from the specific index.

Deriving events from the key value store

The events can be generated looking at the current state of the key value store, if it also numbers every change that happens and stores that number with each value.

When the client re-establishes the connection to the server, it can set the watches again also sending the last seen change number. The server can then compare it with the one stored with the value and if it's more than the one client sent, it can resend the events to the client. Deriving events from the key value store can be a bit awkward as events need to be guessed. It might miss some events. - for instance, If a key is created and then deleted - while the client was disconnected, the create event will be missed.

private synchronized void eventsFromStoreState(String key, long stateChangesSince) {
    List<StoredValue> values = getValuesForKeyPrefix(key);
    for (StoredValue value : values) {
        if (values == null) {
            //the key was probably deleted send deleted event
            notify(new WatchEvent(key, EventType.KEY_DELETED), key);
        } else if (value.index > stateChangesSince) {
            //the key/value was created/updated after the last event client knows about
            notify(new WatchEvent(key, value.getValue(), EventType.KEY_ADDED, value.getIndex()), key);
        }
    }
}

[zookeeper] uses this approach. The watches in zookeeper are also one-time triggers by default. Once the event is triggered, clients need to set the watch again if they want to receive further events. Some events can be missed, before the watch is set again, so clients need to ensure they read the latest state, so that they don't miss any updates.

Storing Event History

It's easier to keep a history of past events and reply to clients from the event history. The problem with this approach is that the event history needs to be limited, say to 1,000 events. If the client is disconnected for a longer duration, it might miss on events which are beyond the 1,000 events window.

A simple implementation using google guava's EvictingQueue is as following:

public class EventHistory implements Logging {
    Queue<WatchEvent> events = EvictingQueue.create(1000);
    public void addEvent(WatchEvent e) {
        getLogger().info("Adding " + e);
        events.add(e);
    }

    public List<WatchEvent> getEvents(String key, Long stateChangesSince) {
        return this.events.stream()
                .filter(e -> e.getIndex() > stateChangesSince && e.getKey().equals(key))
                .collect(Collectors.toList());
    }
}

When the client re-establishes the connection and resets watches, the events can be sent from history.

private void sendEventsFromHistory(String key, long stateChangesSince) {
    List<WatchEvent> events = eventHistory.getEvents(key, stateChangesSince);
    for (WatchEvent event : events) {
        notify(event, event.getKey());
    }
}

Using multi-version storage

To keep track of all the changes, it is possible to use multi-version storage. It keeps track of all the versions for every key, and can easily get all the changes from the version asked for.

[etcd] version 3 onwards uses this approach

Examples

[zookeeper] has the ability to set up watches on nodes. This is used by products like [kafka] for group membership and failure detection of cluster members.

[etcd] has watch implementation which is heavily used by [kubernetes] for its resource watch implementation.

Significant Revisions