Skip to content

SWIM Topology Tracking

How NoriKV uses SWIM for failure detection and maintains a consistent cluster topology view.



Overview

NoriKV uses SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol) for distributed failure detection and cluster membership management. The topology watcher integrates SWIM events with the ClusterView system to provide clients with up-to-date routing information.

Key Features: - Gossip-based failure detection - O(log N) message complexity - Scalable - Works efficiently in clusters of 100+ nodes - Eventually consistent - Membership view converges across cluster - Integration with Raft - Automatic reconfiguration on membership changes - Client routing - Live topology updates via Meta.WatchCluster


Architecture

Component Interaction

┌────────────────────────────────────────────┐
│  SWIM Membership                           │
│  - Gossip protocol                         │
│  - Failure detection                       │
│  - Incarnation numbers                     │
└────────────┬───────────────────────────────┘
             │ MembershipEvent stream
┌────────────────────────────────────────────┐
│  Topology Watcher (background task)        │
│  - Subscribes to SWIM events               │
│  - Updates ClusterView                     │
│  - Triggers refresh on changes             │
└────────────┬───────────────────────────────┘
┌────────────────────────────────────────────┐
│  ClusterViewManager                        │
│  - Epoch-versioned cluster state           │
│  - Node list with roles                    │
│  - Shard assignments                       │
└────────────┬───────────────────────────────┘
             │ Broadcast channel
┌────────────────────────────────────────────┐
│  Meta.WatchCluster (gRPC streaming)        │
│  - Streams updates to clients              │
│  - Clients update routing tables           │
└────────────────────────────────────────────┘

Data Flow

Node join:
  SWIM detects new member
    ↓ MembershipEvent::MemberJoined
  Topology Watcher receives event
  ClusterView.add_node(id, addr)
    ↓ epoch++
  Notify subscribers
  Clients receive update
  Routing table updated

Node failure:
  SWIM suspects member (missed pings)
    ↓ MembershipEvent::MemberSuspect
  Topology Watcher receives event
  ClusterView.update_node_role(id, "suspect")
    ↓ epoch++
  Clients route away from suspect node
    ↓ MembershipEvent::MemberFailed (timeout)
  ClusterView.update_node_role(id, "failed")
  Raft reconfiguration triggered
  New leaders elected for affected shards

SWIM Protocol

How SWIM Works

SWIM uses a gossip-based approach for scalable failure detection:

1. Ping Protocol

Each node periodically (every 1 second): 1. Selects a random member to ping 2. Sends PING message 3. Waits for ACK (timeout: 500ms)

2. Indirect Ping (Suspicion)

If no ACK received: 1. Mark member as "suspect" 2. Select k random members (k=3) 3. Ask them to ping the suspect 4. If any ACK received → member alive 5. If no ACK from anyone → member failed

3. Incarnation Numbers

Each node has an incarnation number that increases on: - Refuting a suspicion (node is alive) - Recovering from a crash

Higher incarnation number wins in conflict resolution.

4. Gossip Piggyback

Membership updates piggyback on PING/ACK messages: - MemberJoined events - MemberLeft events - MemberFailed events - MemberAlive refutations

Why SWIM?

vs. Heartbeat-based (all-to-all pings): - SWIM: O(log N) messages per period - Heartbeat: O(N²) messages per period - SWIM scales to 100+ nodes efficiently

vs. Consul (uses SWIM): - NoriKV uses nori-swim, a minimal SWIM implementation - No external dependencies - Integrated directly with Raft

vs. Raft membership: - Raft: Strong consistency, requires quorum - SWIM: Eventually consistent, faster detection - Used together: SWIM detects, Raft reconfigures


Topology Watcher Implementation

Code Structure

Location: apps/norikv-server/src/cluster_view.rs

impl ClusterViewManager {
    /// Start topology watcher task.
    pub fn start_topology_watcher(
        self: Arc<Self>,
        swim: Arc<nori_swim::SwimMembership>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut events = swim.events();
            tracing::info!("Topology watcher started, listening for SWIM events");

            loop {
                match events.recv().await {
                    Ok(event) => {
                        if let Err(e) = self.handle_membership_event(event).await {
                            tracing::error!("Failed to handle membership event: {:?}", e);
                        }
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                        tracing::warn!("Topology watcher lagged, skipped {} events", skipped);
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                        tracing::info!("SWIM event channel closed, topology watcher exiting");
                        break;
                    }
                }
            }
        })
    }
}

