API Reference¶
Complete API documentation for all public types and methods in nori-wal.
Quick Reference¶
Core Types¶
| Type | Purpose | Page |
|---|---|---|
Wal |
Main WAL interface | API → |
Record |
Key-value record with metadata | API → |
WalConfig |
Configuration builder | API → |
Position |
Location in the log | See Wal |
RecoveryInfo |
Recovery statistics | See Wal |
Enums¶
| Type | Purpose | Page |
|---|---|---|
FsyncPolicy |
Durability policy | See Config |
Compression |
Compression algorithm | See Record |
Error Types¶
| Type | Purpose | Page |
|---|---|---|
SegmentError |
Segment-level errors | API → |
RecordError |
Record-level errors | API → |
Import Paths¶
// Main types
use nori_wal::{Wal, WalConfig, Record, Position, RecoveryInfo};
// Enums
use nori_wal::{FsyncPolicy, Compression};
// Errors
use nori_wal::{SegmentError, RecordError};
Usage Patterns¶
Basic Write¶
use nori_wal::{Wal, WalConfig, Record};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = WalConfig::default();
let (wal, _) = Wal::open(config).await?;
let record = Record::put(b"key", b"value");
let position = wal.append(&record).await?;
wal.sync().await?;
Ok(())
}
Configuration¶
use nori_wal::{WalConfig, FsyncPolicy};
use std::time::Duration;
use std::path::PathBuf;
let config = WalConfig {
dir: PathBuf::from("/var/lib/myapp/wal"),
max_segment_size: 256 * 1024 * 1024, // 256 MB
fsync_policy: FsyncPolicy::Batch(Duration::from_millis(10)),
preallocate: true,
node_id: 1,
};
let (wal, recovery_info) = Wal::open(config).await?;
Reading¶
use nori_wal::Position;
let mut reader = wal.read_from(Position { segment_id: 0, offset: 0 }).await?;
while let Some((record, position)) = reader.next_record().await? {
println!("Record at {:?}: key={:?}", position, record.key);
}
Error Handling¶
use nori_wal::{Wal, SegmentError};
match Wal::open(config).await {
Ok((wal, info)) => {
if info.corruption_detected {
log::warn!("Corruption detected: {} bytes truncated",
info.bytes_truncated);
}
}
Err(SegmentError::Io(e)) => {
eprintln!("I/O error: {}", e);
}
Err(SegmentError::InvalidConfig(msg)) => {
eprintln!("Invalid config: {}", msg);
}
Err(e) => {
eprintln!("Other error: {}", e);
}
}
API Conventions¶
Async Methods¶
All I/O methods are async and require a Tokio runtime:
// Correct: Use with async runtime
#[tokio::main]
async fn main() {
wal.append(&record).await?;
}
// Wrong: Can't call async methods in sync context
fn main() {
wal.append(&record).await?; // Compile error!
}
Error Handling¶
All fallible operations return Result<T, E>:
// Always handle errors
let position = wal.append(&record).await?; // Propagate with ?
// Or match explicitly
match wal.append(&record).await {
Ok(pos) => println!("Wrote at {:?}", pos),
Err(e) => eprintln!("Failed: {}", e),
}
Thread Safety¶
All types are Send + Sync and can be shared:
let wal = Arc::new(wal);
for _ in 0..4 {
let wal = wal.clone();
tokio::spawn(async move {
wal.append(&record).await?;
});
}
Type Categories¶
Primary API¶
Start here for most use cases:
Advanced API¶
For fine-grained control:
Position- Seek to specific locations (see Wal)SegmentReader- Manual reading (see Wal)FsyncPolicy- Custom durability (see Config)
Observability¶
For monitoring and debugging:
RecoveryInfo- Recovery statistics (see Wal)Metertrait - Custom metrics (see nori-observe)
Examples by Use Case¶
Event Sourcing¶
// Append events
let event = serde_json::to_vec(&MyEvent { id: 1, data: "..." })?;
let record = Record::put(b"aggregate:1", event);
wal.append(&record).await?;
// Replay events
let mut reader = wal.read_from(Position::start()).await?;
while let Some((record, _)) = reader.next_record().await? {
let event: MyEvent = serde_json::from_slice(&record.value)?;
apply_event(event);
}
Key-Value Store¶
// PUT
let record = Record::put(b"user:123", b"alice@example.com");
wal.append(&record).await?;
// DELETE
let record = Record::delete(b"user:123");
wal.append(&record).await?;
// Rebuild state from log
let mut kv = HashMap::new();
let mut reader = wal.read_from(Position::start()).await?;
while let Some((record, _)) = reader.next_record().await? {
if record.tombstone {
kv.remove(&record.key);
} else {
kv.insert(record.key, record.value);
}
}
Message Queue¶
// Publish
let record = Record::put(b"topic:events", b"message payload");
let position = wal.append(&record).await?;
// Consumer tracks position
let mut consumer_position = load_consumer_position()?;
let mut reader = wal.read_from(consumer_position).await?;
while let Some((record, position)) = reader.next_record().await? {
process_message(&record);
consumer_position = position;
save_consumer_position(position)?;
}
Performance Tips¶
Batching¶
// Batch multiple records before syncing
for record in records {
wal.append(&record).await?;
}
wal.sync().await?; // Single fsync for all
Compression¶
// For large values, use compression
let record = Record::put(b"key", large_value)
.with_compression(Compression::Lz4);
wal.append(&record).await?;
Fsync Policy¶
// Choose policy based on durability needs
WalConfig {
fsync_policy: FsyncPolicy::Batch(Duration::from_millis(5)), // Balanced
// or FsyncPolicy::Always // Maximum durability
// or FsyncPolicy::Os // Maximum performance
..Default::default()
}
Detailed API Pages¶
Click through to detailed documentation for each type:
- Wal - Main WAL API with all methods
- Record - Record creation, encoding, and compression
- Configuration - WalConfig and FsyncPolicy options
- Errors & Types - Error types, Position, and RecoveryInfo