Event Sourcing with nori-wal¶
Building an event-sourced system using WAL for durable event storage.
Table of contents¶
Problem¶
You want to build an event-sourced application where: - All state changes are captured as events - Events are immutable and append-only - State can be reconstructed by replaying events - Events survive crashes and restarts
Solution¶
use nori_wal::{Wal, WalConfig, Record, Position};
use serde::{Serialize, Deserialize};
use bytes::Bytes;
use anyhow::Result;
use std::path::PathBuf;
/// Base event trait with metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
pub event_id: u64,
pub event_type: String,
pub timestamp: u64,
pub payload: Bytes,
}
/// Example: Bank account events
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AccountEvent {
AccountOpened { account_id: String, initial_balance: i64 },
MoneyDeposited { account_id: String, amount: i64 },
MoneyWithdrawn { account_id: String, amount: i64 },
AccountClosed { account_id: String },
}
/// Account state (derived from events)
#[derive(Debug, Clone)]
pub struct Account {
pub id: String,
pub balance: i64,
pub is_closed: bool,
}
/// Event store using WAL
pub struct EventStore {
wal: Wal,
next_event_id: u64,
}
impl EventStore {
/// Opens or creates an event store
pub async fn open(path: impl Into<PathBuf>) -> Result<Self> {
let path = path.into();
let config = WalConfig {
dir: path.join("events"),
max_segment_size: 128 * 1024 * 1024,
fsync_policy: nori_wal::FsyncPolicy::Batch(
std::time::Duration::from_millis(5)
),
preallocate: true,
node_id: 0,
};
let (wal, recovery_info) = Wal::open(config).await?;
println!("Event store recovered:");
println!(" Events: {}", recovery_info.valid_records);
println!(" Segments: {}", recovery_info.segments_scanned);
// Find highest event ID
let mut next_event_id = 0u64;
let mut reader = wal.read_from(Position { segment_id: 0, offset: 0 }).await?;
while let Some((record, _)) = reader.next_record().await? {
if let Ok(event) = serde_json::from_slice::<Event>(&record.value) {
next_event_id = next_event_id.max(event.event_id + 1);
}
}
println!(" Next event ID: {}", next_event_id);
Ok(Self { wal, next_event_id })
}
/// Appends an event to the store
pub async fn append_event(
&mut self,
event_type: String,
payload: Bytes,
) -> Result<u64> {
let event = Event {
event_id: self.next_event_id,
event_type,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs(),
payload,
};
// Serialize event
let event_bytes = serde_json::to_vec(&event)?;
// Write to WAL
let record = Record::put(&event.event_id.to_le_bytes(), &event_bytes);
self.wal.append(&record).await?;
let event_id = self.next_event_id;
self.next_event_id += 1;
Ok(event_id)
}
/// Syncs events to disk
pub async fn sync(&self) -> Result<()> {
self.wal.sync().await?;
Ok(())
}
/// Reads all events from the store
pub async fn read_all_events(&self) -> Result<Vec<Event>> {
let mut events = Vec::new();
let mut reader = self.wal.read_from(Position { segment_id: 0, offset: 0 }).await?;
while let Some((record, _)) = reader.next_record().await? {
if let Ok(event) = serde_json::from_slice::<Event>(&record.value) {
events.push(event);
}
}
Ok(events)
}
/// Reads events from a specific position
pub async fn read_from(&self, from_event_id: u64) -> Result<Vec<Event>> {
let mut events = Vec::new();
let mut reader = self.wal.read_from(Position { segment_id: 0, offset: 0 }).await?;
while let Some((record, _)) = reader.next_record().await? {
if let Ok(event) = serde_json::from_slice::<Event>(&record.value) {
if event.event_id >= from_event_id {
events.push(event);
}
}
}
Ok(events)
}
/// Gracefully closes the event store
pub async fn close(self) -> Result<()> {
self.wal.close().await?;
Ok(())
}
}
/// Application service using event sourcing
pub struct AccountService {
event_store: EventStore,
}
impl AccountService {
pub async fn new(path: impl Into<PathBuf>) -> Result<Self> {
let event_store = EventStore::open(path).await?;
Ok(Self { event_store })
}
/// Opens a new account
pub async fn open_account(&mut self, account_id: String, initial_balance: i64) -> Result<u64> {
let event = AccountEvent::AccountOpened {
account_id,
initial_balance,
};
let payload = serde_json::to_vec(&event)?;
let event_id = self.event_store.append_event(
"AccountOpened".to_string(),
Bytes::from(payload),
).await?;
Ok(event_id)
}
/// Deposits money
pub async fn deposit(&mut self, account_id: String, amount: i64) -> Result<u64> {
let event = AccountEvent::MoneyDeposited {
account_id,
amount,
};
let payload = serde_json::to_vec(&event)?;
let event_id = self.event_store.append_event(
"MoneyDeposited".to_string(),
Bytes::from(payload),
).await?;
Ok(event_id)
}
/// Withdraws money
pub async fn withdraw(&mut self, account_id: String, amount: i64) -> Result<u64> {
// First, check current balance by replaying events
let account = self.get_account_state(&account_id).await?;
if account.balance < amount {
return Err(anyhow::anyhow!("Insufficient balance"));
}
let event = AccountEvent::MoneyWithdrawn {
account_id,
amount,
};
let payload = serde_json::to_vec(&event)?;
let event_id = self.event_store.append_event(
"MoneyWithdrawn".to_string(),
Bytes::from(payload),
).await?;
Ok(event_id)
}
/// Gets account state by replaying events
pub async fn get_account_state(&self, account_id: &str) -> Result<Account> {
let events = self.event_store.read_all_events().await?;
let mut account = Account {
id: account_id.to_string(),
balance: 0,
is_closed: false,
};
for event in events {
if let Ok(account_event) = serde_json::from_slice::<AccountEvent>(&event.payload) {
match account_event {
AccountEvent::AccountOpened { account_id: id, initial_balance } => {
if id == account_id {
account.balance = initial_balance;
}
}
AccountEvent::MoneyDeposited { account_id: id, amount } => {
if id == account_id {
account.balance += amount;
}
}
AccountEvent::MoneyWithdrawn { account_id: id, amount } => {
if id == account_id {
account.balance -= amount;
}
}
AccountEvent::AccountClosed { account_id: id } => {
if id == account_id {
account.is_closed = true;
}
}
}
}
}
Ok(account)
}
/// Syncs events to disk
pub async fn sync(&self) -> Result<()> {
self.event_store.sync().await
}
/// Closes the service
pub async fn close(self) -> Result<()> {
self.event_store.close().await
}
}
// Example usage
#[tokio::main]
async fn main() -> Result<()> {
let mut service = AccountService::new("./event_store").await?;
// Open account
service.open_account("alice".to_string(), 1000).await?;
// Perform operations
service.deposit("alice".to_string(), 500).await?;
service.withdraw("alice".to_string(), 200).await?;
// Sync to ensure durability
service.sync().await?;
// Query current state
let account = service.get_account_state("alice").await?;
println!("Alice's balance: ${}", account.balance);
// Close service
service.close().await?;
Ok(())
}
How It Works¶
1. Event Storage¶
Events are stored as WAL records:
pub async fn append_event(&mut self, event_type: String, payload: Bytes) -> Result<u64> {
let event = Event {
event_id: self.next_event_id,
event_type,
timestamp: current_timestamp(),
payload,
};
let event_bytes = serde_json::to_vec(&event)?;
let record = Record::put(&event.event_id.to_le_bytes(), &event_bytes);
self.wal.append(&record).await?;
self.next_event_id += 1;
Ok(event.event_id)
}
2. State Reconstruction¶
State is derived by replaying all events:
pub async fn get_account_state(&self, account_id: &str) -> Result<Account> {
let events = self.event_store.read_all_events().await?;
let mut account = Account::default();
for event in events {
// Apply event to state
match event {
AccountEvent::AccountOpened { initial_balance, .. } => {
account.balance = initial_balance;
}
AccountEvent::MoneyDeposited { amount, .. } => {
account.balance += amount;
}
// ... other events
}
}
Ok(account)
}
3. Command Validation¶
Commands validate against current state before appending events:
pub async fn withdraw(&mut self, account_id: String, amount: i64) -> Result<u64> {
// 1. Rebuild state
let account = self.get_account_state(&account_id).await?;
// 2. Validate command
if account.balance < amount {
return Err(anyhow::anyhow!("Insufficient balance"));
}
// 3. Append event
let event = AccountEvent::MoneyWithdrawn { account_id, amount };
self.event_store.append_event(event).await
}
Testing¶
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_event_sourcing_basic() {
let dir = TempDir::new().unwrap();
let mut service = AccountService::new(dir.path()).await.unwrap();
// Open account
service.open_account("alice".to_string(), 1000).await.unwrap();
// Deposit
service.deposit("alice".to_string(), 500).await.unwrap();
// Check state
let account = service.get_account_state("alice").await.unwrap();
assert_eq!(account.balance, 1500);
}
#[tokio::test]
async fn test_event_replay() {
let dir = TempDir::new().unwrap();
// Write events
{
let mut service = AccountService::new(dir.path()).await.unwrap();
service.open_account("bob".to_string(), 2000).await.unwrap();
service.deposit("bob".to_string(), 1000).await.unwrap();
service.withdraw("bob".to_string(), 500).await.unwrap();
service.sync().await.unwrap();
}
// Reopen and verify state is reconstructed
{
let service = AccountService::new(dir.path()).await.unwrap();
let account = service.get_account_state("bob").await.unwrap();
assert_eq!(account.balance, 2500);
}
}
#[tokio::test]
async fn test_insufficient_balance() {
let dir = TempDir::new().unwrap();
let mut service = AccountService::new(dir.path()).await.unwrap();
service.open_account("charlie".to_string(), 100).await.unwrap();
// Try to withdraw more than balance
let result = service.withdraw("charlie".to_string(), 200).await;
assert!(result.is_err());
}
}
Production Considerations¶
1. Snapshots¶
Replaying millions of events is slow. Add snapshots:
pub struct EventStore {
wal: Wal,
next_event_id: u64,
snapshot_dir: PathBuf,
}
impl EventStore {
/// Creates a snapshot at current position
pub async fn create_snapshot(&self, state: &impl Serialize) -> Result<u64> {
let snapshot_id = self.next_event_id - 1;
let snapshot_path = self.snapshot_dir.join(format!("{}.snapshot", snapshot_id));
let snapshot_bytes = serde_json::to_vec(state)?;
tokio::fs::write(&snapshot_path, snapshot_bytes).await?;
Ok(snapshot_id)
}
/// Loads latest snapshot
pub async fn load_snapshot<T: DeserializeOwned>(&self) -> Result<Option<(u64, T)>> {
let mut snapshots = Vec::new();
let mut entries = tokio::fs::read_dir(&self.snapshot_dir).await?;
while let Some(entry) = entries.next_entry().await? {
if let Some(name) = entry.file_name().to_str() {
if let Some(id_str) = name.strip_suffix(".snapshot") {
if let Ok(id) = id_str.parse::<u64>() {
snapshots.push(id);
}
}
}
}
if snapshots.is_empty() {
return Ok(None);
}
// Load most recent snapshot
snapshots.sort();
let latest_id = snapshots.last().unwrap();
let snapshot_path = self.snapshot_dir.join(format!("{}.snapshot", latest_id));
let snapshot_bytes = tokio::fs::read(&snapshot_path).await?;
let state = serde_json::from_slice(&snapshot_bytes)?;
Ok(Some((*latest_id, state)))
}
}
2. Event Versioning¶
Events evolve over time. Use versioning:
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AccountEventV1 {
AccountOpened { account_id: String, initial_balance: i64 },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AccountEventV2 {
AccountOpened { account_id: String, initial_balance: i64, currency: String },
MoneyDeposited { account_id: String, amount: i64 },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "version")]
pub enum VersionedEvent {
V1(AccountEventV1),
V2(AccountEventV2),
}
// Upcasting from V1 to V2
impl From<AccountEventV1> for AccountEventV2 {
fn from(v1: AccountEventV1) -> Self {
match v1 {
AccountEventV1::AccountOpened { account_id, initial_balance } => {
AccountEventV2::AccountOpened {
account_id,
initial_balance,
currency: "USD".to_string(), // Default for old events
}
}
}
}
}
3. Projections¶
Maintain read models for fast queries:
pub struct AccountProjection {
accounts: HashMap<String, Account>,
last_processed_event: u64,
}
impl AccountProjection {
pub async fn update_from_events(&mut self, events: Vec<Event>) -> Result<()> {
for event in events {
if event.event_id <= self.last_processed_event {
continue; // Already processed
}
// Apply event to projection
if let Ok(account_event) = serde_json::from_slice::<AccountEvent>(&event.payload) {
self.apply_event(account_event);
}
self.last_processed_event = event.event_id;
}
Ok(())
}
fn apply_event(&mut self, event: AccountEvent) {
// Update in-memory projection
match event {
AccountEvent::AccountOpened { account_id, initial_balance } => {
self.accounts.insert(account_id.clone(), Account {
id: account_id,
balance: initial_balance,
is_closed: false,
});
}
// ... other events
}
}
pub fn get_account(&self, account_id: &str) -> Option<&Account> {
self.accounts.get(account_id)
}
}
4. Monitoring¶
Track event store metrics:
// Events appended
metrics.counter("events.appended", 1);
// Event types
metrics.counter("events.type", 1, &[("type", event.event_type)]);
// Replay time
let start = Instant::now();
let events = store.read_all_events().await?;
metrics.histogram("events.replay_ms", start.elapsed().as_millis());
Enhancements¶
Time Travel Queries¶
Query state at any point in time:
pub async fn get_account_state_at(
&self,
account_id: &str,
timestamp: u64,
) -> Result<Account> {
let events = self.event_store.read_all_events().await?;
let mut account = Account::default();
for event in events {
if event.timestamp > timestamp {
break; // Stop at target time
}
// Apply event
// ...
}
Ok(account)
}
Event Subscriptions¶
Stream events to subscribers:
pub struct EventSubscriber {
store: EventStore,
last_seen_event: u64,
}
impl EventSubscriber {
pub async fn poll_new_events(&mut self) -> Result<Vec<Event>> {
let events = self.store.read_from(self.last_seen_event + 1).await?;
if let Some(last) = events.last() {
self.last_seen_event = last.event_id;
}
Ok(events)
}
}
Conclusion¶
This recipe demonstrates: - Using WAL for event storage - Rebuilding state from events - Command validation - Event replay and recovery
For distributed event sourcing, consider adding a replication layer.