Event Handling

Five Event Types:

pub enum MembershipEvent {
    MemberJoined { id: String, addr: SocketAddr },
    MemberSuspect { id: String, incarnation: u64 },
    MemberFailed { id: String },
    MemberLeft { id: String },
    MemberAlive { id: String, incarnation: u64 },
}

Handler Logic:

async fn handle_membership_event(
    &self,
    event: nori_swim::MembershipEvent,
) -> Result<(), ClusterViewError> {
    use nori_swim::MembershipEvent;

    match event {
        MembershipEvent::MemberJoined { id, addr } => {
            tracing::info!("Member joined: {} at {}", id, addr);
            self.add_node(id, addr.to_string()).await?;
        }
        MembershipEvent::MemberSuspect { id, incarnation } => {
            tracing::warn!("Member suspected: {} (incarnation {})", id, incarnation);
            self.update_node_role(&id, "suspect").await?;
        }
        MembershipEvent::MemberFailed { id } => {
            tracing::warn!("Member failed: {}", id);
            self.update_node_role(&id, "failed").await?;
        }
        MembershipEvent::MemberLeft { id } => {
            tracing::info!("Member left: {}", id);
            self.remove_node(&id).await?;
        }
        MembershipEvent::MemberAlive { id, incarnation } => {
            tracing::info!("Member alive: {} (incarnation {})", id, incarnation);
            self.update_node_role(&id, "follower").await?;
        }
    }

    // Trigger a refresh to update shard assignments
    self.refresh().await?;

    Ok(())
}

Epoch Versioning

ClusterView uses monotonic epoch numbers for conflict-free updates:

pub struct ClusterView {
    /// Monotonically increasing version number
    pub epoch: u64,

    /// All nodes in the cluster
    pub nodes: Vec<ClusterNode>,

    /// Shard assignments and leadership
    pub shards: Vec<ShardInfo>,
}

Update Pattern:

async fn update_node_role(&self, node_id: &str, role: &str) -> Result<(), ClusterViewError> {
    let mut view = self.view.write();

    // Check if update is needed
    let needs_update = view.nodes.iter()
        .find(|n| n.id == node_id)
        .map(|n| n.role != role)
        .unwrap_or(false);

    if needs_update {
        // Increment epoch first
        view.epoch += 1;
        let new_epoch = view.epoch;

        // Then update the role
        if let Some(node) = view.nodes.iter_mut().find(|n| n.id == node_id) {
            node.role = role.to_string();
            tracing::info!("Updated node {} role to {} (epoch {})", node_id, role, new_epoch);
        }
    }

    Ok(())
}

Why epochs? - Clients can detect stale views (older epoch) - No need for vector clocks or version vectors - Simple monotonic counter - Works across network partitions (highest epoch wins)


Design Decisions

1. Separate Watcher Task

Decision: Run topology watcher as a separate tokio task.

Rationale: - Non-blocking: SWIM events don't block main server - Isolation: Failures in watcher don't crash server - Clean shutdown: Can abort task independently - Testable: Can mock SWIM events

Alternative: Handle events in Node::start() loop - Blocks server startup - Harder to test - Mixed concerns


2. Broadcast Channel for Updates

Decision: Use tokio::sync::broadcast for ClusterView updates.

Rationale: - Multiple subscribers: Meta.WatchCluster, internal monitoring - No missed updates: New subscribers get current view - Backpressure: Lagging subscribers skip old events - Efficient: Lock-free, clone-on-read

Code:

pub struct ClusterViewManager {
    /// Current cluster view
    view: Arc<RwLock<ClusterView>>,

    /// Broadcast channel for cluster view updates
    update_tx: broadcast::Sender<ClusterView>,
}

impl ClusterViewManager {
    pub fn subscribe(&self) -> broadcast::Receiver<ClusterView> {
        self.update_tx.subscribe()
    }
}

Alternative: Polling current() - High CPU usage - Delayed updates - No event-driven routing


3. Refresh After Every Event

