Skip to content

NoriKV Go Client Advanced Patterns

Complex real-world usage patterns and design examples.

Table of Contents

Distributed Counter

Implement a high-throughput distributed counter with sharding to reduce contention.

Implementation

package main

import (
    "context"
    "crypto/rand"
    "encoding/binary"
    "fmt"
    "strconv"
    "sync"

    norikv "github.com/norikv/norikv-go"
)

type DistributedCounter struct {
    client     *norikv.Client
    name       string
    numShards  int
}

func NewDistributedCounter(client *norikv.Client, name string, numShards int) *DistributedCounter {
    return &DistributedCounter{
        client:    client,
        name:      name,
        numShards: numShards,
    }
}

// Increment atomically increments the counter by 1
func (c *DistributedCounter) Increment(ctx context.Context) error {
    // Choose random shard to reduce contention
    shardID := c.randomShard()
    key := []byte(fmt.Sprintf("%s:shard:%d", c.name, shardID))

    const maxRetries = 10
    for attempt := 0; attempt < maxRetries; attempt++ {
        // Read current value
        result, err := c.client.Get(ctx, key, nil)
        if err != nil && !errors.Is(err, norikv.ErrKeyNotFound) {
            return err
        }

        var currentValue int
        if err == nil {
            currentValue, _ = strconv.Atoi(string(result.Value))
        }

        // Increment
        newValue := []byte(strconv.Itoa(currentValue + 1))

        // CAS write
        options := &norikv.PutOptions{}
        if err == nil {
            options.IfMatchVersion = result.Version
        } else {
            options.IfNotExists = true
        }

        _, err = c.client.Put(ctx, key, newValue, options)
        if err == nil {
            return nil
        }

        if !errors.Is(err, norikv.ErrVersionMismatch) && !errors.Is(err, norikv.ErrAlreadyExists) {
            return err
        }

        // Exponential backoff with jitter
        backoff := time.Duration(1<<attempt) * 10 * time.Millisecond
        jitter := time.Duration(rand.Intn(10)) * time.Millisecond
        time.Sleep(backoff + jitter)
    }

    return fmt.Errorf("increment failed after %d retries", maxRetries)
}

// Get returns the total count across all shards
func (c *DistributedCounter) Get(ctx context.Context) (int, error) {
    var wg sync.WaitGroup
    results := make(chan int, c.numShards)
    errors := make(chan error, c.numShards)

    for i := 0; i < c.numShards; i++ {
        wg.Add(1)
        go func(shardID int) {
            defer wg.Done()

            key := []byte(fmt.Sprintf("%s:shard:%d", c.name, shardID))
            result, err := c.client.Get(ctx, key, nil)
            if err != nil {
                if errors.Is(err, norikv.ErrKeyNotFound) {
                    results <- 0
                    return
                }
                errors <- err
                return
            }

            value, _ := strconv.Atoi(string(result.Value))
            results <- value
        }(i)
    }

    wg.Wait()
    close(results)
    close(errors)

    // Check for errors
    select {
    case err := <-errors:
        return 0, err
    default:
    }

    // Sum all shards
    total := 0
    for value := range results {
        total += value
    }

    return total, nil
}

func (c *DistributedCounter) randomShard() int {
    var b [8]byte
    rand.Read(b[:])
    return int(binary.LittleEndian.Uint64(b[:]) % uint64(c.numShards))
}

Usage

counter := NewDistributedCounter(client, "page-views", 10)

// Increment from multiple goroutines
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        counter.Increment(ctx)
    }()
}
wg.Wait()

// Get total
total, _ := counter.Get(ctx)
fmt.Printf("Total: %d\n", total)

Session Management

Implement session storage with automatic expiration using TTL.

Implementation

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/google/uuid"
    norikv "github.com/norikv/norikv-go"
)

type Session struct {
    ID        string
    UserID    int
    Data      map[string]interface{}
    CreatedAt time.Time
    ExpiresAt time.Time
}

type SessionStore struct {
    client *norikv.Client
    ttl    time.Duration
}

func NewSessionStore(client *norikv.Client, ttl time.Duration) *SessionStore {
    return &SessionStore{
        client: client,
        ttl:    ttl,
    }
}

