Skip to content

Concurrency Model

How nori-wal handles concurrent reads and writes safely and efficiently.

Table of contents


Overview

nori-wal is designed to be safe for concurrent use across multiple threads. The type signature reflects this:

pub struct Wal: Send + Sync

This means: - Send: Can be moved between threads - Sync: Can be shared between threads (via Arc<Wal>)

However, the concurrency model has specific characteristics you need to understand.


Thread Safety Guarantees

What is Safe

// Safe: Multiple threads can share a WAL
let wal = Arc::new(wal);

let wal1 = wal.clone();
tokio::spawn(async move {
    wal1.append(&record).await?;  // Safe
});

let wal2 = wal.clone();
tokio::spawn(async move {
    wal2.append(&record).await?;  // Safe
});

What is Serialized

All writes are serialized internally. Even if multiple threads call append() concurrently, they execute sequentially:

Thread 1: append(R1) ─┐
Thread 2: append(R2) ──┼─→ Mutex → [R1][R2] (sequential on disk)
Thread 3: append(R3) ─┘

Why? - Append-only log must maintain order - Only one thread can write to a file at a time - Serialization ensures consistency


Internal Locking Structure

pub struct SegmentManager {
    config: SegmentConfig,

    // Protected by Mutex - one writer at a time
    current: Arc<Mutex<SegmentFile>>,
    current_id: Arc<Mutex<u64>>,
    last_fsync: Arc<Mutex<Option<Instant>>>,

    // FD cache for readers
    fd_cache: Arc<Mutex<FdCache>>,

    // Immutable (no lock needed)
    meter: Arc<dyn Meter>,
    node_id: u32,
}

Locks Explained

Field Lock Type Purpose Contention
current Mutex<SegmentFile> Active segment for writing High on writes
current_id Mutex<u64> Current segment ID Low (rarely changes)
last_fsync Mutex<Option<Instant>> Last fsync time (for batching) High on writes
fd_cache Mutex<FdCache> File descriptor cache Medium on reads

Write Path Concurrency

Append Operation

pub async fn append(&self, record: &Record) -> Result<Position> {
    // 1. Acquire lock (blocks if another thread is writing)
    let mut current = self.current.lock().await;

    // 2. Encode record (CPU-bound, lock held)
    let encoded = record.encode();

    // 3. Check if rotation needed
    if current.would_exceed(encoded.len(), self.config.max_segment_size) {
        // Rotation requires multiple locks - deadlock potential?
        // Answer: No, because we hold current lock exclusively
        self.rotate(&mut current).await?;
    }

    // 4. Append to segment (I/O-bound, lock held)
    let offset = current.append(record).await?;

    // 5. Fsync based on policy (I/O-bound, lock held)
    self.maybe_fsync(&mut current).await?;

    // 6. Release lock (implicit on drop)
    Ok(Position {
        segment_id: current.id,
        offset,
    })
}

Critical section duration: - Without fsync: ~10-50μs (encoding + write) - With fsync: ~1-5ms (encoding + write + fsync)


Write Contention

When multiple threads write concurrently:

Time →
Thread 1: [acquire lock] [encode] [write] [fsync] [release]
Thread 2:                 [blocked................] [acquire lock] [encode] [write] [fsync] [release]
Thread 3:                                          [blocked...................] [acquire lock] ...

Performance impact:

Fsync Policy Single Thread 4 Threads Contention Overhead
Always ~420 writes/sec ~420 writes/sec None (disk-bound)
Batch(5ms) ~86K writes/sec ~70K writes/sec ~18% (lock overhead)
Os ~110K writes/sec ~85K writes/sec ~23% (lock overhead)

Why contention? - Lock is held during encode, write, and fsync - Multiple threads can't make progress in parallel - CPU encoding happens while lock is held (could be optimized)


Optimization: Pre-Encode Before Locking

Current:

async fn append(&self, record: &Record) -> Result<Position> {
    let mut current = self.current.lock().await;  // Lock
    let encoded = record.encode();                 // Encode (CPU)
    current.write_all(&encoded).await?;            // Write
    // ...
}

Potential optimization (not implemented):

async fn append(&self, record: &Record) -> Result<Position> {
    let encoded = record.encode();                 // Encode BEFORE lock
    let mut current = self.current.lock().await;  // Lock
    current.write_all(&encoded).await?;            // Write
    // ...
}

Benefit: Reduce lock hold time by ~5-10μs (encoding time)

Not implemented because: - Encoding is fast (~2-10μs) - Optimization adds complexity - Lock contention is not the bottleneck for most workloads


Read Path Concurrency

Sequential Reading

pub async fn read_from(&self, position: Position) -> Result<SegmentReader> {
    // 1. Acquire FD cache lock
    let fd = self.fd_cache.lock().await.get_or_open(position.segment_id).await?;

    // 2. Create reader (does not hold lock)
    Ok(SegmentReader {
        file: fd,  // Arc<Mutex<File>>
        position: position.offset,
        buffer: Vec::new(),
    })
}

Reading with Concurrent Writes

Readers do NOT block writers (and vice versa):

Writer Thread: [append R1] [append R2] [append R3]
Reader Thread:               [read R1] [read R2] [block until R3 written]

Why? - Readers have separate file descriptors (via FD cache) - Reads and writes use different syscalls (no OS-level conflicts) - append-only means no in-place modifications


Multiple Concurrent Readers

Multiple readers can read simultaneously:

let wal = Arc::new(wal);

// Reader 1: Read from beginning
let wal1 = wal.clone();
tokio::spawn(async move {
    let mut reader = wal1.read_from(Position::start()).await?;
    while let Some((record, _)) = reader.next_record().await? {
        // Process
    }
});

// Reader 2: Read from position 1000
let wal2 = wal.clone();
tokio::spawn(async move {
    let mut reader = wal2.read_from(Position { segment_id: 0, offset: 1000 }).await?;
    while let Some((record, _)) = reader.next_record().await? {
        // Process
    }
});

Performance: No contention between readers (each has independent FD)


File Descriptor Caching

Why Cache FDs?

Opening files is expensive: - ~100μs per File::open() syscall - ~1μs for cache hit

Without caching:

// Reader 1 opens segment 0: 100μs
// Reader 2 opens segment 0: 100μs (redundant!)
// Reader 3 opens segment 0: 100μs (redundant!)

With caching:

// Reader 1 opens segment 0: 100μs (cache miss, opens FD)
// Reader 2 opens segment 0: 1μs (cache hit, reuses FD)
// Reader 3 opens segment 0: 1μs (cache hit, reuses FD)


FD Cache Structure

struct FdCache {
    cache: HashMap<u64, Arc<Mutex<File>>>,  // segment_id → File
    max_size: usize,                         // LRU limit (default: 16)
    access_order: Vec<u64>,                  // LRU tracking
}

LRU Eviction

When cache reaches capacity, evict least-recently-used FD:

fn insert(&mut self, segment_id: u64, file: File) {
    if self.cache.len() >= self.max_size {
        // Evict LRU
        let lru_id = self.access_order.remove(0);
        self.cache.remove(&lru_id);
        // FD automatically closed when Arc<File> is dropped
    }

    self.cache.insert(segment_id, Arc::new(Mutex::new(file)));
    self.access_order.push(segment_id);
}

Example:

Cache size: 3
Access pattern: [0, 1, 2, 3, 0, 1]

0: Cache miss → Open FD0, cache = [0]
1: Cache miss → Open FD1, cache = [0, 1]
2: Cache miss → Open FD2, cache = [0, 1, 2]
3: Cache miss → Open FD3, evict 0, cache = [1, 2, 3]
0: Cache miss → Open FD0, evict 1, cache = [2, 3, 0]
1: Cache miss → Open FD1, evict 2, cache = [3, 0, 1]