Decision: Call refresh() after handling each SWIM event.

Rationale: - Shard assignments: Update which shards are on which nodes - Leader info: Query Raft for current leaders - Consistency: Single atomic view update

Code:

match event {
    MembershipEvent::MemberJoined { id, addr } => {
        self.add_node(id, addr.to_string()).await?;
    }
    // ... other events
}

// Trigger a refresh to update shard assignments
self.refresh().await?;

Alternative: Periodic refresh only - Delayed routing updates - Clients may route to dead nodes - Longer recovery time


4. Role-Based Status

Decision: Track node roles: follower, suspect, failed, leader.

Rationale: - Client routing: Avoid suspected nodes - Debugging: Understand cluster state - Gradual degradation: Suspect → Failed transition

Status Transitions:

                    ┌─────────────┐
                    │   Unknown   │
                    └──────┬──────┘
                           │ MemberJoined
                    ┌─────────────┐
            ┌──────▶│  Follower   │◀──────┐
            │       └──────┬──────┘       │ MemberAlive
            │              │               │
            │              │ MemberSuspect │
            │              ▼               │
            │       ┌─────────────┐        │
            │       │   Suspect   │────────┘
            │       └──────┬──────┘
            │              │ timeout
            │              ▼
            │       ┌─────────────┐
            │       │   Failed    │
            │       └──────┬──────┘
            │              │ MemberLeft
            │              ▼
            │       ┌─────────────┐
            └───────│   Removed   │
                    └─────────────┘

5. Borrow Checker Workaround

Problem: Can't mutate view.epoch while holding mutable iterator to view.nodes.

Code (broken):

// This doesn't compile!
if let Some(node) = view.nodes.iter_mut().find(|n| n.id == node_id) {
    if node.role != role {
        view.epoch += 1;  //  Error: can't borrow view again
        node.role = role.to_string();
    }
}

Solution: Check if update is needed first, then increment epoch before calling iter_mut():

// Check if update is needed
let needs_update = view.nodes.iter()
    .find(|n| n.id == node_id)
    .map(|n| n.role != role)
    .unwrap_or(false);

if needs_update {
    // Increment epoch first
    view.epoch += 1;
    let new_epoch = view.epoch;

    // Then update the role (no conflict)
    if let Some(node) = view.nodes.iter_mut().find(|n| n.id == node_id) {
        node.role = role.to_string();
        tracing::info!("Updated node {} role to {} (epoch {})", node_id, role, new_epoch);
    }
}

Lesson: Separate read and write phases when working with complex data structures.


Performance Characteristics

SWIM Overhead

Message complexity: - Per-node: O(log N) messages per gossip interval - Cluster-wide: O(N log N) total messages per interval - Bandwidth: ~1KB per message × 10 messages/sec = 10KB/sec per node

Detection time: - Suspect: 500ms (ping timeout) - Failed: 5 seconds (indirect ping + suspicion timeout) - Propagation: O(log N) gossip rounds = ~5 seconds for 100 nodes

Topology Watcher Overhead

CPU: - Event handling: <100µs per event - ClusterView update: <500µs (RwLock + epoch increment) - Refresh: ~5-10ms (queries all active shards)

Memory: - ClusterView: ~1KB per node (100 nodes = 100KB) - Broadcast channel: 16-slot ring buffer (~16KB)

Event rate: - Steady state: 0-1 events/sec (gossip updates) - Failure scenario: 5-10 events/sec (suspect → failed) - Cluster expansion: 100 events (all nodes join)


Failure Scenarios

Node Crash

Timeline:

t=0:     Node B crashes
t=500ms: Node A pings B, no ACK → suspects B
t=1s:    Node A gossips "B suspected"
t=2s:    Cluster converges on "B suspected"
t=5s:    Suspicion timeout → B marked failed
t=5.5s:  ClusterView updated, epoch++
t=6s:    Clients receive update, reroute
t=7s:    Raft reconfiguration for B's shards

Recovery:

Node B restarts → increments incarnation → gossips "B alive" → role updated to "follower"


Network Partition

Scenario: Cluster splits into [A, B] and [C, D, E]

Minority partition [A, B]: - A and B suspect C, D, E - Update ClusterView (epoch++) - Cannot commit writes (no Raft quorum) - Clients see "not_leader" errors