// Create creates a new session
func (s *SessionStore) Create(ctx context.Context, userID int, data map[string]interface{}) (*Session, error) {
    session := &Session{
        ID:        uuid.New().String(),
        UserID:    userID,
        Data:      data,
        CreatedAt: time.Now(),
        ExpiresAt: time.Now().Add(s.ttl),
    }

    sessionData, err := json.Marshal(session)
    if err != nil {
        return nil, err
    }

    key := []byte(fmt.Sprintf("session:%s", session.ID))
    ttlMs := uint64(s.ttl.Milliseconds())

    options := &norikv.PutOptions{
        TTLMs:          &ttlMs,
        IdempotencyKey: session.ID,
    }

    _, err = s.client.Put(ctx, key, sessionData, options)
    if err != nil {
        return nil, err
    }

    return session, nil
}

// Get retrieves a session by ID
func (s *SessionStore) Get(ctx context.Context, sessionID string) (*Session, error) {
    key := []byte(fmt.Sprintf("session:%s", sessionID))

    result, err := s.client.Get(ctx, key, nil)
    if err != nil {
        return nil, err
    }

    var session Session
    if err := json.Unmarshal(result.Value, &session); err != nil {
        return nil, err
    }

    return &session, nil
}

// Update updates session data
func (s *SessionStore) Update(ctx context.Context, sessionID string, data map[string]interface{}) error {
    session, err := s.Get(ctx, sessionID)
    if err != nil {
        return err
    }

    session.Data = data

    sessionData, err := json.Marshal(session)
    if err != nil {
        return err
    }

    key := []byte(fmt.Sprintf("session:%s", sessionID))
    ttlMs := uint64(time.Until(session.ExpiresAt).Milliseconds())

    options := &norikv.PutOptions{
        TTLMs: &ttlMs,
    }

    _, err = s.client.Put(ctx, key, sessionData, options)
    return err
}

// Delete explicitly deletes a session (logout)
func (s *SessionStore) Delete(ctx context.Context, sessionID string) error {
    key := []byte(fmt.Sprintf("session:%s", sessionID))
    return s.client.Delete(ctx, key, nil)
}

Usage

store := NewSessionStore(client, 30*time.Minute)

// Create session
session, _ := store.Create(ctx, 123, map[string]interface{}{
    "role":       "admin",
    "last_login": time.Now(),
})

// Get session
retrieved, _ := store.Get(ctx, session.ID)

// Update session
store.Update(ctx, session.ID, map[string]interface{}{
    "role":       "admin",
    "last_login": time.Now(),
    "page_views": 5,
})

// Delete session (logout)
store.Delete(ctx, session.ID)

Inventory Management

Prevent overselling with atomic CAS operations.

Implementation

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    norikv "github.com/norikv/norikv-go"
)

type InventoryItem struct {
    SKU      string
    Quantity int
    Reserved int
}

type InventoryManager struct {
    client *norikv.Client
}

func NewInventoryManager(client *norikv.Client) *InventoryManager {
    return &InventoryManager{client: client}
}

// Reserve atomically reserves inventory
func (im *InventoryManager) Reserve(ctx context.Context, sku string, quantity int) error {
    key := []byte(fmt.Sprintf("inventory:%s", sku))

    const maxRetries = 10
    for attempt := 0; attempt < maxRetries; attempt++ {
        // Read current inventory
        result, err := im.client.Get(ctx, key, nil)
        if err != nil {
            return err
        }

        var item InventoryItem
        if err := json.Unmarshal(result.Value, &item); err != nil {
            return err
        }

        // Check availability
        available := item.Quantity - item.Reserved
        if available < quantity {
            return fmt.Errorf("insufficient inventory: need %d, have %d", quantity, available)
        }

        // Reserve
        item.Reserved += quantity

        itemData, err := json.Marshal(item)
        if err != nil {
            return err
        }

        // CAS update
        options := &norikv.PutOptions{
            IfMatchVersion: result.Version,
        }

        _, err = im.client.Put(ctx, key, itemData, options)
        if err == nil {
            return nil
        }

        if !errors.Is(err, norikv.ErrVersionMismatch) {
            return err
        }

        // Exponential backoff
        time.Sleep(time.Duration(1<<attempt) * 10 * time.Millisecond)
    }

    return fmt.Errorf("reserve failed after %d retries", maxRetries)
}

