This pattern is part of Patterns of Distributed Systems

Low-Water Mark

An index in the write ahead log showing which portion of the log can be discarded.

18 August 2020

Problem

The write ahead log maintains every update to persistent store. It can grow indefinitely over time. Segmented Log allows dealing with smaller files at a time, but total disk storage can grow indefinitely if not checked.

Solution

Have a mechanism to tell logging machinery which portion of the log can be safely discarded. The mechanism gives the lowest offset or low water mark, before which point the logs can be discarded. Have a task running in the background, in a separate thread, which continuously checks which portion of the log can be discarded and deletes the files on the disk.

this.logCleaner = newLogCleaner(config);
this.logCleaner.startup();

The Log cleaner can be implemented as a scheduled task

public void startup() {
    scheduleLogCleaning();
}

private void scheduleLogCleaning() {
    singleThreadedExecutor.schedule(() -> {
        cleanLogs();
    }, config.getCleanTaskIntervalMs(), TimeUnit.MILLISECONDS);
}

Snapshot based Low-Water Mark

Most consensus implementations like Zookeeper, or etcd (as defined in RAFT), implement snapshot mechanisms. In this implementation, the storage engine takes periodic snapshots. Along with snapshot, it also stores the log index which is successfully applied. Referring to the simple key value store implementation in the Write-Ahead Log pattern, the snapshot can be taken as following:

public SnapShot takeSnapshot() {
    Long snapShotTakenAtLogIndex = wal.getLastLogIndex();
    return new SnapShot(serializeState(kv), snapShotTakenAtLogIndex);
}

Once a snapshot is successfully persisted on the disk, the log manager is given the low water mark to discard the older logs.

List<WALSegment> getSegmentsBefore(Long snapshotIndex) {
    List<WALSegment> markedForDeletion = new ArrayList<>();
    List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments;
    for (WALSegment sortedSavedSegment : sortedSavedSegments) {
        if (sortedSavedSegment.getLastLogEntryIndex() < snapshotIndex) {
            markedForDeletion.add(sortedSavedSegment);
        }
    }
    return markedForDeletion;
}

Time based Low-Water Mark

In some systems, where log is not necessarily used to update the state of the system, log can be discarded after a given time window, without waiting for any other subsystem to share the lowest log index which can be removed. For example, in systems like Kafka, logs are maintained for 7 weeks; all the log segments which have messages older than 7 weeks are discarded. For this implementation, each log entry also includes the timestamp when it was created. The log cleaner can then check the last entry of each log segment, and discard segments which are older than the configured time window.

private List<WALSegment> getSegmentsPast(Long logMaxDurationMs) {
    long now = System.currentTimeMillis();
    List<WALSegment> markedForDeletion = new ArrayList<>();
    List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments;
    for (WALSegment sortedSavedSegment : sortedSavedSegments) {
        if (timeElaspedSince(now, sortedSavedSegment.getLastLogEntryTimestamp()) > logMaxDurationMs) {
            markedForDeletion.add(sortedSavedSegment);
        }
    }
    return markedForDeletion;
}

private long timeElaspedSince(long now, long lastLogEntryTimestamp) {
    return now - lastLogEntryTimestamp;
}

Examples

  • The log implementation in all Consensus algorithms like Zookeeper and RAFT implement snapshot based log cleaning
  • The storage implementation in Kafka follows time based log cleaning
Significant Revisions

18 August 2020: Published