Building a Message Queue¶
Simple message queue with consumer position tracking using nori-wal.
Table of contents¶
Problem¶
You want to build a message queue where:
- Messages are durably stored
- Multiple consumers can read independently
- Each consumer tracks its own position
- Messages can be replayed
- Delivery has at-least-once semantics
Solution¶
use nori_wal::{Wal, WalConfig, Record, Position};
use serde::{Serialize, Deserialize};
use bytes::Bytes;
use anyhow::Result;
use std::collections::HashMap;
use std::path::PathBuf;
/// A message in the queue
///
/// Derives `Serialize`/`Deserialize`; the queue encodes it with `serde_json`
/// and stores the result as the value of a WAL record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
/// Identifier assigned by `MessageQueue::publish` from its `next_message_id` counter.
pub id: u64,
/// Topic name supplied by the publisher.
pub topic: String,
/// Opaque message body supplied by the publisher.
pub payload: Bytes,
/// Publish time in whole seconds since the Unix epoch.
pub timestamp: u64,
}
/// Consumer position tracking
///
/// Serializable form of a consumer's WAL position: the two `Position`
/// components are stored as plain integers next to the consumer's ID.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerOffset {
/// ID of the consumer this offset belongs to.
pub consumer_id: String,
/// `segment_id` component of the consumer's `Position`.
pub segment_id: u64,
/// `offset` component of the consumer's `Position`.
pub offset: u64,
/// ID of the last message the consumer handled.
/// NOTE(review): nothing visible in this file reads this field — confirm intended use.
pub last_message_id: u64,
}
/// Message queue implementation
///
/// Wraps a single WAL that holds the messages, plus in-memory bookkeeping.
pub struct MessageQueue {
/// Write-ahead log that stores one record per published message.
wal: Wal,
/// Last recorded WAL position per consumer ID. Kept in memory only;
/// `open` starts with an empty map.
offsets: HashMap<String, Position>,
/// ID that the next call to `publish` will assign.
next_message_id: u64,
}
impl MessageQueue {
/// Opens or creates a message queue
pub async fn open(path: impl Into<PathBuf>) -> Result<Self> {
let path = path.into();
let config = WalConfig {
dir: path.join("messages"),
max_segment_size: 128 * 1024 * 1024,
fsync_policy: nori_wal::FsyncPolicy::Batch(
std::time::Duration::from_millis(5)
),
preallocate: true,
node_id: 0,
};
let (wal, recovery_info) = Wal::open(config).await?;
println!("Message queue recovered:");
println!(" Messages: {}", recovery_info.valid_records);
println!(" Segments: {}", recovery_info.segments_scanned);
// Find highest message ID
let mut next_message_id = 0u64;
let mut reader = wal.read_from(Position { segment_id: 0, offset: 0 }).await?;
while let Some((record, _)) = reader.next_record().await? {
if let Ok(msg) = serde_json::from_slice::<Message>(&record.value) {
next_message_id = next_message_id.max(msg.id + 1);
}
}
Ok(Self {
wal,
offsets: HashMap::new(),
next_message_id,
})
}
/// Appends a new message on `topic` to the queue and returns its ID.
///
/// The message is stamped with the current time (whole seconds since the
/// Unix epoch), encoded as JSON, and written as a WAL record keyed by the
/// little-endian bytes of its ID. The ID counter only advances after the
/// append succeeded.
///
/// # Errors
///
/// Returns an error if the system clock is before the Unix epoch, if the
/// message cannot be serialized, or if the WAL append fails.
pub async fn publish(&mut self, topic: &str, payload: Bytes) -> Result<u64> {
    let id = self.next_message_id;
    let message = Message {
        id,
        topic: topic.to_string(),
        payload,
        timestamp: std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?
            .as_secs(),
    };

    let encoded = serde_json::to_vec(&message)?;
    let key = id.to_le_bytes();
    let record = Record::put(&key, &encoded);
    self.wal.append(&record).await?;

    self.next_message_id += 1;
    Ok(id)
}
/// Syncs messages to disk
///
/// Calls `sync` on the underlying message WAL and propagates any error it
/// returns.
pub async fn sync(&self) -> Result<()> {
self.wal.sync().await?;
Ok(())
}
/// Returns a consumer handle for `consumer_id`.
///
/// The handle resumes from the position most recently recorded for this ID
/// in the in-memory offset map. If no position is recorded, it starts at the
/// beginning of the log (segment 0, offset 0).
///
/// Only reads queue state, so a shared reference is sufficient.
pub fn consumer(&self, consumer_id: &str) -> Consumer {
    let position = self
        .offsets
        .get(consumer_id)
        .copied()
        .unwrap_or(Position { segment_id: 0, offset: 0 });

    Consumer {
        id: consumer_id.to_string(),
        position,
    }
}
/// Consumes the next message for a consumer.
///
/// Reads forward from the consumer's current position and returns the first
/// record that decodes as a [`Message`]. The consumer's position and the
/// queue's in-memory offset entry are advanced past every record read,
/// including records that fail to decode. Such records are skipped, matching
/// how the other scans in this type treat them, instead of being reported as
/// "no more messages" while leaving the consumer stuck on them.
///
/// Returns `Ok(None)` only when the end of the log has been reached.
///
/// # Errors
///
/// Returns an error if reading from the WAL fails.
pub async fn consume(&mut self, consumer: &mut Consumer) -> Result<Option<Message>> {
    let mut reader = self.wal.read_from(consumer.position).await?;

    while let Some((record, position)) = reader.next_record().await? {
        // Advance first so an undecodable record is never read again.
        consumer.position = position;
        self.offsets.insert(consumer.id.clone(), position);

        if let Ok(message) = serde_json::from_slice::<Message>(&record.value) {
            return Ok(Some(message));
        }
    }

    Ok(None)
}
/// Commits a consumer's current position
///
/// Stores `consumer.position` under the consumer's ID in the in-memory
/// `offsets` map. Nothing is written to the WAL here, and `open` starts with
/// an empty map, so a committed position is not available after reopening
/// the queue.
pub async fn commit(&mut self, consumer: &Consumer) -> Result<()> {
self.offsets.insert(consumer.id.clone(), consumer.position);
Ok(())
}
/// Rewinds the recorded offset for `consumer_id` to the start of the log.
///
/// Only the queue's offset map is changed; a `Consumer` handle that already
/// exists keeps its own position. A handle created afterwards via
/// `consumer(consumer_id)` starts at segment 0, offset 0.
pub fn reset_consumer(&mut self, consumer_id: &str) {
    let beginning = Position { segment_id: 0, offset: 0 };
    self.offsets.insert(consumer_id.to_string(), beginning);
}
/// Seeks a consumer to a specific message ID.
///
/// Scans the log from the beginning. When the message with `message_id` is
/// found, the consumer (and its entry in the offset map) is positioned so
/// that the next `consume` call returns that message.
///
/// `consume` resumes from the position returned alongside a record, so that
/// position is treated here as the point *after* the record. The consumer is
/// therefore placed at the position the matching record was read *from*,
/// not at the position returned with it.
/// NOTE(review): confirm against the nori-wal reader documentation that
/// `next_record` yields the resume position, as `consume` and `lag` assume.
///
/// # Errors
///
/// Returns an error if reading from the WAL fails or if no message with
/// `message_id` exists.
pub async fn seek(&mut self, consumer: &mut Consumer, message_id: u64) -> Result<()> {
    // Position from which the record currently being examined was read.
    let mut record_start = Position { segment_id: 0, offset: 0 };
    let mut reader = self.wal.read_from(record_start).await?;

    while let Some((record, next_position)) = reader.next_record().await? {
        if let Ok(message) = serde_json::from_slice::<Message>(&record.value) {
            if message.id == message_id {
                consumer.position = record_start;
                self.offsets.insert(consumer.id.clone(), record_start);
                return Ok(());
            }
        }
        record_start = next_position;
    }

    Err(anyhow::anyhow!("Message ID {} not found", message_id))
}
/// Reports how many records lie ahead of the consumer's position.
///
/// Every remaining WAL record is counted, whether or not it decodes as a
/// `Message`. The log is read to its end, so the cost grows with the amount
/// of unread data.
///
/// # Errors
///
/// Returns an error if reading from the WAL fails.
pub async fn lag(&self, consumer: &Consumer) -> Result<u64> {
    let mut reader = self.wal.read_from(consumer.position).await?;
    let mut pending: u64 = 0;
    while reader.next_record().await?.is_some() {
        pending += 1;
    }
    Ok(pending)
}
/// Gracefully closes the queue
///
/// Consumes the queue and calls `close` on the underlying WAL, propagating
/// any error it returns. The in-memory consumer offsets are dropped with the
/// queue.
pub async fn close(self) -> Result<()> {
self.wal.close().await?;
Ok(())
}
}
/// A consumer that reads from the queue
///
/// A lightweight handle: it owns no WAL resources, only an ID and a cursor.
/// The queue's read methods take it by mutable reference and move the cursor.
#[derive(Debug, Clone)]
pub struct Consumer {
/// Name under which the queue tracks this consumer's offset.
pub id: String,
/// WAL position the next read for this consumer starts from.
position: Position,
}
// Example usage: publish ten messages, then read them with two independent
// consumers and report how far the first one is behind.
#[tokio::main]
async fn main() -> Result<()> {
    let mut queue = MessageQueue::open("./message_queue").await?;

    // Publish ten messages on the "orders" topic and flush them to disk.
    for n in 0..10 {
        let body = format!("Message {}", n);
        queue.publish("orders", Bytes::from(body)).await?;
    }
    queue.sync().await?;

    // Each consumer keeps its own position in the log.
    let mut consumer_a = queue.consumer("consumer-a");
    let mut consumer_b = queue.consumer("consumer-b");

    // Consumer A makes five read attempts, then commits.
    for _ in 0..5 {
        if let Some(msg) = queue.consume(&mut consumer_a).await? {
            println!("Consumer A: {:?}", msg);
        }
    }
    queue.commit(&consumer_a).await?;

    // Consumer B drains the queue, then commits.
    loop {
        match queue.consume(&mut consumer_b).await? {
            Some(msg) => println!("Consumer B: {:?}", msg),
            None => break,
        }
    }
    queue.commit(&consumer_b).await?;

    // Consumer A stopped early, so it still has records ahead of it.
    let lag = queue.lag(&consumer_a).await?;
    println!("Consumer A lag: {} messages", lag);

    queue.close().await?;
    Ok(())
}
How It Works¶
1. Message Publishing¶
Messages are appended to the WAL:
/// Appends a message to the WAL and returns the ID it was given.
pub async fn publish(&mut self, topic: &str, payload: Bytes) -> Result<u64> {
    // The next free ID becomes both the message ID and the WAL record key.
    let id = self.next_message_id;
    let message = Message {
        id,
        topic: topic.to_string(),
        payload,
        timestamp: current_timestamp(),
    };

    let encoded = serde_json::to_vec(&message)?;
    let key = id.to_le_bytes();
    let record = Record::put(&key, &encoded);
    self.wal.append(&record).await?;

    // Only advance the counter once the append has succeeded.
    self.next_message_id += 1;
    Ok(id)
}
2. Consumer Position Tracking¶
Each consumer maintains its own position:
/// Handle for one named reader of the queue.
pub struct Consumer {
/// Name that identifies this consumer.
pub id: String,
position: Position, // Current position in WAL
}
/// Reads the record at the consumer's position and advances the consumer.
///
/// Returns `Ok(None)` when there is nothing left to read. The position is
/// advanced before decoding, so a record that fails to decode produces an
/// error but is not read again.
pub async fn consume(&mut self, consumer: &mut Consumer) -> Result<Option<Message>> {
    let mut reader = self.wal.read_from(consumer.position).await?;
    match reader.next_record().await? {
        Some((record, new_position)) => {
            // Move the cursor first, then decode the payload.
            consumer.position = new_position;
            let message = serde_json::from_slice(&record.value)?;
            Ok(Some(message))
        }
        // No more messages
        None => Ok(None),
    }
}
3. Offset Management¶
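Consumer offsets are tracked in memory; `commit` records the consumer's current position in the in-memory offset map. They are not persisted — see "Persistent Offset Storage" under Production Considerations for a durable variant: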
/// Records the consumer's current position under its ID.
///
/// This version only updates the in-memory map; nothing is written to disk.
pub async fn commit(&mut self, consumer: &Consumer) -> Result<()> {
// Store offset in HashMap
self.offsets.insert(consumer.id.clone(), consumer.position);
// For durability, you could also write offsets to a separate WAL
Ok(())
}
Testing¶
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Messages come back in publish order with sequential IDs starting at 0.
    #[tokio::test]
    async fn test_publish_and_consume() {
        let dir = TempDir::new().unwrap();
        let mut queue = MessageQueue::open(dir.path()).await.unwrap();

        // Publish messages
        queue.publish("test", Bytes::from("msg1")).await.unwrap();
        queue.publish("test", Bytes::from("msg2")).await.unwrap();

        // Consume
        let mut consumer = queue.consumer("test-consumer");
        let msg1 = queue.consume(&mut consumer).await.unwrap().unwrap();
        let msg2 = queue.consume(&mut consumer).await.unwrap().unwrap();
        assert_eq!(msg1.id, 0);
        assert_eq!(msg2.id, 1);
    }

    /// Two consumers read the same log without affecting each other.
    #[tokio::test]
    async fn test_multiple_consumers() {
        let dir = TempDir::new().unwrap();
        let mut queue = MessageQueue::open(dir.path()).await.unwrap();

        // Publish 10 messages
        for i in 0..10 {
            queue.publish("test", Bytes::from(format!("msg{}", i))).await.unwrap();
        }

        // Consumer A reads 5
        let mut consumer_a = queue.consumer("a");
        for _ in 0..5 {
            queue.consume(&mut consumer_a).await.unwrap();
        }
        queue.commit(&consumer_a).await.unwrap();

        // Consumer B reads all 10
        let mut consumer_b = queue.consumer("b");
        let mut count = 0;
        while queue.consume(&mut consumer_b).await.unwrap().is_some() {
            count += 1;
        }
        assert_eq!(count, 10);

        // Consumer A still has 5 messages left
        let lag = queue.lag(&consumer_a).await.unwrap();
        assert_eq!(lag, 5);
    }

    /// A committed position should survive closing and reopening the queue.
    ///
    /// Ignored by default: `MessageQueue::open` starts with an empty offset
    /// map and `commit` only updates that map, so after reopening the
    /// consumer starts from the first message and the assertion below fails.
    /// Enable this test once offsets are stored durably and reloaded on open.
    #[tokio::test]
    #[ignore = "consumer offsets are in-memory only; requires durable offset storage reloaded on open"]
    async fn test_consumer_recovery() {
        let dir = TempDir::new().unwrap();

        // Write messages and consume some
        {
            let mut queue = MessageQueue::open(dir.path()).await.unwrap();
            for i in 0..10 {
                queue.publish("test", Bytes::from(format!("msg{}", i))).await.unwrap();
            }
            let mut consumer = queue.consumer("persistent");
            for _ in 0..3 {
                queue.consume(&mut consumer).await.unwrap();
            }
            queue.commit(&consumer).await.unwrap();
            queue.sync().await.unwrap();
        }

        // Reopen and verify consumer position
        {
            let mut queue = MessageQueue::open(dir.path()).await.unwrap();
            let mut consumer = queue.consumer("persistent");
            // Should start from message 3
            let msg = queue.consume(&mut consumer).await.unwrap().unwrap();
            assert_eq!(msg.id, 3);
        }
    }

    /// After seeking to an ID, the next consumed message has that ID.
    #[tokio::test]
    async fn test_seek() {
        let dir = TempDir::new().unwrap();
        let mut queue = MessageQueue::open(dir.path()).await.unwrap();

        // Publish 10 messages
        for i in 0..10 {
            queue.publish("test", Bytes::from(format!("msg{}", i))).await.unwrap();
        }

        let mut consumer = queue.consumer("seeker");
        // Seek to message ID 5
        queue.seek(&mut consumer, 5).await.unwrap();

        // Next message should be ID 5
        let msg = queue.consume(&mut consumer).await.unwrap().unwrap();
        assert_eq!(msg.id, 5);
    }
}
Production Considerations¶
1. Persistent Offset Storage¶
Store consumer offsets durably:
/// Message queue variant that also keeps consumer offsets in a WAL of their own.
pub struct MessageQueue {
/// WAL holding the published messages.
wal: Wal,
offset_wal: Wal, // Separate WAL for offsets
/// In-memory view of the latest position per consumer ID.
offsets: HashMap<String, Position>,
/// ID that the next published message will receive.
next_message_id: u64,
}
impl MessageQueue {
/// Records the consumer's position in memory and appends it to the offset WAL.
///
/// The offset record is keyed by the consumer ID bytes and carries a
/// JSON-encoded `ConsumerOffset`. `last_message_id` is always written as 0.
///
/// # Errors
///
/// Returns an error if the offset cannot be serialized or the append fails.
/// The in-memory entry has already been updated at that point.
pub async fn commit(&mut self, consumer: &Consumer) -> Result<()> {
    let position = consumer.position;

    // Update in-memory
    self.offsets.insert(consumer.id.clone(), position);

    // Persist to offset WAL
    let entry = ConsumerOffset {
        consumer_id: consumer.id.clone(),
        segment_id: position.segment_id,
        offset: position.offset,
        last_message_id: 0, // Track if needed
    };
    let encoded = serde_json::to_vec(&entry)?;
    let record = Record::put(consumer.id.as_bytes(), &encoded);
    self.offset_wal.append(&record).await?;

    Ok(())
}
pub async fn load_offsets(&mut self) -> Result<()> {
let mut reader = self.offset_wal.read_from(Position { segment_id: 0, offset: 0 }).await?;
while let Some((record, _)) = reader.next_record().await? {
if let Ok(offset) = serde_json::from_slice::<ConsumerOffset>(&record.value) {
self.offsets.insert(
offset.consumer_id,
Position {
segment_id: offset.segment_id,
offset: offset.offset,
},
);
}
}
Ok(())
}
}
2. Message Retention¶
Delete old segments that every tracked consumer has already read past:
impl MessageQueue {
    /// Deletes whole WAL segments that every tracked consumer has moved past.
    ///
    /// The lowest `segment_id` among all recorded consumer offsets is the
    /// oldest segment still in use. Segments before it are deleted, and that
    /// segment ID is returned.
    ///
    /// When no consumer offsets are recorded, nothing is deleted and `0` is
    /// returned, because there is no evidence that any segment is safe to
    /// drop. Consumers that have never recorded an offset are not visible
    /// here.
    ///
    /// NOTE(review): `retention_seconds` is accepted for interface
    /// compatibility but is not applied. Time-based retention needs
    /// per-segment timestamps, which this recipe does not track.
    ///
    /// # Errors
    ///
    /// Returns an error if the WAL fails to delete segments.
    pub async fn cleanup(&mut self, retention_seconds: u64) -> Result<u64> {
        // Deliberately unused; see the note in the doc comment.
        let _ = retention_seconds;

        // Find the oldest segment still in use by any consumer.
        let oldest_segment = match self
            .offsets
            .values()
            .map(|position| position.segment_id)
            .min()
        {
            Some(segment_id) => segment_id,
            // No tracked consumers: keep everything.
            None => return Ok(0),
        };

        // Delete segments before the oldest consumer position.
        if oldest_segment > 0 {
            self.wal.delete_segments_before(oldest_segment).await?;
        }

        Ok(oldest_segment)
    }
}
3. Topic Filtering¶
Filter messages by topic:
/// Consumes the next message whose topic equals `topic`.
///
/// Reads forward from the consumer's position with a single WAL reader,
/// advancing the consumer past every record it reads. Messages on other
/// topics and records that do not decode as a `Message` are skipped.
/// Returns `Ok(None)` when the end of the log is reached without a match.
///
/// The queue's offset map is not touched; only `consumer.position` moves.
///
/// # Errors
///
/// Returns an error if reading from the WAL fails.
pub async fn consume_topic(
    &mut self,
    consumer: &mut Consumer,
    topic: &str,
) -> Result<Option<Message>> {
    // One reader for the whole scan instead of reopening it per skipped record.
    let mut reader = self.wal.read_from(consumer.position).await?;

    while let Some((record, position)) = reader.next_record().await? {
        consumer.position = position;
        if let Ok(message) = serde_json::from_slice::<Message>(&record.value) {
            if message.topic == topic {
                return Ok(Some(message));
            }
            // Skip messages from other topics
        }
    }

    Ok(None)
}
4. Batch Consumption¶
Consume multiple messages at once:
/// Consumes up to `max_messages` messages in one call.
///
/// Reads forward from the consumer's position until `max_messages` messages
/// have been collected or the end of the log is reached. The consumer is
/// advanced past every record read. Records that do not decode as a
/// `Message` are skipped and do not count toward the limit, so an empty
/// result always means the end of the log (or `max_messages == 0`).
///
/// The queue's offset map is not touched; only `consumer.position` moves.
///
/// # Errors
///
/// Returns an error if reading from the WAL fails.
pub async fn consume_batch(
    &mut self,
    consumer: &mut Consumer,
    max_messages: usize,
) -> Result<Vec<Message>> {
    let mut messages = Vec::new();
    let mut reader = self.wal.read_from(consumer.position).await?;

    while messages.len() < max_messages {
        match reader.next_record().await? {
            Some((record, position)) => {
                // Always advance, so undecodable records cannot pin the consumer.
                consumer.position = position;
                if let Ok(message) = serde_json::from_slice::<Message>(&record.value) {
                    messages.push(message);
                }
            }
            None => break,
        }
    }

    Ok(messages)
}
5. Monitoring¶
Track queue metrics:
// Messages published
metrics.counter("queue.messages.published", 1, &[("topic", topic)]);
// Consumer lag (the label borrows the ID so `consumer.id` is not moved out)
metrics.gauge("queue.consumer.lag", lag, &[("consumer", consumer.id.as_str())]);
// Consumption rate
metrics.counter("queue.messages.consumed", 1, &[("consumer", consumer.id.as_str())]);
Conclusion¶
This recipe demonstrates:
- Using a WAL as a message log
- Independent consumer position tracking
- At-least-once delivery semantics
- Message replay and seeking
For distributed message queues, consider adding a replication layer.