// Commit converts reservation to sale
func (im *InventoryManager) Commit(ctx context.Context, sku string, quantity int) error {
    key := []byte(fmt.Sprintf("inventory:%s", sku))

    const maxRetries = 10
    for attempt := 0; attempt < maxRetries; attempt++ {
        result, err := im.client.Get(ctx, key, nil)
        if err != nil {
            return err
        }

        var item InventoryItem
        if err := json.Unmarshal(result.Value, &item); err != nil {
            return err
        }

        // Commit: reduce quantity and reserved
        item.Quantity -= quantity
        item.Reserved -= quantity

        itemData, err := json.Marshal(item)
        if err != nil {
            return err
        }

        options := &norikv.PutOptions{
            IfMatchVersion: result.Version,
        }

        _, err = im.client.Put(ctx, key, itemData, options)
        if err == nil {
            return nil
        }

        if !errors.Is(err, norikv.ErrVersionMismatch) {
            return err
        }

        time.Sleep(time.Duration(1<<attempt) * 10 * time.Millisecond)
    }

    return fmt.Errorf("commit failed after %d retries", maxRetries)
}

// Release cancels a reservation
func (im *InventoryManager) Release(ctx context.Context, sku string, quantity int) error {
    key := []byte(fmt.Sprintf("inventory:%s", sku))

    const maxRetries = 10
    for attempt := 0; attempt < maxRetries; attempt++ {
        result, err := im.client.Get(ctx, key, nil)
        if err != nil {
            return err
        }

        var item InventoryItem
        if err := json.Unmarshal(result.Value, &item); err != nil {
            return err
        }

        // Release reservation
        item.Reserved -= quantity

        itemData, err := json.Marshal(item)
        if err != nil {
            return err
        }

        options := &norikv.PutOptions{
            IfMatchVersion: result.Version,
        }

        _, err = im.client.Put(ctx, key, itemData, options)
        if err == nil {
            return nil
        }

        if !errors.Is(err, norikv.ErrVersionMismatch) {
            return err
        }

        time.Sleep(time.Duration(1<<attempt) * 10 * time.Millisecond)
    }

    return fmt.Errorf("release failed after %d retries", maxRetries)
}

Usage

im := NewInventoryManager(client)

// Reserve inventory
err := im.Reserve(ctx, "SKU-12345", 2)
if err != nil {
    log.Printf("Reservation failed: %v", err)
    return
}

// Process payment...

// Commit or release
if paymentSucceeded {
    im.Commit(ctx, "SKU-12345", 2)
} else {
    im.Release(ctx, "SKU-12345", 2)
}

Caching Layer

Implement a write-through cache with NoriKV.

Implementation

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "time"

    norikv "github.com/norikv/norikv-go"
)

type CacheEntry struct {
    Value     []byte
    ExpiresAt time.Time
}

type CacheLayer struct {
    client     *norikv.Client
    localCache sync.Map
    ttl        time.Duration
}

func NewCacheLayer(client *norikv.Client, ttl time.Duration) *CacheLayer {
    cache := &CacheLayer{
        client: client,
        ttl:    ttl,
    }

    // Background cleanup of expired local cache entries
    go cache.cleanupLoop()

    return cache
}

// Get retrieves a value with local cache fallback
func (c *CacheLayer) Get(ctx context.Context, key []byte) ([]byte, error) {
    keyStr := string(key)

    // Check local cache first
    if entry, ok := c.localCache.Load(keyStr); ok {
        cached := entry.(*CacheEntry)
        if time.Now().Before(cached.ExpiresAt) {
            return cached.Value, nil
        }
        c.localCache.Delete(keyStr)
    }

    // Fetch from NoriKV
    result, err := c.client.Get(ctx, key, &norikv.GetOptions{
        Consistency: norikv.ConsistencyStaleOK, // Allow stale for cache
    })
    if err != nil {
        return nil, err
    }

    // Update local cache
    c.localCache.Store(keyStr, &CacheEntry{
        Value:     result.Value,
        ExpiresAt: time.Now().Add(c.ttl),
    })

    return result.Value, nil
}

// Put writes through to NoriKV and updates local cache
func (c *CacheLayer) Put(ctx context.Context, key, value []byte) error {
    // Write to NoriKV
    ttlMs := uint64(c.ttl.Milliseconds())
    _, err := c.client.Put(ctx, key, value, &norikv.PutOptions{
        TTLMs: &ttlMs,
    })
    if err != nil {
        return err
    }

    // Update local cache
    c.localCache.Store(string(key), &CacheEntry{
        Value:     value,
        ExpiresAt: time.Now().Add(c.ttl),
    })

    return nil
}

// Delete removes from both caches
func (c *CacheLayer) Delete(ctx context.Context, key []byte) error {
    // Delete from local cache
    c.localCache.Delete(string(key))

    // Delete from NoriKV
    return c.client.Delete(ctx, key, nil)
}