Majority partition [C, D, E]: - C, D, E suspect A, B - Update ClusterView (epoch++) - Continue operating (Raft quorum) - New leaders elected for A/B's shards

Partition heals: - A and B receive higher epoch view from C/D/E - A and B adopt majority view - A and B catch up via Raft log replication

Winner: Highest epoch view wins (majority partition)


Split-Brain Prevention

SWIM alone doesn't prevent split-brain - both partitions can accept writes.

NoriKV uses Raft for consistency: - Writes require Raft quorum (majority) - Minority partition cannot commit writes - Clients automatically retry on majority partition

Combined approach: - SWIM: Fast failure detection (5 seconds) - Raft: Strong consistency (quorum required) - ClusterView: Client routing (avoid failed nodes)


Client Integration

Meta.WatchCluster Service

gRPC streaming API:

service Meta {
  rpc WatchCluster(WatchClusterRequest) returns (stream ClusterView);
}

message ClusterView {
  uint64 epoch = 1;
  repeated ClusterNode nodes = 2;
  repeated ShardInfo shards = 3;
}

message ClusterNode {
  string id = 1;
  string addr = 2;
  string role = 3;  // "leader", "follower", "suspect", "failed"
}

Client usage:

let mut stream = client.watch_cluster().await?;

while let Some(view) = stream.next().await {
    let view = view?;

    // Update local routing table
    routing_table.update(view.epoch, view.nodes, view.shards);

    tracing::info!("Cluster view updated to epoch {}", view.epoch);
}

Smart Routing

Client routing logic:

fn route_key(&self, key: &[u8]) -> Result<String, Error> {
    // Hash key to shard
    let shard_id = self.router.shard_for_key(key);

    // Find leader for shard
    let shard = self.routing_table.get_shard(shard_id)?;
    let leader = shard.replicas.iter()
        .find(|r| r.is_leader && r.role != "suspect" && r.role != "failed")
        .ok_or(Error::NoLeader)?;

    Ok(leader.addr.clone())
}

Benefits: - Direct routing to leader (no redirects) - Avoid failed/suspected nodes - Automatic failover (new view pushed)


Monitoring

Key Metrics

# Cluster size
swim_cluster_size

# Failure rate
rate(swim_failed_members_total[5m])

# Topology updates per minute
rate(cluster_view_epoch[1m]) * 60

# Nodes in suspect state
count(cluster_nodes{role="suspect"})

Logging

Structured logs with context:

tracing::info!(
    node_id = %id,
    addr = %addr,
    incarnation = incarnation,
    "Member joined cluster"
);

tracing::warn!(
    node_id = %id,
    incarnation = incarnation,
    "Member suspected, starting indirect ping"
);

Log aggregation query (JSON logs):

# Count suspect events per hour
jq 'select(.fields.event == "MemberSuspect") | .fields.node_id' \
  | sort | uniq -c

# Detect flapping nodes (frequent suspect/alive cycles)
jq 'select(.fields.event == "MemberAlive" or .fields.event == "MemberSuspect") | .fields.node_id' \
  | uniq -c | sort -rn

Testing

Unit Tests

Test event handling:

#[tokio::test]
async fn test_member_joined() {
    let cluster_view = ClusterViewManager::new(...);

    cluster_view.handle_membership_event(
        MembershipEvent::MemberJoined {
            id: "node1".to_string(),
            addr: "10.0.1.1:6000".parse().unwrap(),
        }
    ).await.unwrap();

    let view = cluster_view.current();
    assert_eq!(view.nodes.len(), 2); // includes self
    assert_eq!(view.epoch, 1);
}

Integration Tests

Test failure detection:

#[tokio::test]
async fn test_node_failure_detection() {
    // Start 3-node cluster
    let nodes = start_cluster(3).await;

    // Kill node 2
    nodes[2].shutdown().await;

    // Wait for SWIM to detect failure
    tokio::time::sleep(Duration::from_secs(10)).await;

    // Check cluster view
    let view = nodes[0].cluster_view().current();
    assert!(view.nodes.iter().any(|n| n.id == "node2" && n.role == "failed"));
}

Next Steps