Skip to content

NoriKV Python Client API Guide

Complete reference for the NoriKV Python Client SDK with async/await support.

Table of Contents

Installation

pip install norikv
# or
poetry add norikv
# or
pipenv install norikv

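Requirements:

  • Python 3.9 or higher
  • asyncio support

Note: some examples below use `X | None` type annotations, which need Python 3.10+ (or `from __future__ import annotations` on Python 3.9).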

Quick Start

import asyncio
from norikv import NoriKVClient, ClientConfig

async def main():
    """Put, get, and delete one key using a client for two localhost nodes."""
    key = "user:alice"
    config = ClientConfig(
        nodes=["localhost:9001", "localhost:9002"],
        total_shards=1024,
        timeout=5000,
    )

    async with NoriKVClient(config) as client:
        # Put a value (the returned version is not needed here)
        await client.put(key, "Alice")

        # Get the value and show it as text
        fetched = await client.get(key)
        text = fetched.value.decode()
        print(f"Value: {text}")

        # Delete
        await client.delete(key)

if __name__ == "__main__":
    asyncio.run(main())

Client Configuration

Basic Configuration

from norikv import NoriKVClient, ClientConfig

config = ClientConfig(
    nodes=["node1:9001", "node2:9001"],
    total_shards=1024,
    timeout=5000,
)

client = NoriKVClient(config)

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| nodes | list[str] | Required | List of node addresses (host:port) |
| total_shards | int | Required | Total number of shards in cluster |
| timeout | int | 5000 | Request timeout in milliseconds |
| retry | RetryConfig | See below | Retry policy configuration |

Retry Configuration

from norikv import RetryConfig

retry_config = RetryConfig(
    max_attempts=10,
    initial_delay_ms=100,
    max_delay_ms=5000,
    jitter_ms=100,
)

config = ClientConfig(
    nodes=["localhost:9001"],
    total_shards=1024,
    retry=retry_config,
)

Retry Behavior:

  • Retries transient errors: Unavailable, Aborted, DeadlineExceeded
  • Does NOT retry: InvalidArgument, NotFound, FailedPrecondition
  • Uses exponential backoff with jitter

Core Operations

PUT - Write Data

Basic PUT

key = "user:123"
value = json.dumps({"name": "Alice"})

version = await client.put(key, value)
print(f"Written at version: {version}")

PUT with Options

from norikv import PutOptions

options = PutOptions(
    ttl_ms=60000,                    # TTL: 60 seconds
    idempotency_key="order-12345",   # Idempotency key
    if_match_version=expected_version, # CAS
)

version = await client.put(key, value, options)

PutOptions Fields:

| Field | Type | Description |
|---|---|---|
| ttl_ms | int \| None | Time-to-live in milliseconds |
| idempotency_key | str \| None | Key for idempotent operations |
| if_match_version | Version \| None | Expected version for CAS |

GET - Read Data

Basic GET

result = await client.get("user:123")

value = result.value.decode("utf-8")
version = result.version

GET with Consistency Level

from norikv import GetOptions, ConsistencyLevel

options = GetOptions(
    consistency=ConsistencyLevel.LINEARIZABLE,
)

result = await client.get(key, options)

Consistency Levels:

| Level | Description | Use Case |
|---|---|---|
| LEASE | Default, lease-based read | Most operations (fast, usually consistent) |
| LINEARIZABLE | Strictest, always up-to-date | Critical reads requiring absolute consistency |
| STALE_OK | May return stale data | Read-heavy workloads, caching |

DELETE - Remove Data

Basic DELETE

deleted = await client.delete("user:123")
print(f"Deleted: {deleted}")

DELETE with Options

from norikv import DeleteOptions

options = DeleteOptions(
    idempotency_key="delete-order-12345",
    if_match_version=expected_version,
)

await client.delete(key, options)

Advanced Features

Compare-And-Swap (CAS)

Optimistic concurrency control using version matching:

# Read current value
current = await client.get(key)
value = int(current.value.decode())

# Update with CAS
new_value = str(value + 1)
try:
    await client.put(
        key,
        new_value,
        PutOptions(if_match_version=current.version),
    )
    print("CAS succeeded")
except VersionMismatchError:
    print("CAS failed - version changed")

Idempotent Operations

Safe retries using idempotency keys:

import uuid

idempotency_key = f"payment-{uuid.uuid4()}"

# First attempt
v1 = await client.put(key, value, PutOptions(idempotency_key=idempotency_key))

# Retry with same key (safe - returns same version)
v2 = await client.put(key, value, PutOptions(idempotency_key=idempotency_key))

print(v1 == v2)  # True

Time-To-Live (TTL)

Automatic expiration:

await client.put(key, value, PutOptions(ttl_ms=60000))  # Expires in 60 seconds

# Key automatically deleted after TTL
await asyncio.sleep(61)

try:
    await client.get(key)
except KeyNotFoundError:
    print("Key expired")

Cluster Topology

Monitor cluster changes:

# Get current cluster view
view = client.get_cluster_view()
if view:
    print(f"Cluster epoch: {view.epoch}")
    print(f"Nodes: {len(view.nodes)}")

# Subscribe to topology changes
def on_topology_change(event):
    """Print the epochs and node changes carried by a topology event."""
    print("Topology changed!")
    # (label, attribute) pairs, printed in this order.
    report = (
        ("Previous epoch", "previous_epoch"),
        ("Current epoch", "current_epoch"),
        ("Added nodes", "added_nodes"),
        ("Removed nodes", "removed_nodes"),
    )
    for label, field in report:
        print(f"{label}: {getattr(event, field)}")

unsubscribe = client.on_topology_change(on_topology_change)

# Later: unsubscribe
unsubscribe()

Client Statistics

Monitor client performance:

stats = client.get_stats()

print(f"Active connections: {stats.pool.active_connections}")
print(f"Total nodes: {stats.router.total_nodes}")
print(f"Cached leaders: {stats.topology.cached_leaders}")

Vector Operations

NoriKV supports vector similarity search for building AI/ML applications, recommendation systems, and semantic search.

Creating a Vector Index

Before inserting vectors, create an index with your configuration:

from norikv import DistanceFunction, VectorIndexType

created = await client.vector_create_index(
    "embeddings",                    # namespace
    1536,                            # dimensions
    DistanceFunction.COSINE,         # distance function
    VectorIndexType.HNSW,            # index type
)

if created:
    print("Index created")
else:
    print("Index already exists")

With Options

from norikv import CreateVectorIndexOptions

options = CreateVectorIndexOptions(
    idempotency_key="create-embeddings-index",
)

created = await client.vector_create_index(
    "embeddings",
    1536,
    DistanceFunction.COSINE,
    VectorIndexType.HNSW,
    options,
)

Distance Functions

| Enum Value | Description | Use Case |
|---|---|---|
| DistanceFunction.EUCLIDEAN | L2 distance | General purpose |
| DistanceFunction.COSINE | Cosine similarity (1 - cos) | Text embeddings, normalized vectors |
| DistanceFunction.INNER_PRODUCT | Negative inner product | Maximum inner product search |

Index Types

| Enum Value | Description | Trade-off |
|---|---|---|
| VectorIndexType.BRUTE_FORCE | Exact linear scan | Exact results, O(n) complexity |
| VectorIndexType.HNSW | Hierarchical Navigable Small World | Approximate, O(log n) complexity |

Inserting Vectors

embedding = await get_embedding("Hello world")

version = await client.vector_insert(
    "embeddings",    # namespace
    "doc-123",       # unique ID
    embedding,       # list[float]
)

print(f"Inserted at version: {version}")

With Options

from norikv import VectorInsertOptions

options = VectorInsertOptions(
    idempotency_key="insert-doc-123",
)

version = await client.vector_insert("embeddings", "doc-123", embedding, options)

Searching for Similar Vectors

query = await get_embedding("Find similar documents")

result = await client.vector_search(
    "embeddings",    # namespace
    query,           # query vector
    10,              # k nearest neighbors
)

print(f"Search took {result.search_time_us}us")

for match in result.matches:
    print(f"ID: {match.id}, Distance: {match.distance}")

With Options

from norikv import VectorSearchOptions, VectorSearchResult

options = VectorSearchOptions(
    include_vectors=True,  # include vector data in results
)

result: VectorSearchResult = await client.vector_search(
    "embeddings",
    query,
    10,
    options,
)

for match in result.matches:
    print(f"ID: {match.id}, Distance: {match.distance}")
    if match.vector:
        print(f"Vector dims: {len(match.vector)}")

Getting a Vector by ID

vector = await client.vector_get("embeddings", "doc-123")

if vector:
    print(f"Vector has {len(vector)} dimensions")
else:
    print("Vector not found")

Deleting Vectors

deleted = await client.vector_delete("embeddings", "doc-123")

if deleted:
    print("Vector deleted")
else:
    print("Vector not found")

With Options

from norikv import VectorDeleteOptions

options = VectorDeleteOptions(
    idempotency_key="delete-doc-123",
)

deleted = await client.vector_delete("embeddings", "doc-123", options)

Dropping a Vector Index

dropped = await client.vector_drop_index("embeddings")

if dropped:
    print("Index dropped")
else:
    print("Index did not exist")

Complete Vector Example

import asyncio
from norikv import (
    NoriKVClient,
    ClientConfig,
    DistanceFunction,
    VectorIndexType,
)

async def main():
    """Index one product embedding, search for similar ones, then clean up."""
    namespace = "products"
    product_id = "prod-001"
    config = ClientConfig(
        nodes=["localhost:9001"],
        total_shards=1024,
    )

    async with NoriKVClient(config) as client:
        # Create index
        await client.vector_create_index(
            namespace,
            768,
            DistanceFunction.COSINE,
            VectorIndexType.HNSW,
        )

        # Insert product embeddings
        stored_vector = await get_product_embedding("Red running shoes")
        await client.vector_insert(namespace, product_id, stored_vector)

        # Search for similar products
        probe_vector = await get_product_embedding("Athletic footwear")
        found = await client.vector_search(namespace, probe_vector, 5)

        print("Similar products:")
        for hit in found.matches:
            print(f"  {hit.id} (distance: {hit.distance:.4f})")

        # Cleanup
        await client.vector_delete(namespace, product_id)
        await client.vector_drop_index(namespace)


async def get_product_embedding(text: str) -> list[float]:
    """Return a placeholder embedding: 768 zeros, regardless of ``text``."""
    # Call your embedding model here
    dimensions = 768
    return [0.0 for _ in range(dimensions)]


if __name__ == "__main__":
    asyncio.run(main())

Error Handling

Error Types

from norikv import (
    KeyNotFoundError,
    VersionMismatchError,
    AlreadyExistsError,
    ConnectionError,
    NoriKVError,
)

Handling Specific Errors

try:
    result = await client.get(key)
except KeyNotFoundError:
    print("Key not found")
except ConnectionError as err:
    print(f"Connection error: {err}")
except NoriKVError as err:
    print(f"Error: {err.code} - {err}")

Retry Pattern

import asyncio
from typing import TypeVar, Callable, Awaitable

T = TypeVar("T")

async def retry_operation(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
) -> T:
    """Await ``operation()``, retrying on ConnectionError with backoff.

    Args:
        operation: Zero-argument callable returning a fresh awaitable on
            every call (an awaitable cannot be awaited twice).
        max_attempts: Total number of attempts, including the first one.

    Returns:
        Whatever the first successful attempt returns.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        ConnectionError: Re-raised from the last attempt once all attempts
            have failed. Any other exception propagates immediately.
    """
    # NOTE: ConnectionError is expected to be the one imported from norikv
    # (see "Error Types"); without that import this name resolves to the
    # builtin ConnectionError instead.
    if max_attempts < 1:
        # Without this guard the loop below would never run and the caller
        # would get the misleading "Unreachable" error.
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # Give up

            # Exponential backoff: 0.2s, 0.4s, 0.8s, ...
            await asyncio.sleep((2 ** attempt) * 0.1)

    raise RuntimeError("Unreachable")

# Usage
result = await retry_operation(lambda: client.put(key, value))

Graceful Degradation

async def get_with_fallback(
    client: NoriKVClient,
    key: str,
    default_value: str,
) -> str:
    """Return the decoded value for ``key``, or ``default_value`` on any error.

    Every exception raised while reading or decoding is reported with
    ``print`` and replaced by ``default_value``; nothing is re-raised.
    """
    try:
        fetched = await client.get(key)
        text = fetched.value.decode()
    except Exception as exc:
        print(f"Failed to get key, using default: {exc}")
        text = default_value
    return text

Best Practices

1. Use Context Managers

#  Good: Context manager ensures cleanup
async with NoriKVClient(config) as client:
    await client.put(key, value)

#  Bad: Manual cleanup required
client = NoriKVClient(config)
await client.connect()
await client.put(key, value)
await client.close()

2. Reuse Client Instances

#  Good: Single client instance
client: NoriKVClient | None = None

async def init():
    global client
    config = ClientConfig(nodes=["localhost:9001"], total_shards=1024)
    client = NoriKVClient(config)
    await client.connect()

#  Bad: Creating client per request
async def handle_request():
    async with NoriKVClient(config) as client:
        await client.put(key, value)
    # Closes connections!

3. Use Type Hints

import json
from typing import Any
from dataclasses import asdict, dataclass

@dataclass
class UserData:
    # Plain record of the user fields that get serialized to JSON.
    id: str
    name: str
    email: str

async def update_user(user_id: str, data: UserData) -> Version:
    """Store ``data`` as JSON under ``user:<user_id>`` with a one-hour TTL.

    Args:
        user_id: Identifier used to build the key.
        data: User record to serialize.

    Returns:
        The value returned by ``client.put`` for this write.
    """
    key = f"user:{user_id}"
    # asdict() is the dataclasses API for this conversion; reaching into
    # __dict__ breaks for dataclasses declared with slots=True.
    value = json.dumps(asdict(data))

    options = PutOptions(ttl_ms=3600000)  # 3,600,000 ms = 1 hour
    return await client.put(key, value, options)

4. Handle Errors Properly

async def safe_get(key: str) -> str | None:
    """Return the decoded value stored at ``key``, or None if it is missing.

    Only KeyNotFoundError is translated to None; other errors propagate.
    """
    try:
        fetched = await client.get(key)
        text = fetched.value.decode()
    except KeyNotFoundError:
        text = None
    return text

5. Use asyncio Consistently

#  Good: Clean async/await
async def process_user(user_id: str):
    user_data = await client.get(f"user:{user_id}")
    processed = await process_data(user_data.value)
    await client.put(f"processed:{user_id}", processed)

#  Bad: Mixing sync and async
def process_user_bad(user_id: str):
    """Anti-example: drive a coroutine from synchronous code."""
    loop = asyncio.get_event_loop()
    user_data = loop.run_until_complete(client.get(f"user:{user_id}"))
    # run_until_complete() blocks the calling thread until the coroutine
    # finishes, and raises RuntimeError if this loop is already running
    # (for example, when called from inside another coroutine).

6. Use Idempotency Keys for Important Operations

async def create_order(order_id: str, data: dict[str, Any]):
    """Write an order as JSON, using an idempotency key derived from its id."""
    order_key = f"order:{order_id}"
    payload = json.dumps(data)
    # Same order id -> same idempotency key, so a repeated call reuses it.
    put_options = PutOptions(idempotency_key=f"create-order-{order_id}")
    await client.put(order_key, payload, put_options)

7. Choose Appropriate Consistency

# For critical reads
result = await client.get(
    key,
    GetOptions(consistency=ConsistencyLevel.LINEARIZABLE),
)

# For cache-like reads
result = await client.get(
    key,
    GetOptions(consistency=ConsistencyLevel.STALE_OK),
)

8. Use Proper Encoding

# Always specify encoding
value_bytes = "Hello, World!".encode("utf-8")
await client.put(key, value_bytes)

result = await client.get(key)
text = result.value.decode("utf-8")

Performance Tips

1. Batch Operations with gather

# Process multiple operations concurrently
await asyncio.gather(
    *[client.put(key, value) for key in keys]
)

2. Connection Pooling (Automatic)

The client maintains connection pools internally - no external pooling needed.

3. Avoid Creating Clients Per Request

Reuse client instances across requests for better performance.

4. Use Appropriate Value Sizes

  • Optimal: 100 bytes - 10 KB
  • Maximum: Limited by memory and network

5. Monitor Performance

import time

start = time.time()
await client.put(key, value)
duration = time.time() - start
print(f"PUT took {duration * 1000:.2f}ms")

Complete Example

import asyncio
import json
from typing import Any

from norikv import (
    NoriKVClient,
    ClientConfig,
    PutOptions,
    GetOptions,
    ConsistencyLevel,
    RetryConfig,
    VersionMismatchError,
)

async def main():
    """Walk through the main client features end to end.

    Configures a client with a retry policy, writes a session value with a
    TTL and an idempotency key, reads it back with linearizable consistency,
    updates it with compare-and-swap, registers a topology listener, and
    prints client statistics.
    """
    # Configure with retry policy
    config = ClientConfig(
        nodes=["localhost:9001", "localhost:9002"],
        total_shards=1024,
        timeout=5000,
        retry=RetryConfig(
            max_attempts=5,
            initial_delay_ms=100,
            max_delay_ms=2000,
        ),
    )

    async with NoriKVClient(config) as client:
        # Write with TTL and idempotency
        key = "session:abc123"
        value = json.dumps({"userId": 42})

        put_opts = PutOptions(
            ttl_ms=3600000,  # 1 hour
            idempotency_key="session-create-abc123",
        )

        version = await client.put(key, value.encode(), put_opts)
        print(f"Written: {version}")

        # Read with linearizable consistency
        get_opts = GetOptions(
            consistency=ConsistencyLevel.LINEARIZABLE,
        )

        result = await client.get(key, get_opts)
        print(f"Read: {result.value.decode()}")

        # Update with CAS
        new_value = json.dumps({"userId": 42, "active": True})

        try:
            # The write only succeeds if the key is still at the version
            # that was just read.
            await client.put(
                key,
                new_value.encode(),
                PutOptions(if_match_version=result.version),
            )
            print("CAS succeeded")
        except VersionMismatchError:
            print("CAS failed - retry needed")

        # Monitor topology
        def on_change(event):
            print(f"Cluster changed: epoch {event.current_epoch}")

        # NOTE(review): on_topology_change() returns an unsubscribe callable
        # (see "Cluster Topology"); it is discarded here, so the handler is
        # never explicitly unsubscribed - confirm that is intended.
        client.on_topology_change(on_change)

        # Get statistics
        stats = client.get_stats()
        print(f"Stats: {stats}")

if __name__ == "__main__":
    asyncio.run(main())

Async Context Manager Pattern

# Proper pattern for long-lived applications
class Application:
    """Owns a single NoriKVClient for the lifetime of an application.

    Call ``startup()`` before handling requests and ``shutdown()`` when done,
    or use the instance with ``async with`` to do both automatically.
    """

    def __init__(self):
        # None until startup() has run, and again after shutdown().
        self.client: NoriKVClient | None = None

    async def startup(self):
        """Create the client and connect it."""
        config = ClientConfig(
            nodes=["localhost:9001"],
            total_shards=1024,
        )
        self.client = NoriKVClient(config)
        await self.client.connect()

    async def shutdown(self):
        """Close the client, if there is one, and forget it.

        Safe to call repeatedly: later calls find no client and do nothing.
        """
        # Drop the reference before closing so a closed client can never be
        # reused by handle_request() or closed a second time.
        client, self.client = self.client, None
        if client is not None:
            await client.close()

    async def handle_request(self, key: str, value: str):
        """Write ``value`` under ``key``.

        Raises:
            RuntimeError: If called before startup() or after shutdown().
        """
        if self.client is None:
            raise RuntimeError("Client not initialized")

        await self.client.put(key, value)

    async def __aenter__(self):
        """Start the application and return it for use in ``async with``."""
        await self.startup()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Shut down on leaving the ``async with`` block; never suppresses errors."""
        await self.shutdown()

# Usage
app = Application()
await app.startup()

try:
    await app.handle_request("key", "value")
finally:
    await app.shutdown()

Next Steps