func (c *CacheLayer) cleanupLoop() {
    ticker := time.NewTicker(c.ttl / 2)
    defer ticker.Stop()

    for range ticker.C {
        now := time.Now()
        c.localCache.Range(func(key, value interface{}) bool {
            entry := value.(*CacheEntry)
            if now.After(entry.ExpiresAt) {
                c.localCache.Delete(key)
            }
            return true
        })
    }
}

Usage

cache := NewCacheLayer(client, 5*time.Minute)

// Write (goes to both caches)
cache.Put(ctx, []byte("user:123"), userData)

// Read (local cache first, then NoriKV)
data, _ := cache.Get(ctx, []byte("user:123"))

// Delete (removes from both)
cache.Delete(ctx, []byte("user:123"))

Rate Limiting

Implement a distributed rate limiter using sliding window.

Implementation

package main

import (
    "context"
    "fmt"
    "strconv"
    "strings"
    "time"

    norikv "github.com/norikv/norikv-go"
)

type RateLimiter struct {
    client      *norikv.Client
    maxRequests int
    window      time.Duration
}

func NewRateLimiter(client *norikv.Client, maxRequests int, window time.Duration) *RateLimiter {
    return &RateLimiter{
        client:      client,
        maxRequests: maxRequests,
        window:      window,
    }
}

// Allow checks if a request is allowed
func (rl *RateLimiter) Allow(ctx context.Context, identifier string) (bool, error) {
    now := time.Now()
    windowStart := now.Add(-rl.window).Unix()

    // Use minute bucket for rate limiting
    bucket := now.Unix() / 60
    key := []byte(fmt.Sprintf("ratelimit:%s:%d", identifier, bucket))

    const maxRetries = 5
    for attempt := 0; attempt < maxRetries; attempt++ {
        // Get current count
        result, err := rl.client.Get(ctx, key, nil)

        var count int
        var version *norikv.Version

        if err == nil {
            count, _ = strconv.Atoi(string(result.Value))
            version = result.Version
        } else if !errors.Is(err, norikv.ErrKeyNotFound) {
            return false, err
        }

        // Check if under limit
        if count >= rl.maxRequests {
            return false, nil
        }

        // Increment
        count++
        ttlMs := uint64(rl.window.Milliseconds())

        options := &norikv.PutOptions{
            TTLMs: &ttlMs,
        }
        if version != nil {
            options.IfMatchVersion = version
        } else {
            options.IfNotExists = true
        }

        _, err = rl.client.Put(ctx, key, []byte(strconv.Itoa(count)), options)
        if err == nil {
            return true, nil
        }

        if !errors.Is(err, norikv.ErrVersionMismatch) && !errors.Is(err, norikv.ErrAlreadyExists) {
            return false, err
        }

        time.Sleep(10 * time.Millisecond)
    }

    return false, fmt.Errorf("rate limit check failed after retries")
}

Usage

limiter := NewRateLimiter(client, 100, 1*time.Minute)

// Check if request is allowed
allowed, _ := limiter.Allow(ctx, "user:123")
if !allowed {
    http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
    return
}

// Process request...

Leader Election

Implement distributed leader election using CAS operations:

package main

import (
    "context"
    "fmt"
    "time"

    norikv "github.com/norikv/norikv-go"
)

type LeaderElection struct {
    client     *norikv.Client
    key        []byte
    nodeID     string
    leaseTTL   time.Duration
}

func NewLeaderElection(client *norikv.Client, name, nodeID string, leaseTTL time.Duration) *LeaderElection {
    return &LeaderElection{
        client:   client,
        key:      []byte(fmt.Sprintf("leader:%s", name)),
        nodeID:   nodeID,
        leaseTTL: leaseTTL,
    }
}

// TryAcquire attempts to become the leader
func (le *LeaderElection) TryAcquire(ctx context.Context) (bool, error) {
    ttlMs := uint64(le.leaseTTL.Milliseconds())

    // Try to create leader key (if not exists)
    _, err := le.client.Put(ctx, le.key, []byte(le.nodeID), &norikv.PutOptions{
        IfNotExists: true,
        TTLMs:       &ttlMs,
    })
    if err == nil {
        return true, nil // We are the leader
    }
    if !errors.Is(err, norikv.ErrAlreadyExists) {
        return false, err
    }

    // Key exists - check if we already hold it
    result, err := le.client.Get(ctx, le.key, nil)
    if err != nil {
        return false, err
    }

    if string(result.Value) == le.nodeID {
        // Refresh our lease
        _, err = le.client.Put(ctx, le.key, []byte(le.nodeID), &norikv.PutOptions{
            IfMatchVersion: result.Version,
            TTLMs:          &ttlMs,
        })
        return err == nil, err
    }

    return false, nil // Another node is leader
}

