EventStoreDB (purpose-built event store), or Apache Kafka with log compaction disabled for event sourcing on existing infrastructure.

| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Command API | Accepts write requests, validates business rules | REST/gRPC endpoint, message queue consumer | Horizontal scaling behind load balancer |
| Command Handler | Processes commands, loads aggregate, emits events | Application service layer | Stateless; scale by adding instances |
| Aggregate | Enforces business invariants, produces domain events | Domain model (DDD aggregate root) | One aggregate instance per ID (single-writer) |
| Event Store | Append-only persistence of domain events | EventStoreDB, PostgreSQL + outbox, Kafka, DynamoDB | Partition by aggregate ID / stream |
| Event Bus | Distributes events to subscribers | Kafka, RabbitMQ, Amazon SNS/SQS, Azure Service Bus | Partition by event type or aggregate ID |
| Projection Engine | Transforms events into read-optimized views | Kafka Streams, custom subscribers, Axon projections | One consumer group per projection |
| Read Model Store | Serves denormalized query data | PostgreSQL, Redis, Elasticsearch, MongoDB | Read replicas, caching, sharding |
| Query API | Serves read requests from materialized views | REST/GraphQL endpoint | Horizontal scaling + CDN/cache |
| Snapshot Store | Caches aggregate state at intervals to speed rebuilds | Same DB as event store or separate cache | Snapshot every N events (100-500) |
| Saga / Process Manager | Coordinates multi-aggregate workflows | Stateful orchestrator consuming events | Partition by saga ID |
| Dead Letter Queue | Captures failed event processing for retry | Kafka DLQ, SQS DLQ, RabbitMQ dead-letter exchange | Monitor size; alert on growth |
| Schema Registry | Manages event schema evolution and compatibility | Confluent Schema Registry, AWS Glue, custom | Central service; version all schemas |
START
├── Do you need different read and write models?
│ ├── NO → Standard CRUD is simpler; skip CQRS entirely
│ └── YES ↓
├── Do you need a complete audit trail / temporal queries?
│ ├── NO → Use CQRS only (separate read/write models, shared or separate DB)
│ └── YES ↓
├── CQRS + Event Sourcing
│ ├── Expected load < 1K concurrent users?
│ │ ├── YES → Single-node EventStoreDB or PostgreSQL event table
│ │ │ Synchronous projections acceptable
│ │ └── NO ↓
│ ├── Expected load 1K-100K concurrent users?
│ │ ├── YES → EventStoreDB cluster or Kafka + dedicated projection service
│ │ │ Async projections with consumer groups
│ │ └── NO ↓
│ └── Expected load > 100K concurrent users?
│ └── YES → Kafka event backbone + partitioned read stores
│ (Elasticsearch, Redis, Cassandra)
│ Multiple projection services + snapshotting
└── Framework available for your language?
├── Java/Kotlin → Axon Framework (batteries-included)
├── C#/.NET → Marten or EventStoreDB .NET client
├── Python/JS/Go → Custom implementation with EventStoreDB or Kafka
└── Any → EventStoreDB gRPC client (polyglot)
Identify the domain boundary where CQRS/ES adds value. Map aggregates (consistency boundaries) using Domain-Driven Design. Each aggregate will have its own event stream. [src1]
Example bounded context: Order Management
├── Aggregate: Order (OrderId)
│ Events: OrderCreated, ItemAdded, ItemRemoved, OrderConfirmed, OrderShipped
├── Aggregate: Inventory (ProductId)
│ Events: StockReserved, StockReleased, StockDepleted
└── Process Manager: OrderFulfillment
Listens: OrderConfirmed → reserves stock → emits OrderReadyToShip
Verify: Each aggregate has a clear consistency boundary; no single invariant spans two aggregates.
Define immutable event types with all data needed to reconstruct state. Use past-tense naming. Include metadata (event ID, timestamp, aggregate ID, version). [src2]
{
  "event_type": "OrderCreated",
  "event_id": "uuid-v4",
  "aggregate_id": "order-123",
  "aggregate_type": "Order",
  "version": 1,
  "timestamp": "2026-02-23T10:00:00Z",
  "data": {
    "customer_id": "cust-456",
    "items": [{"product_id": "prod-789", "quantity": 2, "price": 29.99}],
    "total": 59.98
  },
  "metadata": {
    "correlation_id": "req-abc",
    "causation_id": null,
    "user_id": "user-001"
  }
}
Verify: Every event contains enough data to rebuild aggregate state without external lookups.
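The envelope above can be produced by a small factory function. This is a hypothetical helper (the name `make_event` and its defaults are not from the source), shown only to make the required fields explicit:

```python
import uuid
from datetime import datetime, timezone

def make_event(aggregate_id, aggregate_type, event_type, version,
               data, metadata=None):
    """Build an event envelope with the schema shown above."""
    return {
        "event_type": event_type,
        "event_id": str(uuid.uuid4()),           # unique per event
        "aggregate_id": aggregate_id,
        "aggregate_type": aggregate_type,
        "version": version,                      # position in the stream
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data": data,                            # full state needed to replay
        "metadata": metadata or {
            "correlation_id": None,
            "causation_id": None,
            "user_id": None,
        },
    }
```

Centralizing envelope construction keeps event IDs, timestamps, and metadata consistent across all command handlers.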
Choose your storage backend. The event store must support append-only writes, optimistic concurrency control (expected version), and reading all events for a given aggregate. [src6]
-- PostgreSQL event store table
CREATE TABLE events (
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id UUID NOT NULL,
    aggregate_type VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    version INTEGER NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (aggregate_id, version) -- optimistic concurrency
);
CREATE INDEX idx_events_aggregate ON events (aggregate_id, version);
CREATE INDEX idx_events_type ON events (event_type, created_at);
Verify: INSERT with duplicate (aggregate_id, version) raises a unique constraint violation (concurrency conflict).
Implement command handlers that load an aggregate from events, apply business rules, and append new events. Use optimistic concurrency to prevent conflicting writes. [src2]
def handle_confirm_order(command):
    # 1. Load events from store
    events = event_store.load(command.order_id)
    # 2. Rebuild aggregate state
    order = Order()
    for event in events:
        order.apply(event)
    # 3. Execute business logic
    new_events = order.confirm()  # raises if invalid state
    # 4. Persist new events (with expected version)
    event_store.append(
        aggregate_id=command.order_id,
        events=new_events,
        expected_version=order.version
    )
    # 5. Publish events to bus
    event_bus.publish(new_events)
Verify: Sending the same command twice with the same expected version raises a concurrency error on the second attempt.
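The handler relies on an aggregate that can replay events and enforce invariants. A minimal sketch of such an `Order` aggregate follows; the state names, version handling, and the `ValueError` are illustrative assumptions, not prescribed by the source:

```python
class Order:
    """Minimal event-sourced aggregate: replays events, guards invariants."""

    def __init__(self):
        self.version = 0      # version of the last applied event
        self.status = None    # no events applied yet

    def apply(self, event):
        # State transitions only; apply() must have no side effects,
        # because it runs both for new events and during replay.
        if event["event_type"] == "OrderCreated":
            self.status = "pending"
        elif event["event_type"] == "OrderConfirmed":
            self.status = "confirmed"
        self.version = event["version"]

    def confirm(self):
        # Business rule: only a pending order can be confirmed
        if self.status != "pending":
            raise ValueError(f"cannot confirm order in state {self.status!r}")
        # Return new events; the caller persists and publishes them
        return [{"event_type": "OrderConfirmed", "version": self.version + 1}]
```

Keeping `apply` side-effect-free is what makes replay (and snapshotting) safe: the same event stream always reconstructs the same state.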
Create event handlers that subscribe to events and update denormalized read models optimized for specific queries. Each projection is independent and rebuildable. [src3]
// Projection: Order Summary
async function handleEvent(event) {
  switch (event.event_type) {
    case 'OrderCreated':
      await db.query(
        `INSERT INTO order_summary (order_id, customer_id, status, total, created_at)
         VALUES ($1, $2, 'pending', $3, $4)`,
        [event.aggregate_id, event.data.customer_id,
         event.data.total, event.timestamp]
      );
      break;
    case 'OrderConfirmed':
      await db.query(
        `UPDATE order_summary SET status = 'confirmed', confirmed_at = $2
         WHERE order_id = $1`,
        [event.aggregate_id, event.timestamp]
      );
      break;
  }
}
Verify: After publishing events, query the read model and confirm the projected data matches expected state.
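Because each projection is rebuildable, a corrupt or redesigned read model can be regenerated by replaying the full log. A sketch of that rebuild loop, with `load_all_events`, `handlers`, and `reset` as hypothetical stand-ins for your event store reader, handler table, and view-truncation step:

```python
def rebuild_projection(load_all_events, handlers, reset):
    """Clear a read model, then replay every event in log order.

    load_all_events() yields events sorted by global position;
    handlers maps event_type -> update function; reset() clears
    the view (e.g. TRUNCATE order_summary). Returns events applied.
    """
    reset()
    applied = 0
    for event in load_all_events():
        handler = handlers.get(event["event_type"])
        if handler:           # unknown event types are simply skipped
            handler(event)
            applied += 1
    return applied
```

In production the rebuild typically writes to a fresh table and swaps it in atomically, so queries never see a half-built view.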
The read model lags behind the write model. Implement strategies: return the command result with the new version, poll until the projection catches up, or use client-side optimistic updates. [src3]
Strategies for eventual consistency:
1. Return write version → client polls read model until version >= write version
2. Causal consistency token → pass correlation ID, read model waits for that event
3. Optimistic UI → update client state immediately, reconcile when projection arrives
4. Read-your-writes → route reads to write model for N seconds after a command
Verify: After a write, the UI shows updated data within the acceptable latency window (typically < 500ms for interactive UIs).
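Strategy 1 (poll until the projected version catches up) can be sketched as follows; `get_projected_version` is a hypothetical callable that queries the projection's checkpoint, not an API from the source:

```python
import time

def wait_for_read_model(get_projected_version, target_version,
                        timeout=2.0, interval=0.05):
    """Poll the read model until it reflects at least target_version.

    target_version is the aggregate version returned by the write.
    Returns True once caught up, False if timeout expires first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if get_projected_version() >= target_version:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

The timeout matters: a stuck projection should surface as a degraded-read warning, not an infinite client spin.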
When aggregates accumulate hundreds of events, replaying from the beginning becomes slow. Snapshot aggregate state periodically and load from the latest snapshot plus subsequent events. [src6]
def load_aggregate(aggregate_id):
    snapshot = snapshot_store.get_latest(aggregate_id)
    if snapshot:
        aggregate = deserialize(snapshot.state)
        # load() returns events with version > from_version, so pass
        # the snapshot version itself to get everything after it
        events = event_store.load(aggregate_id,
                                  from_version=snapshot.version)
    else:
        aggregate = Order()
        events = event_store.load(aggregate_id)
    for event in events:
        aggregate.apply(event)
    return aggregate
Verify: With snapshotting enabled, loading an aggregate with 10,000 events should take < 50ms.
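The write side of snapshotting can be a small check after each append. This sketch assumes a `snapshot_store.save(...)` API and a `serialize` function, both hypothetical stand-ins for your persistence layer:

```python
SNAPSHOT_INTERVAL = 200  # within the 100-500 range suggested above

def maybe_snapshot(aggregate, snapshot_store, aggregate_id, serialize=repr):
    """Persist a snapshot every SNAPSHOT_INTERVAL events.

    Returns True if a snapshot was written. Snapshots are an
    optimization only: losing them just means a slower replay.
    """
    if aggregate.version > 0 and aggregate.version % SNAPSHOT_INTERVAL == 0:
        snapshot_store.save(aggregate_id=aggregate_id,
                            version=aggregate.version,
                            state=serialize(aggregate))
        return True
    return False
```

Because snapshots are derived data, they can also be written asynchronously by a background subscriber rather than inline with the command path.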
# Input: Domain events as dictionaries with aggregate_id and version
# Output: Persisted events with concurrency protection
import json
import uuid

import psycopg2
from psycopg2.extras import RealDictCursor

class EventStore:
    """Append-only event store with optimistic concurrency control."""

    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)

    def append(self, aggregate_id: str, events: list[dict],
               expected_version: int) -> None:
        """Append events in one transaction. Raises on version conflict."""
        try:
            with self.conn.cursor() as cur:
                for i, event in enumerate(events):
                    version = expected_version + i + 1
                    cur.execute(
                        """INSERT INTO events
                               (event_id, aggregate_id, aggregate_type,
                                event_type, version, data, metadata)
                           VALUES (%s, %s, %s, %s, %s, %s, %s)""",
                        (str(uuid.uuid4()), aggregate_id,
                         event["aggregate_type"], event["event_type"],
                         version, json.dumps(event["data"]),
                         json.dumps(event.get("metadata", {})))
                    )
            self.conn.commit()
        except psycopg2.errors.UniqueViolation:
            # Another writer appended first: roll back so the connection
            # stays usable, then surface the concurrency conflict
            self.conn.rollback()
            raise

    def load(self, aggregate_id: str,
             from_version: int = 0) -> list[dict]:
        """Load events for an aggregate with version > from_version."""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(
                """SELECT * FROM events
                   WHERE aggregate_id = %s AND version > %s
                   ORDER BY version""",
                (aggregate_id, from_version)
            )
            return cur.fetchall()
// Input: Events from an event store (EventStoreDB or Kafka)
// Output: Denormalized read model in PostgreSQL
const { Pool } = require('pg'); // node-postgres
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

const projectionHandlers = {
  async OrderCreated(event) {
    const { customer_id, items, total } = event.data;
    await pool.query(
      `INSERT INTO order_summary
         (order_id, customer_id, status, item_count, total, created_at, updated_at)
       VALUES ($1, $2, 'pending', $3, $4, $5, $5)
       ON CONFLICT (order_id) DO NOTHING`,
      [event.aggregate_id, customer_id, items.length, total, event.timestamp]
    );
  },
  async OrderConfirmed(event) {
    await pool.query(
      `UPDATE order_summary
       SET status = 'confirmed', updated_at = $2
       WHERE order_id = $1`,
      [event.aggregate_id, event.timestamp]
    );
  },
  async OrderShipped(event) {
    const { tracking_number, carrier } = event.data;
    await pool.query(
      `UPDATE order_summary
       SET status = 'shipped', tracking_number = $2,
           carrier = $3, updated_at = $4
       WHERE order_id = $1`,
      [event.aggregate_id, tracking_number, carrier, event.timestamp]
    );
  }
};

// Projection runner with checkpoint tracking
async function runProjection(eventStore, projectionName) {
  const checkpoint = await loadCheckpoint(projectionName);
  const stream = eventStore.subscribeToAll({ fromPosition: checkpoint });
  for await (const resolvedEvent of stream) {
    const event = resolvedEvent.event;
    const handler = projectionHandlers[event.event_type];
    if (handler) {
      try {
        await handler(event);
        await saveCheckpoint(projectionName, resolvedEvent.position);
      } catch (err) {
        console.error('Projection error:', err);
        await sendToDeadLetter(projectionName, resolvedEvent, err);
      }
    }
  }
}
# BAD — modifying events after they are stored destroys the audit trail
def fix_order_total(order_id, correct_total):
    event = event_store.load_latest(order_id, "OrderCreated")
    event["data"]["total"] = correct_total  # MUTATING a stored event
    event_store.update(event)  # UPDATING instead of appending

# GOOD — append a correction event; the original event remains immutable
def fix_order_total(order_id, correct_total, reason):
    order = load_aggregate(order_id)  # rebuild current state from the stream
    event_store.append(order_id, [{
        "aggregate_type": "Order",
        "event_type": "OrderTotalCorrected",
        "data": {
            "previous_total": order.total,
            "corrected_total": correct_total,
            "reason": reason
        }
    }], expected_version=order.version)
-- BAD — single normalized read model defeats the purpose of CQRS
SELECT o.id, o.status, c.name, c.email,
SUM(oi.price * oi.quantity) as total,
s.tracking_number, s.carrier
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN order_items oi ON o.id = oi.order_id
LEFT JOIN shipments s ON o.id = s.order_id
WHERE o.status = 'shipped'
GROUP BY o.id, o.status, c.name, c.email, s.tracking_number, s.carrier;
-- GOOD — each projection is a flat, denormalized table for one use case
CREATE TABLE shipped_orders_view (
    order_id UUID PRIMARY KEY,
    customer_name TEXT,
    customer_email TEXT,
    item_count INTEGER,
    total DECIMAL(10,2),
    tracking_number TEXT,
    carrier TEXT,
    shipped_at TIMESTAMPTZ
);
-- Single-row lookup, no JOINs
SELECT * FROM shipped_orders_view WHERE order_id = $1;
# BAD — internal domain events leak aggregate internals to consumers
event_bus.publish_to_external("order-events-topic", domain_event)
# External consumers now depend on internal event schemas

# GOOD — publish stable integration events for external consumers
def on_order_confirmed(domain_event):
    integration_event = {
        "type": "OrderConfirmedV1",
        "order_id": domain_event.data["order_id"],
        "total": domain_event.data["total"],
        "confirmed_at": domain_event.timestamp
    }
    integration_bus.publish("public.orders", integration_event)
# BAD — read model may be stale; business rules must use the write model
async def handle_confirm_order(command):
    order_summary = await read_db.query(
        "SELECT status FROM order_summary WHERE order_id = $1",
        command.order_id
    )
    if order_summary.status == "pending":  # STALE data!
        event_store.append(...)

# GOOD — always load from the event store for write-side decisions
async def handle_confirm_order(command):
    events = event_store.load(command.order_id)
    order = Order()
    for event in events:
        order.apply(event)
    new_events = order.confirm()  # raises if not in valid state
    event_store.append(command.order_id, new_events, order.version)
Common concerns and mitigations:
- Read-your-writes consistency: polling with version check or causal consistency tokens. [src3]
- Schema evolution: use a schema registry (Confluent Schema Registry, Avro, Protobuf) with backward-compatible changes only. [src7]
- Slow aggregate rebuilds: snapshot every 100-500 events, or redesign aggregate boundaries. [src6]
- Projection rebuilds: parallel projection with partitioned consumers and checkpoint-based resumption. [src7]
- Duplicate delivery: track last processed event ID per projection and skip duplicates. [src5]
- Incremental adoption: start with one complex domain, keep the rest as simple CRUD. [src1]
- Poison events: route failed events to a DLQ, alert on queue depth, continue processing. [src4]
- Read model updates: project asynchronously via event subscriptions; accept eventual consistency. [src2]

# Check EventStoreDB health (Docker)
curl -s http://localhost:2113/health/live | jq .
# Count events per aggregate in PostgreSQL event store
psql -c "SELECT aggregate_id, COUNT(*) as event_count, MAX(version) as latest_version FROM events GROUP BY aggregate_id ORDER BY event_count DESC LIMIT 20;"
# Check projection lag (assumes a bigserial global_position column on events;
# the UUID event_id cannot be compared arithmetically to a checkpoint position)
psql -c "SELECT e.max_pos AS latest_event, p.position AS projected_to, (e.max_pos - p.position) AS lag FROM (SELECT MAX(global_position) AS max_pos FROM events) e, projection_checkpoints p WHERE p.name = 'order_summary';"
# Monitor dead letter queue depth
psql -c "SELECT projection, COUNT(*) as failed_count, MIN(created_at) as oldest_failure FROM dead_letter_queue GROUP BY projection;"
# Check EventStoreDB stream stats
curl -s http://localhost:2113/streams/\$stats-0 -H "Accept: application/json" | jq .
| Pattern / Tool | Status | Breaking Changes | Notes |
|---|---|---|---|
| CQRS (pattern) | Stable since 2010 | None | Coined by Greg Young, based on CQS (Bertrand Meyer) |
| Event Sourcing (pattern) | Stable since 2005 | None | Popularized by Greg Young and Martin Fowler |
| EventStoreDB 24.x | Current | gRPC API is primary (HTTP deprecated) | Docker: eventstore/eventstore:24.10 |
| EventStoreDB 23.x | Maintained | --- | Last version with full HTTP API |
| Axon Framework 4.x | Current (LTS) | Axon Server required for clustering | Java 17+ required since 4.8 |
| Marten 7.x | Current | PostgreSQL 12+ required | .NET 8+ required |
| Kafka (as event store) | Usable with caveats | Not a true event store | Use log compaction OFF for event streams |
| Use When | Don't Use When | Use Instead |
|---|---|---|
| Read and write workloads have vastly different scaling requirements | Simple CRUD with balanced read/write load | Standard three-tier architecture |
| You need a complete, immutable audit trail of all state changes | Audit requirements satisfied by database change logs | CDC (Change Data Capture) with Debezium |
| Domain has complex business rules that benefit from DDD aggregates | Domain is anemic with simple property updates | Active Record or Repository pattern |
| Multiple read models needed for different query patterns | Single query pattern serves all use cases | Standard database views or materialized views |
| You need temporal queries ("what was the state at time T?") | Only current state matters | Standard database with updated_at timestamps |
| High-throughput write workload benefiting from append-only storage | Write volume is moderate and relational ACID is sufficient | PostgreSQL with proper indexing |
| Microservices need to react to domain events across boundaries | Monolithic application with shared database | Database triggers or application events |