Rotation and Concurrency

Rotation Algorithm

async fn rotate(&self, current: &mut SegmentFile) -> Result<()> {
    // 1. Finalize current segment (truncate to actual size)
    current.finalize().await?;

    // 2. Increment segment ID
    let mut current_id = self.current_id.lock().await;
    *current_id += 1;
    let new_id = *current_id;
    drop(current_id);

    // 3. Create new segment
    let new_segment = SegmentFile::open(
        &self.config.dir,
        new_id,
        true,  // create
        if self.config.preallocate {
            Some(self.config.max_segment_size)
        } else {
            None
        },
    ).await?;

    // 4. Swap in new segment (still holding current lock)
    *current = new_segment;

    Ok(())
}

Key point: Rotation happens while holding the current lock, so no other thread can write during rotation.


Rotation Latency

Rotation adds latency to the write that triggers it:

Step Latency
Finalize old segment (truncate + sync) ~2-5 ms
Open new segment file ~100 μs
Preallocate 128MB ~1-2 ms
Total ~3-7 ms

Impact on concurrent writers:

Thread 1: append(triggers rotation) [3-7ms rotation] [return]
Thread 2: append(blocked)            [wait...........] [acquire lock] [append] [return]
Thread 3: append(blocked)                               [wait......] [acquire lock] [append] [return]

Mitigation: Rotation is rare (once per 128MB by default), so amortized cost is negligible.


Sync and Concurrency

Explicit Sync

wal.sync().await?;

Acquires the lock and forces fsync:

async fn sync(&self) -> Result<()> {
    let mut current = self.current.lock().await;
    current.sync().await?;

    // Update last_fsync time
    let mut last_fsync = self.last_fsync.lock().await;
    *last_fsync = Some(Instant::now());

    Ok(())
}

Blocks all writers until fsync completes (~1-5ms).


Batched Sync (FsyncPolicy::Batch)

async fn maybe_fsync(&self, current: &mut SegmentFile) -> Result<()> {
    if let FsyncPolicy::Batch(window) = self.config.fsync_policy {
        let mut last_fsync = self.last_fsync.lock().await;
        let now = Instant::now();

        if last_fsync.map_or(true, |t| now.duration_since(t) >= window) {
            current.sync().await?;
            *last_fsync = Some(now);
        }
    }

    Ok(())
}

First write in window: Pays fsync cost (~1-5ms) Subsequent writes in window: No fsync (fast)


Drop Handler and Finalization

Best-Effort Finalization

When Wal is dropped, we try to finalize the current segment:

impl Drop for SegmentManager {
    fn drop(&mut self) {
        // Try to acquire lock (non-blocking)
        if let Ok(current) = self.current.try_lock() {
            // Synchronous truncation (can't do async in Drop)
            if current.size != actual_file_size {
                let _ = std::fs::OpenOptions::new()
                    .write(true)
                    .open(&current.path)
                    .and_then(|f| {
                        f.set_len(current.size)?;
                        f.sync_all()
                    });
            }
        }
        // If lock is held, skip finalization (segment will be recovered on next open)
    }
}

Why best-effort? - Can't do async work in Drop - Can't block indefinitely (may be in async runtime shutdown) - Recovery will fix any unfinalizedSegments


Deadlock Analysis

Potential Deadlock Scenario?

// Thread 1: Holds current lock, tries to acquire current_id lock
async fn rotate(&self, current: &mut SegmentFile) {
    // current lock held
    let mut current_id = self.current_id.lock().await;  // Acquire current_id
    // ...
}

// Thread 2: Holds current_id lock, tries to acquire current lock?
// ... No such code path exists!

Conclusion: No deadlock possible - Lock acquisition order is always: currentcurrent_idlast_fsync - Never acquire locks in reverse order - Locks are released quickly (no long-held locks)