// IsLeader checks if this node is currently the leader
func (le *LeaderElection) IsLeader(ctx context.Context) (bool, error) {
    result, err := le.client.Get(ctx, le.key, nil)
    if errors.Is(err, norikv.ErrKeyNotFound) {
        return false, nil
    }
    if err != nil {
        return false, err
    }
    return string(result.Value) == le.nodeID, nil
}

// Release voluntarily gives up leadership
func (le *LeaderElection) Release(ctx context.Context) error {
    result, err := le.client.Get(ctx, le.key, nil)
    if err != nil {
        return err
    }
    if string(result.Value) != le.nodeID {
        return nil // Not the leader
    }
    _, err = le.client.Delete(ctx, le.key, &norikv.DeleteOptions{
        IfMatchVersion: result.Version,
    })
    return err
}

Usage

election := NewLeaderElection(client, "worker-pool", "node-1", 30*time.Second)

// Leader election loop
go func() {
    ticker := time.NewTicker(10 * time.Second)
    for range ticker.C {
        isLeader, _ := election.TryAcquire(ctx)
        if isLeader {
            // Perform leader duties
            runLeaderTasks()
        }
    }
}()

Event Sourcing

Implement an event-sourced aggregate using append-only event streams:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    norikv "github.com/norikv/norikv-go"
)

type Event struct {
    Type      string          `json:"type"`
    Data      json.RawMessage `json:"data"`
    Timestamp time.Time       `json:"timestamp"`
    Version   int64           `json:"version"`
}

type EventStore struct {
    client *norikv.Client
}

func NewEventStore(client *norikv.Client) *EventStore {
    return &EventStore{client: client}
}

// Append adds a new event to the stream
func (es *EventStore) Append(ctx context.Context, streamID string, eventType string, data interface{}) error {
    // Get current stream version
    metaKey := []byte(fmt.Sprintf("stream:%s:meta", streamID))

    var version int64 = 0
    var metaVersion *norikv.Version

    result, err := es.client.Get(ctx, metaKey, nil)
    if err == nil {
        json.Unmarshal(result.Value, &version)
        metaVersion = result.Version
    } else if !errors.Is(err, norikv.ErrKeyNotFound) {
        return err
    }

    // Create event
    version++
    eventData, _ := json.Marshal(data)
    event := Event{
        Type:      eventType,
        Data:      eventData,
        Timestamp: time.Now(),
        Version:   version,
    }

    eventBytes, _ := json.Marshal(event)
    eventKey := []byte(fmt.Sprintf("stream:%s:event:%d", streamID, version))

    // Write event
    _, err = es.client.Put(ctx, eventKey, eventBytes, &norikv.PutOptions{
        IfNotExists: true,
    })
    if err != nil {
        return err
    }

    // Update stream metadata with CAS
    versionBytes, _ := json.Marshal(version)
    opts := &norikv.PutOptions{}
    if metaVersion != nil {
        opts.IfMatchVersion = metaVersion
    } else {
        opts.IfNotExists = true
    }

    _, err = es.client.Put(ctx, metaKey, versionBytes, opts)
    return err
}

// ReadStream reads all events for a stream
func (es *EventStore) ReadStream(ctx context.Context, streamID string, fromVersion int64) ([]Event, error) {
    // Get current version
    metaKey := []byte(fmt.Sprintf("stream:%s:meta", streamID))
    result, err := es.client.Get(ctx, metaKey, nil)
    if errors.Is(err, norikv.ErrKeyNotFound) {
        return nil, nil
    }
    if err != nil {
        return nil, err
    }

    var currentVersion int64
    json.Unmarshal(result.Value, &currentVersion)

    // Read events
    var events []Event
    for v := fromVersion; v <= currentVersion; v++ {
        eventKey := []byte(fmt.Sprintf("stream:%s:event:%d", streamID, v))
        result, err := es.client.Get(ctx, eventKey, nil)
        if err != nil {
            continue
        }

        var event Event
        json.Unmarshal(result.Value, &event)
        events = append(events, event)
    }

    return events, nil
}

Usage

store := NewEventStore(client)

// Append events
store.Append(ctx, "order-123", "OrderCreated", map[string]interface{}{
    "customerId": "cust-456",
    "items":      []string{"item-1", "item-2"},
})

store.Append(ctx, "order-123", "OrderShipped", map[string]interface{}{
    "trackingNumber": "TRACK123",
})

// Read event stream
events, _ := store.ReadStream(ctx, "order-123", 1)
for _, e := range events {
    fmt.Printf("Event: %s at %v\n", e.Type, e.Timestamp)
}

Multi-Tenancy

Implement tenant isolation with prefixed keys and quota management:

package main

import (
    "context"
    "fmt"
    "strconv"

    norikv "github.com/norikv/norikv-go"
)

type TenantClient struct {
    client   *norikv.Client
    tenantID string
    quota    int64
}

func NewTenantClient(client *norikv.Client, tenantID string, quota int64) *TenantClient {
    return &TenantClient{
        client:   client,
        tenantID: tenantID,
        quota:    quota,
    }
}

func (tc *TenantClient) prefixKey(key []byte) []byte {
    return []byte(fmt.Sprintf("tenant:%s:%s", tc.tenantID, string(key)))
}

// Put stores a value with tenant isolation and quota check
func (tc *TenantClient) Put(ctx context.Context, key, value []byte, options *norikv.PutOptions) (*norikv.Version, error) {
    // Check quota
    if err := tc.checkQuota(ctx); err != nil {
        return nil, err
    }

    prefixedKey := tc.prefixKey(key)
    version, err := tc.client.Put(ctx, prefixedKey, value, options)
    if err == nil {
        tc.incrementUsage(ctx)
    }
    return version, err
}

// Get retrieves a value with tenant isolation
func (tc *TenantClient) Get(ctx context.Context, key []byte, options *norikv.GetOptions) (*norikv.GetResult, error) {
    prefixedKey := tc.prefixKey(key)
    return tc.client.Get(ctx, prefixedKey, options)
}

// Delete removes a value with tenant isolation
func (tc *TenantClient) Delete(ctx context.Context, key []byte, options *norikv.DeleteOptions) (bool, error) {
    prefixedKey := tc.prefixKey(key)
    deleted, err := tc.client.Delete(ctx, prefixedKey, options)
    if deleted {
        tc.decrementUsage(ctx)
    }
    return deleted, err
}

func (tc *TenantClient) checkQuota(ctx context.Context) error {
    usageKey := []byte(fmt.Sprintf("tenant:%s:usage", tc.tenantID))
    result, err := tc.client.Get(ctx, usageKey, nil)
    if errors.Is(err, norikv.ErrKeyNotFound) {
        return nil // No usage yet
    }
    if err != nil {
        return err
    }

    usage, _ := strconv.ParseInt(string(result.Value), 10, 64)
    if usage >= tc.quota {
        return fmt.Errorf("quota exceeded: %d/%d", usage, tc.quota)
    }
    return nil
}

func (tc *TenantClient) incrementUsage(ctx context.Context) {
    tc.updateUsage(ctx, 1)
}

func (tc *TenantClient) decrementUsage(ctx context.Context) {
    tc.updateUsage(ctx, -1)
}

func (tc *TenantClient) updateUsage(ctx context.Context, delta int64) {
    usageKey := []byte(fmt.Sprintf("tenant:%s:usage", tc.tenantID))

    for attempt := 0; attempt < 5; attempt++ {
        result, err := tc.client.Get(ctx, usageKey, nil)

        var usage int64 = 0
        var version *norikv.Version

        if err == nil {
            usage, _ = strconv.ParseInt(string(result.Value), 10, 64)
            version = result.Version
        }

        usage += delta
        if usage < 0 {
            usage = 0
        }

        opts := &norikv.PutOptions{}
        if version != nil {
            opts.IfMatchVersion = version
        } else {
            opts.IfNotExists = true
        }

        _, err = tc.client.Put(ctx, usageKey, []byte(strconv.FormatInt(usage, 10)), opts)
        if err == nil {
            return
        }
    }
}

Usage

// Create tenant-specific clients
tenant1 := NewTenantClient(client, "acme-corp", 10000)
tenant2 := NewTenantClient(client, "globex", 5000)

// Operations are isolated by tenant
tenant1.Put(ctx, []byte("config"), []byte("acme config"), nil)
tenant2.Put(ctx, []byte("config"), []byte("globex config"), nil)

// Each tenant sees only their data
result1, _ := tenant1.Get(ctx, []byte("config"), nil)
result2, _ := tenant2.Get(ctx, []byte("config"), nil)
// result1.Value = "acme config"
// result2.Value = "globex config"