Performance Benchmarks

Single-Threaded Performance

Fsync Policy Throughput Latency (p50) Latency (p99)
Always 420 writes/sec 2.1 ms 9.8 ms
Batch(5ms) 86,000 writes/sec 38 μs 5.3 ms
Os 110,000 writes/sec 28 μs 95 μs

Multi-Threaded Performance (4 threads)

Fsync Policy Single Thread 4 Threads Scalability
Always 420/sec 420/sec 1.0x (disk-bound)
Batch(5ms) 86K/sec 70K/sec 0.82x (lock contention)
Os 110K/sec 85K/sec 0.77x (lock contention)

Observation: Write-heavy workloads don't scale linearly due to serialized append.


Read Performance (Concurrent)

Scenario Single Reader 4 Readers Scalability
Sequential read (cached FDs) 200K records/sec 750K records/sec 3.75x
Sequential read (uncached FDs) 180K records/sec 600K records/sec 3.33x

Observation: Reads scale well because they don't share locks with writers.


Best Practices

1. Share WAL Across Threads

// Good: Share a single WAL instance
let wal = Arc::new(wal);

for _ in 0..num_threads {
    let wal = wal.clone();
    tokio::spawn(async move {
        wal.append(&record).await?;
    });
}
// Bad: Open multiple WALs (file corruption risk!)
// DON'T DO THIS:
for _ in 0..num_threads {
    tokio::spawn(async move {
        let (wal, _) = Wal::open(config.clone()).await?;  // Multiple WALs!
        wal.append(&record).await?;
    });
}

2. Batch Writes in Application Code

If you have high write contention, batch at the application level:

// Instead of:
for record in records {
    wal.append(&record).await?;  // Lock per record
}

// Do this:
let mut batch = Vec::new();
for record in records {
    batch.push(record);
}

// Append batch with single lock acquisition
for record in batch {
    wal.append(&record).await?;
}

Or use append_batch() if available (future feature).


3. Use Separate WALs for Independent Data

If you have independent streams of data, use separate WALs:

// Good: Separate WALs for different purposes
let user_wal = Wal::open(WalConfig { dir: "wal/users".into(), .. }).await?;
let event_wal = Wal::open(WalConfig { dir: "wal/events".into(), .. }).await?;

// No lock contention between user and event writes
tokio::join!(
    user_wal.append(&user_record),
    event_wal.append(&event_record),
);

4. Don't Hold Locks Across Async Points

// Bad: Don't do this (not possible with current API, but illustrative)
let current = wal.manager.current.lock().await;  // Acquire lock
tokio::time::sleep(Duration::from_secs(1)).await;  // Hold lock across sleep!
current.append(&record).await?;

// Good: Lock is acquired and released within a single operation
wal.append(&record).await?;  // Lock acquired and released internally

Key Takeaways

  1. WAL is thread-safe (Send + Sync)
  2. Can be shared across threads via Arc<Wal>
  3. Multiple readers and writers are safe

  4. Writes are serialized internally

  5. Only one thread can append at a time
  6. Lock held during encode + write + fsync
  7. Contention reduces throughput by ~20-30%

  8. Reads don't block writes (and vice versa)

  9. Separate file descriptors via FD cache
  10. Concurrent readers scale linearly (~3.75x with 4 threads)

  11. FD cache reduces open() overhead

  12. LRU cache with default size 16
  13. Cache hit: ~1μs, cache miss: ~100μs

  14. Rotation happens while holding lock

  15. Adds ~3-7ms latency to triggering write
  16. Rare event (once per 128MB)
  17. No deadlock possible

  18. Best practice: Share one WAL per directory

  19. Use Arc<Wal> for sharing
  20. Don't open multiple WALs for same directory

What's Next?

You've completed the "How It Works" section! Now explore:

Or dive into the implementation in crates/nori-wal/src/ on GitHub.