Build a semantic search engine using vector embeddings:

package main

import (
    "context"
    "encoding/json"
    "fmt"

    norikv "github.com/norikv/norikv-go"
)

type SemanticSearchEngine struct {
    client         *norikv.Client
    embeddingModel EmbeddingModel
    namespace      string
}

type SearchResult struct {
    ID      string
    Title   string
    Content string
    Score   float32
}

type DocumentMeta struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func NewSemanticSearchEngine(client *norikv.Client, model EmbeddingModel) *SemanticSearchEngine {
    return &SemanticSearchEngine{
        client:         client,
        embeddingModel: model,
        namespace:      "documents",
    }
}

func (s *SemanticSearchEngine) Initialize(ctx context.Context) error {
    _, err := s.client.VectorCreateIndex(
        ctx,
        s.namespace,
        s.embeddingModel.Dimensions(),
        norikv.DistanceCosine,
        norikv.VectorIndexHNSW,
        nil,
    )
    return err
}

func (s *SemanticSearchEngine) IndexDocument(ctx context.Context, docID, title, content string) error {
    // Generate embedding
    embedding := s.embeddingModel.Embed(title + " " + content)

    // Store embedding
    _, err := s.client.VectorInsert(ctx, s.namespace, docID, embedding, nil)
    if err != nil {
        return err
    }

    // Store metadata
    meta := DocumentMeta{Title: title, Content: content}
    metaBytes, _ := json.Marshal(meta)
    _, err = s.client.Put(ctx, []byte("doc:meta:"+docID), metaBytes, nil)
    return err
}

func (s *SemanticSearchEngine) Search(ctx context.Context, query string, topK int) ([]SearchResult, error) {
    // Generate query embedding
    queryEmbedding := s.embeddingModel.Embed(query)

    // Search
    result, err := s.client.VectorSearch(ctx, s.namespace, queryEmbedding, topK, nil)
    if err != nil {
        return nil, err
    }

    // Fetch metadata
    var results []SearchResult
    for _, match := range result.Matches {
        metaResult, err := s.client.Get(ctx, []byte("doc:meta:"+match.ID), nil)
        if err != nil {
            continue
        }

        var meta DocumentMeta
        json.Unmarshal(metaResult.Value, &meta)

        results = append(results, SearchResult{
            ID:      match.ID,
            Title:   meta.Title,
            Content: meta.Content,
            Score:   1.0 - match.Distance,
        })
    }

    return results, nil
}

// Usage
func main() {
    ctx := context.Background()
    engine := NewSemanticSearchEngine(client, openAIEmbedding)
    engine.Initialize(ctx)

    // Index documents
    engine.IndexDocument(ctx, "doc1", "Machine Learning",
        "Machine learning is a subset of AI...")
    engine.IndexDocument(ctx, "doc2", "Deep Learning",
        "Deep learning uses neural networks...")

    // Search
    results, _ := engine.Search(ctx, "AI neural networks", 5)
    for _, r := range results {
        fmt.Printf("%.2f: %s\n", r.Score, r.Title)
    }
}

Recommendation System

Build a product recommendation engine:

type RecommendationEngine struct {
    client         *norikv.Client
    embeddingModel EmbeddingModel
}

type ProductRecommendation struct {
    ProductID string
    Name      string
    Category  string
    Score     float32
}

func NewRecommendationEngine(client *norikv.Client, model EmbeddingModel) *RecommendationEngine {
    return &RecommendationEngine{
        client:         client,
        embeddingModel: model,
    }
}

func (r *RecommendationEngine) Initialize(ctx context.Context) error {
    // Create product index
    _, err := r.client.VectorCreateIndex(
        ctx, "products",
        r.embeddingModel.Dimensions(),
        norikv.DistanceCosine,
        norikv.VectorIndexHNSW,
        nil,
    )
    if err != nil {
        return err
    }

    // Create user preference index
    _, err = r.client.VectorCreateIndex(
        ctx, "user_prefs",
        r.embeddingModel.Dimensions(),
        norikv.DistanceCosine,
        norikv.VectorIndexHNSW,
        nil,
    )
    return err
}

func (r *RecommendationEngine) IndexProduct(ctx context.Context, productID, name, desc, category string) error {
    embedding := r.embeddingModel.Embed(name + " " + desc + " " + category)
    _, err := r.client.VectorInsert(ctx, "products", productID, embedding, nil)
    return err
}

func (r *RecommendationEngine) RecordInteraction(ctx context.Context, userID, productID string) error {
    // Get product embedding
    productEmbed, err := r.client.VectorGet(ctx, "products", productID)
    if err != nil {
        return err
    }

    // Update user preferences (simple moving average)
    currentPref, _ := r.client.VectorGet(ctx, "user_prefs", userID)
    if currentPref != nil {
        for i := range currentPref {
            currentPref[i] = (currentPref[i] + productEmbed[i]) / 2
        }
    } else {
        currentPref = productEmbed
    }

    _, err = r.client.VectorInsert(ctx, "user_prefs", userID, currentPref, nil)
    return err
}

func (r *RecommendationEngine) GetRecommendations(ctx context.Context, userID string, topK int) ([]ProductRecommendation, error) {
    userPref, err := r.client.VectorGet(ctx, "user_prefs", userID)
    if err != nil || userPref == nil {
        return nil, err
    }

    result, err := r.client.VectorSearch(ctx, "products", userPref, topK, nil)
    if err != nil {
        return nil, err
    }

    var recs []ProductRecommendation
    for _, match := range result.Matches {
        recs = append(recs, ProductRecommendation{
            ProductID: match.ID,
            Score:     1.0 - match.Distance,
        })
    }
    return recs, nil
}

// Usage
func main() {
    ctx := context.Background()
    engine := NewRecommendationEngine(client, embeddingModel)
    engine.Initialize(ctx)

    // Index products
    engine.IndexProduct(ctx, "prod1", "Running Shoes", "Lightweight", "Footwear")
    engine.IndexProduct(ctx, "prod2", "Trail Shoes", "Durable", "Footwear")

    // Record user interactions
    engine.RecordInteraction(ctx, "user123", "prod1")

    // Get recommendations
    recs, _ := engine.GetRecommendations(ctx, "user123", 5)
    for _, r := range recs {
        fmt.Printf("%.2f: %s\n", r.Score, r.ProductID)
    }
}

Document Deduplication

Detect near-duplicate documents using vector similarity:

type DocumentDeduplicator struct {
    client              *norikv.Client
    embeddingModel      EmbeddingModel
    similarityThreshold float32
}

type DeduplicationResult struct {
    IsDuplicate bool
    DuplicateOf string
    Similarity  float32
}

func NewDocumentDeduplicator(client *norikv.Client, model EmbeddingModel, threshold float32) *DocumentDeduplicator {
    return &DocumentDeduplicator{
        client:              client,
        embeddingModel:      model,
        similarityThreshold: threshold,
    }
}

func (d *DocumentDeduplicator) Initialize(ctx context.Context) error {
    _, err := d.client.VectorCreateIndex(
        ctx, "doc_fingerprints",
        d.embeddingModel.Dimensions(),
        norikv.DistanceCosine,
        norikv.VectorIndexHNSW,
        nil,
    )
    return err
}

func (d *DocumentDeduplicator) CheckAndStore(ctx context.Context, docID, content string) (*DeduplicationResult, error) {
    embedding := d.embeddingModel.Embed(content)

    // Search for similar documents
    result, err := d.client.VectorSearch(ctx, "doc_fingerprints", embedding, 5, nil)
    if err != nil {
        return nil, err
    }

    // Check for duplicates
    for _, match := range result.Matches {
        similarity := 1.0 - match.Distance
        if similarity >= d.similarityThreshold {
            return &DeduplicationResult{
                IsDuplicate: true,
                DuplicateOf: match.ID,
                Similarity:  similarity,
            }, nil
        }
    }

    // No duplicate found, store new document
    _, err = d.client.VectorInsert(ctx, "doc_fingerprints", docID, embedding, nil)
    if err != nil {
        return nil, err
    }

    return &DeduplicationResult{IsDuplicate: false}, nil
}

// Usage
func main() {
    ctx := context.Background()
    dedup := NewDocumentDeduplicator(client, embeddingModel, 0.95)
    dedup.Initialize(ctx)

    doc1 := "The quick brown fox jumps over the lazy dog."
    doc2 := "A quick brown fox jumped over a lazy dog."

    result1, _ := dedup.CheckAndStore(ctx, "doc1", doc1)
    fmt.Printf("doc1 is duplicate: %v\n", result1.IsDuplicate) // false

    result2, _ := dedup.CheckAndStore(ctx, "doc2", doc2)
    fmt.Printf("doc2 is duplicate: %v\n", result2.IsDuplicate) // true
    fmt.Printf("doc2 duplicate of: %s\n", result2.DuplicateOf) // doc1
}

Next Steps