Message Queue & Event-Driven Architecture Design
How do I design a message queue and event-driven architecture?
TL;DR
- Bottom line: Choose Kafka for high-throughput event streaming with replay, RabbitMQ for complex routing with traditional queuing, AWS SQS/SNS for serverless fan-out with zero ops, or NATS for ultra-low-latency lightweight messaging.
- Key pattern: Publish events to topics/exchanges and let consumers process them asynchronously, each subscribing service in its own consumer group or queue, so producers are decoupled from consumers in both time and space.
- Watch out for: Treating message brokers as databases — they are transit infrastructure, not long-term storage (except Kafka with log compaction).
- Works with: Any language/platform; all major brokers have clients for Python, Java, Go, Node.js, .NET, and Rust.
Constraints
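- No broker delivers true end-to-end exactly-once delivery across system boundaries; Kafka's exactly-once semantics cover only read-process-write flows that stay inside Kafka (transactions), so side effects in external systems still need idempotent consumers
- Message ordering is guaranteed only within a single partition (Kafka), a single queue with one active consumer (RabbitMQ), or a single message group (SQS FIFO); cross-partition ordering requires application-level logic
- Replication factor cannot exceed the number of available brokers (topic creation fails); higher replication improves durability but adds write latency under acks=all (the leader waits for every in-sync replica) and multiplies storage and network cost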
- Always configure dead letter queues (DLQs) — without them, poison messages block entire partitions or queues indefinitely
- Enforce schema validation (Avro/Protobuf/JSON Schema) from day one — retrofitting schemas in production is extremely costly
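The per-partition ordering guarantee is why events are keyed by entity ID. A minimal sketch in plain Python, assuming a toy hash-based partitioner (zlib.crc32 modulo the partition count, standing in for Kafka's murmur2 default; partition_for and route are hypothetical helpers): every event for the same key lands in the same partition, so per-entity order survives even though there is no global order across keys.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_partitions: int) -> int:
    # Toy partitioner: a stable hash of the key modulo the partition count.
    # Kafka's default partitioner uses murmur2; crc32 is only a stand-in here.
    return zlib.crc32(key.encode()) % num_partitions


def route(events, num_partitions):
    # Append each (key, value) event to its partition's log in send order
    partitions = defaultdict(list)
    for key, value in events:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return partitions


events = [("order-1", "created"), ("order-2", "created"),
          ("order-1", "paid"), ("order-2", "cancelled"), ("order-1", "shipped")]
logs = route(events, num_partitions=4)

# Whichever partition a key lands in, its events stay in send order there
for key in ("order-1", "order-2"):
    log = logs[partition_for(key, 4)]
    print(key, [v for k, v in log if k == key])
# order-1 ['created', 'paid', 'shipped']
# order-2 ['created', 'cancelled']
```

Consumers reading different partitions can still observe order-1 and order-2 events interleaved in any order; only the per-key sequence is guaranteed.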
Quick Reference
| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Message Broker | Core message transit and buffering | Kafka, RabbitMQ, NATS, Pulsar | Horizontal partitioning (Kafka), clustering (RabbitMQ), superclusters (NATS) |
| Event Router | Content-based routing and filtering | AWS EventBridge, RabbitMQ exchanges, Kafka Streams | Filter rules at router level to reduce downstream load |
| Schema Registry | Enforce message contracts | Confluent Schema Registry, AWS Glue, Apicurio | Central service, replicated |
| Producer | Publish events/messages | Kafka Producer API, amqplib, AWS SDK, NATS client | Batch writes, async sends, connection pooling |
| Consumer Group | Parallel consumption with offset tracking | Kafka Consumer Groups, RabbitMQ competing consumers | Kafka: add consumers up to the partition count (extras sit idle); RabbitMQ: add competing consumers per queue |
| Dead Letter Queue | Capture failed messages | Kafka DLQ topic, RabbitMQ DLX, SQS DLQ | Monitor and reprocess; alert on growth |
| Stream Processor | Transform, aggregate, join event streams | Kafka Streams, Flink, Spark Streaming | Horizontal scaling by partition count |
| Fan-out Layer | Broadcast one event to multiple consumers | SNS, RabbitMQ fanout exchange, Kafka multi-consumer-group | One topic, N independent subscriber groups |
| Idempotency Store | Deduplicate at-least-once deliveries | Redis, PostgreSQL, DynamoDB | Keyed by message ID + consumer ID |
| Observability | Monitor lag, throughput, error rates | Prometheus + Grafana, Datadog, CloudWatch | Alert on consumer lag > threshold |
| CDC Pipeline | Capture database changes as events | Debezium, AWS DMS, Maxwell | One connector per source table/database |
| API Gateway | Ingest external events into broker | Kong, AWS API Gateway, Envoy | Rate limit + authenticate before enqueue |
Decision Tree
START
+-- Need message replay / event sourcing / audit log?
| +-- YES --> Kafka (log-based, configurable retention, compaction)
| +-- NO |
+-- Need complex routing (topic, headers, priority queues)?
| +-- YES --> RabbitMQ (exchanges: direct, topic, fanout, headers)
| +-- NO |
+-- Want zero operational overhead (fully managed)?
| +-- YES --> AWS SQS/SNS (or Google Pub/Sub, Azure Service Bus)
| +-- NO |
+-- Need sub-millisecond latency with minimal footprint?
| +-- YES --> NATS (with JetStream for persistence)
| +-- NO |
+-- Need exactly-once within streaming pipeline?
| +-- YES --> Kafka with transactions (enable.idempotence + transactional.id)
| +-- NO |
+-- Throughput > 500K msg/s?
| +-- YES --> Kafka or NATS JetStream
| +-- NO --> RabbitMQ or SQS (sufficient for most applications)
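The tree is a first-match checklist: questions are asked in order and the first YES decides. Encoding it as a function makes that precedence explicit; for example, a team that needs both replay and complex routing lands on Kafka because replay is checked first. This is a hypothetical helper, and the Requirements field names are illustrative rather than taken from any library.

```python
from dataclasses import dataclass


@dataclass
class Requirements:
    needs_replay: bool = False           # replay / event sourcing / audit log
    complex_routing: bool = False        # topic/header routing, priority queues
    zero_ops: bool = False               # fully managed, no operations
    sub_ms_latency: bool = False         # sub-millisecond latency, small footprint
    exactly_once_pipeline: bool = False  # exactly-once inside a streaming pipeline
    msgs_per_sec: int = 0                # expected peak throughput


def choose_broker(r: Requirements) -> str:
    # Walk the decision tree top to bottom; the first YES wins
    if r.needs_replay:
        return "Kafka"
    if r.complex_routing:
        return "RabbitMQ"
    if r.zero_ops:
        return "AWS SQS/SNS"
    if r.sub_ms_latency:
        return "NATS (JetStream for persistence)"
    if r.exactly_once_pipeline:
        return "Kafka with transactions"
    if r.msgs_per_sec > 500_000:
        return "Kafka or NATS JetStream"
    return "RabbitMQ or SQS"


print(choose_broker(Requirements(zero_ops=True)))           # AWS SQS/SNS
print(choose_broker(Requirements(msgs_per_sec=1_000_000)))  # Kafka or NATS JetStream
```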
Step-by-Step Guide
1. Define message contracts and schemas
Establish schemas before writing any producer or consumer code. Use Avro or Protobuf for binary efficiency, or JSON Schema for human readability. Register schemas in a schema registry to enforce backward/forward compatibility. [src1]
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["event_type", "event_id", "timestamp", "payload"],
  "properties": {
    "event_type": { "type": "string" },
    "event_id": { "type": "string", "format": "uuid" },
    "timestamp": { "type": "string", "format": "date-time" },
    "version": { "type": "integer", "minimum": 1 },
    "payload": { "type": "object" }
  }
}
Verify: Register the schema, then try to produce a message with a missing required field; a validating serializer should reject it before it reaches the broker.
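Registry compatibility checks are one layer; producers should also refuse to send an envelope that breaks the contract. A minimal, dependency-free sketch of that producer-side check, hand-coding the draft-07 schema above. In practice you would use a JSON Schema library such as jsonschema; validate_envelope is a hypothetical helper.

```python
import uuid
from datetime import datetime

REQUIRED = ("event_type", "event_id", "timestamp", "payload")


def validate_envelope(event: dict) -> list:
    # Return a list of problems; an empty list means the envelope looks valid.
    # Mirrors the JSON Schema above by hand (a stand-in for a real validator).
    errors = [f"missing required field: {f}" for f in REQUIRED if f not in event]
    if "event_type" in event and not isinstance(event["event_type"], str):
        errors.append("event_type must be a string")
    if "event_id" in event:
        try:
            uuid.UUID(str(event["event_id"]))
        except ValueError:
            errors.append("event_id must be a UUID")
    if "timestamp" in event:
        try:
            # Python < 3.11 fromisoformat() does not accept a trailing 'Z'
            datetime.fromisoformat(str(event["timestamp"]).replace("Z", "+00:00"))
        except ValueError:
            errors.append("timestamp must be an ISO 8601 date-time")
    if "version" in event:
        v = event["version"]
        # bool is a subclass of int, so exclude it explicitly
        if not isinstance(v, int) or isinstance(v, bool) or v < 1:
            errors.append("version must be an integer >= 1")
    if "payload" in event and not isinstance(event["payload"], dict):
        errors.append("payload must be an object")
    return errors


good = {"event_type": "order.created",
        "event_id": "550e8400-e29b-41d4-a716-446655440000",
        "timestamp": "2026-02-23T10:00:00Z", "version": 1, "payload": {}}
print(validate_envelope(good))  # []
print(validate_envelope({"event_type": "order.created"}))
```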
2. Design topic/queue topology
Map business domains to topics (Kafka) or exchanges+queues (RabbitMQ). Use one topic per event type. Avoid god-topics that mix unrelated events. [src5]
# Kafka topic naming convention: {domain}.{entity}.{event}
orders.order.created
orders.order.cancelled
payments.payment.completed
inventory.stock.updated
Verify: List topics/queues and confirm each maps to exactly one business event type.
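Enforcing the naming convention in code keeps god-topics from creeping in. A small sketch; the helper name, the allowed character set, and the optional .dlq suffix (matching the dead letter topic used in step 6) are assumptions, not Kafka rules.

```python
import re

# {domain}.{entity}.{event}: lowercase segments, plus an optional ".dlq" suffix
TOPIC_RE = re.compile(r"^[a-z][a-z0-9_-]*\.[a-z][a-z0-9_-]*\.[a-z][a-z0-9_-]*(\.dlq)?$")


def topic_name(domain: str, entity: str, event: str) -> str:
    # Build a topic name and reject anything that breaks the convention
    name = f"{domain}.{entity}.{event}".lower()
    if not TOPIC_RE.match(name):
        raise ValueError(f"invalid topic name: {name!r}")
    return name


print(topic_name("orders", "order", "created"))  # orders.order.created
```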
3. Configure partitioning for parallelism
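Partition count caps consumer parallelism: within a consumer group, each partition is consumed by at most one consumer. Start with max(expected_consumers, expected_throughput / per_partition_throughput), rounded up. As a rough rule of thumb, a single Kafka partition sustains on the order of 10 MB/s of writes; benchmark your own hardware, message sizes, and replication settings before relying on that figure. [src1]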
# Kafka: create topic with 12 partitions, replication factor 3
kafka-topics.sh --create \
--topic orders.order.created \
--partitions 12 \
--replication-factor 3 \
--bootstrap-server localhost:9092
Verify: kafka-topics.sh --describe --topic orders.order.created --bootstrap-server localhost:9092 → shows 12 partitions, 3 replicas each.
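The sizing formula as a worked function (a hypothetical helper; units are MB/s and the per-partition figure is an input you should measure): 100 MB/s of peak writes at roughly 10 MB/s per partition needs 10 partitions even if only 6 consumers are planned, while 12 planned consumers at 40 MB/s need 12 partitions so that every consumer gets one.

```python
import math


def partition_count(expected_consumers: int,
                    throughput_mb_s: float,
                    per_partition_mb_s: float = 10.0) -> int:
    # max(expected_consumers, expected_throughput / per_partition_throughput),
    # rounded up. The 10 MB/s default is only the rule of thumb from the text;
    # substitute a value measured on your own cluster.
    by_throughput = math.ceil(throughput_mb_s / per_partition_mb_s)
    return max(expected_consumers, by_throughput)


print(partition_count(expected_consumers=6, throughput_mb_s=100))  # 10
print(partition_count(expected_consumers=12, throughput_mb_s=40))  # 12
```

Partitions can be added to a Kafka topic later but never removed, and adding them changes which partition a key maps to, so rounding up at creation time is usually cheaper than resizing.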
4. Implement producers with idempotency
Enable idempotent production to avoid duplicates on retries. Batch messages for throughput, but bound batch size and linger time for latency. [src1]
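# Kafka producer with idempotency (Python, confluent-kafka 2.6+)
from confluent_kafka import Producer
from datetime import datetime, timezone
import json, uuid
producer = Producer({
    'bootstrap.servers': 'kafka-broker:9092',
    'enable.idempotence': True,  # broker discards duplicate retries
    'acks': 'all',               # wait for all in-sync replicas
    'linger.ms': 10,             # wait up to 10 ms to fill a batch
    'batch.size': 65536,         # max batch size in bytes
    'compression.type': 'lz4',
})
def on_delivery(err, msg):
    if err:
        print(f'Delivery failed: {err}')
def publish_order_event(order):
    event = {
        'event_type': 'order.created',
        'event_id': str(uuid.uuid4()),
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'version': 1,
        'payload': order,
    }
    producer.produce(
        topic='orders.order.created',
        key=str(order['order_id']),  # same order -> same partition -> ordered
        value=json.dumps(event).encode(),
        callback=on_delivery,
    )
    producer.poll(0)  # serve delivery callbacks without blocking; keeps batching intact
# On shutdown, block until all outstanding messages are delivered
producer.flush()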
Verify: Produce 100 messages, consume all — count should be exactly 100, no duplicates.
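A simplified model of what enable.idempotence does on the broker: each producer gets an ID, numbers its batches per partition, and the partition leader discards a batch whose sequence number it has already written. This toy class is illustrative only; the real broker keeps that state for a handful of recent batches and reports gaps as OutOfOrderSequenceException.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    # Toy model of one partition: remembers the last sequence number
    # accepted from each producer ID and drops retried duplicates.
    records: list = field(default_factory=list)
    last_seq: dict = field(default_factory=dict)

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return False  # duplicate of an already-written batch: ignore it
        if seq != last + 1:
            raise RuntimeError("out-of-order sequence")  # a batch went missing
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return True


log = PartitionLog()
log.append(producer_id=7, seq=0, value="order-1 created")
log.append(producer_id=7, seq=1, value="order-1 paid")
# The ack for seq 1 was lost, so the producer retries the same batch
log.append(producer_id=7, seq=1, value="order-1 paid")  # dropped as duplicate
print(log.records)  # ['order-1 created', 'order-1 paid']
```

This only covers retries within one producer session. An application that calls produce() twice for the same event still creates a duplicate, which is why consumers need their own idempotency (step 5).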
5. Implement consumers with offset management
Use consumer groups for parallel processing. Commit offsets only after successful processing. Implement idempotent handlers for effectively-once semantics. [src1]
# Kafka consumer with manual offset commit (Python, confluent-kafka 2.6+)
from confluent_kafka import Consumer
import json
consumer = Consumer({
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
})
consumer.subscribe(['orders.order.created'])
processed_ids = set()  # Use Redis/DB in production
def process_order(payload):
    ...  # placeholder: replace with your business logic
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue
        event = json.loads(msg.value().decode())
        event_id = event['event_id']
        if event_id not in processed_ids:  # skip redelivered duplicates
            process_order(event['payload'])
            processed_ids.add(event_id)
        consumer.commit(message=msg)  # commit only after processing succeeds
finally:
    consumer.close()
Verify: Stop consumer mid-batch, restart — it should reprocess from last committed offset.
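The in-memory processed_ids set above is lost on restart. A sketch of a durable idempotency store, using the standard-library sqlite3 module as a stand-in for PostgreSQL or another shared database (table and function names are hypothetical): the processed-event marker, keyed by consumer group and event ID, is written in the same transaction as the business side effect, so a redelivered message is detected and skipped.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE processed_events (
    consumer_group TEXT NOT NULL,
    event_id       TEXT NOT NULL,
    PRIMARY KEY (consumer_group, event_id)
);
CREATE TABLE orders (order_id TEXT PRIMARY KEY, total REAL);
""")


def handle(event: dict, group: str = "order-processor") -> bool:
    # Apply an event at most once per consumer group; False means duplicate.
    with conn:  # one transaction: commit on success, roll back on exception
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_events VALUES (?, ?)",
            (group, event["event_id"]))
        if cur.rowcount == 0:
            return False  # marker already present: skip the side effect
        conn.execute("INSERT INTO orders VALUES (?, ?)",
                     (event["payload"]["order_id"], event["payload"]["total"]))
    return True


evt = {"event_id": "e-1", "payload": {"order_id": "ORD-1", "total": 9.5}}
print(handle(evt), handle(evt))  # True False
print(conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0])  # 1
```

Because the marker and the side effect commit atomically, the offset can be committed afterwards; if the consumer crashes before committing, the redelivered message hits the existing marker and is skipped. This holds only when the side effect lives in the same database as the marker; calls to external systems still need their own idempotency keys.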
6. Set up dead letter queues and monitoring
Configure DLQs to capture messages that fail processing after N retries. Monitor consumer lag, DLQ depth, and error rates. [src2] [src3]
# DLQ pattern: retry N times, then route to dead letter topic
import json
MAX_RETRIES = 3
def process_with_dlq(msg, consumer, dlq_producer):
    # confluent-kafka returns headers as a list of (key, bytes) tuples, or None
    headers = dict(msg.headers() or [])
    retries = int(headers.get('retry_count') or b'0')
    try:
        event = json.loads(msg.value().decode())
        process_order(event['payload'])  # your business logic
    except Exception as e:
        if retries >= MAX_RETRIES:
            # Retries exhausted: park the message on the dead letter topic
            dlq_producer.produce(
                topic='orders.order.created.dlq',
                key=msg.key(), value=msg.value(),
                headers={'error': str(e), 'original_topic': msg.topic()},
            )
        else:
            # Re-enqueue with an incremented retry counter
            dlq_producer.produce(
                topic=msg.topic(), key=msg.key(), value=msg.value(),
                headers={'retry_count': str(retries + 1)},
            )
        dlq_producer.flush()  # make sure the copy is written before committing
    consumer.commit(message=msg)
Verify: Send a message that always fails — after 3 retries it should appear in the .dlq topic.
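The retry arithmetic is easy to get off by one. A broker-free simulation of the same flow (plain Python; the deque stands in for the topic and the names are hypothetical) shows that with MAX_RETRIES = 3 a message that always fails is attempted four times, the original delivery plus three retries, before it lands in the DLQ.

```python
from collections import deque

MAX_RETRIES = 3


def simulate(messages, handler):
    # In-memory retry-then-DLQ flow: a failing message is re-enqueued with
    # retry_count + 1 until retries are exhausted, then parked in the DLQ.
    topic = deque({"value": m, "headers": {}} for m in messages)
    dlq, attempts = [], 0
    while topic:
        msg = topic.popleft()
        retries = int(msg["headers"].get("retry_count", 0))
        attempts += 1
        try:
            handler(msg["value"])
        except Exception as e:
            if retries >= MAX_RETRIES:
                dlq.append({"value": msg["value"], "error": str(e)})
            else:
                topic.append({"value": msg["value"],
                              "headers": {"retry_count": str(retries + 1)}})
    return dlq, attempts


def always_fails(value):
    raise ValueError(f"cannot parse {value!r}")


dlq, attempts = simulate(["poison"], always_fails)
print(attempts, dlq)  # 4 [{'value': 'poison', 'error': "cannot parse 'poison'"}]
```

Re-enqueuing at the tail means a failing message no longer blocks the messages behind it, at the cost of per-key ordering for that message.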
Code Examples
Python (confluent-kafka): Kafka Producer/Consumer
# Input: Kafka broker at localhost:9092, topic 'events'
# Output: Produces and consumes JSON events with idempotency
from confluent_kafka import Producer, Consumer
import json, uuid
# Producer
p = Producer({'bootstrap.servers': 'localhost:9092', 'enable.idempotence': True, 'acks': 'all'})
p.produce('events', key='user-1', value=json.dumps({'id': str(uuid.uuid4()), 'action': 'signup'}).encode())
p.flush()
# Consumer
c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'svc', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False})
c.subscribe(['events'])
msg = c.poll(5.0)
if msg is not None and not msg.error():  # Message has __len__, so avoid a plain truthiness test
    print(json.loads(msg.value()))
    c.commit(message=msg)
c.close()
Node.js (amqplib): RabbitMQ Publish/Subscribe
// Input: RabbitMQ at amqp://localhost, exchange 'events', queue 'order-processor'
// Output: Publishes event to fanout exchange, consumes from bound queue
const amqp = require('amqplib');
async function main() {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createChannel();
  await ch.assertExchange('events', 'fanout', { durable: true });
  // Declare and bind the queue BEFORE publishing: an exchange drops
  // messages that match no bound queue
  const q = await ch.assertQueue('order-processor', { durable: true });
  await ch.bindQueue(q.queue, 'events', '');
  ch.publish('events', '', Buffer.from(JSON.stringify({ type: 'order.created', id: '123' })),
    { persistent: true, contentType: 'application/json' });
  await ch.prefetch(10); // at most 10 unacknowledged messages in flight
  ch.consume(q.queue, (msg) => {
    if (msg) { console.log(JSON.parse(msg.content.toString())); ch.ack(msg); }
  });
}
main().catch(console.error);
Python (boto3): AWS SQS Send/Receive
# Input: AWS SQS FIFO queue URL (placeholder below), AWS credentials configured
# Output: Sends and receives a message with deduplication
import boto3, json  # boto3>=1.35.0
sqs = boto3.client('sqs', region_name='us-east-1')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/orders.fifo'  # FIFO queue names end in .fifo
order = {'order_id': '456', 'action': 'created'}
# Send (FIFO queue): group per order for per-order ordering; derive the
# deduplication ID from the business event so a retried send is dropped
sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(order),
                 MessageGroupId=order['order_id'],
                 MessageDeduplicationId=f"{order['order_id']}-{order['action']}")
# Receive + Delete
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20)
for msg in resp.get('Messages', []):
    print(json.loads(msg['Body']))
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])
Anti-Patterns
Wrong: Using message broker as a database
# BAD -- scanning entire Kafka topic to find one record -- O(n) per lookup
def get_user_profile(user_id):
    consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                         'group.id': f'lookup-{user_id}',
                         'auto.offset.reset': 'earliest'})
    consumer.subscribe(['user.profiles'])
    while True:  # reads from the start of the topic on every call
        msg = consumer.poll(1.0)
        if msg is not None and not msg.error() and msg.key() == user_id.encode():
            return json.loads(msg.value())
Correct: Materialize state into a database, use broker for transit
# GOOD -- consume events into a database, query the database for reads
# (db is your database cursor, e.g. a psycopg cursor on PostgreSQL)
def handle_user_event(event):
    db.execute("INSERT INTO users (id, name, email) VALUES (%s, %s, %s) "
               "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email",
               (event['user_id'], event['name'], event['email']))
def get_user_profile(user_id):
    db.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    return db.fetchone()
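The same materialized-view pattern can be tried end to end with the standard-library sqlite3 module standing in for PostgreSQL (SQLite 3.24+ supports the same ON CONFLICT ... DO UPDATE upsert). This is a sketch: the table layout and event fields follow the example above, and the sample users are made up.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT, email TEXT)")


def handle_user_event(event):
    # Upsert: replaying events in order converges on the latest state
    with db:
        db.execute(
            "INSERT INTO users (id, name, email) VALUES (?, ?, ?) "
            "ON CONFLICT (id) DO UPDATE SET name = excluded.name, email = excluded.email",
            (event["user_id"], event["name"], event["email"]))


def get_user_profile(user_id):
    # Indexed primary-key lookup instead of scanning the topic
    return db.execute("SELECT id, name, email FROM users WHERE id = ?",
                      (user_id,)).fetchone()


handle_user_event({"user_id": "u1", "name": "Ada", "email": "ada@example.com"})
handle_user_event({"user_id": "u1", "name": "Ada L.", "email": "ada@example.com"})
print(get_user_profile("u1"))  # ('u1', 'Ada L.', 'ada@example.com')
```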
Wrong: Publishing commands disguised as events
# BAD -- event name is imperative (a command), creating tight coupling
producer.produce('orders', value=json.dumps({
    'event_type': 'SendEmailToCustomer',  # This is a command, not an event
    'email': '[email protected]',
    'template': 'order_confirmation',
}))
Correct: Publish facts about what happened
# GOOD -- event describes what happened; consumers react independently
producer.produce('orders', value=json.dumps({
    'event_type': 'order.confirmed',  # Past tense = event (a fact)
    'order_id': 'ORD-789',
    'customer_id': 'CUST-123',
    'total': 149.99,
    'confirmed_at': '2026-02-23T10:30:00Z',
}))
Wrong: No schema validation
// BAD -- no contract between producer and consumer
channel.publish('events', '', Buffer.from(JSON.stringify({
  typ: 'order',            // Typo -- no validation catches this
  data: { amt: '19.99' },  // String instead of number -- discovered in production
})));
Correct: Enforce schemas with a registry
// GOOD -- schema registry validates every message
// (run inside an async function; producer is a connected kafkajs producer)
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');
const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' });
const schemaId = await registry.getLatestSchemaId('order-value');
const encoded = await registry.encode(schemaId, {
  event_type: 'order.created',
  event_id: '550e8400-e29b-41d4-a716-446655440000',
  timestamp: '2026-02-23T10:00:00Z',
  version: 1,
  payload: { order_id: 'ORD-789', total: 149.99 },
});
await producer.send({ topic: 'orders', messages: [{ value: encoded }] });
Wrong: Auto-committing offsets before processing
# BAD -- auto-commit advances offset before processing finishes
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'svc',
    'enable.auto.commit': True,  # Dangerous: commits before processing
    'auto.commit.interval.ms': 1000,
})
Correct: Manual commit after successful processing
# GOOD -- commit only after business logic completes successfully
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'svc',
    'enable.auto.commit': False,  # Manual commit only
})
# ... poll, process, then:
consumer.commit(message=msg) # Commit after processing succeeds
Common Pitfalls
- Consumer lag spiral: Consumers fall behind due to slow processing, so the backlog keeps growing, adding memory and disk pressure on the broker (and, in Kafka, risking unconsumed data aging out past the retention period). Fix: monitor the consumer_lag metric per partition and scale consumers up to the partition count. [src1]
- Poison message blocking: A single malformed message crashes the consumer repeatedly, blocking the entire partition. Fix: wrap processing in try/catch and route failures to a DLQ after N retries. [src2]
- Thundering herd on rebalance: All consumers restart simultaneously, triggering cascading rebalances. Fix: use the cooperative-sticky assignor and stagger pod restarts with maxUnavailable: 1. [src1]
- Unbounded retry loops: Failed messages are re-enqueued indefinitely without backoff, amplifying load during outages. Fix: exponential backoff delay = min(base * 2^attempt, max_delay), cap retries, then DLQ. [src6]
- Event explosion from CRUD mapping: Publishing a separate event for every DB column change creates excessive volume. Fix: publish coarse-grained domain events (order.completed) rather than CRUD operations. [src7]
- Missing idempotency in consumers: With at-least-once delivery, duplicates are processed twice unless handlers are idempotent. Fix: store each processed event_id in an idempotency store and check it before processing. [src5]
- Single-partition bottleneck: Putting all messages on one partition for global ordering hits throughput limits. Fix: partition by entity ID (order_id, user_id) for per-entity ordering. [src1]
- Ignoring backpressure: Producers overwhelm a broker that cannot keep up. Fix: set max.block.ms (Kafka) or use publisher confirms (RabbitMQ) so producers block or fail fast. [src2]
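The backoff formula from the unbounded-retry fix, written as a function. The defaults (0.5 s base, 30 s cap) are illustrative assumptions, and the jittered variant is an optional refinement not mentioned above: it spreads out retries from consumers that failed at the same moment.

```python
import random


def backoff_delay(attempt: int, base: float = 0.5, max_delay: float = 30.0) -> float:
    # delay = min(base * 2^attempt, max_delay), in seconds
    return min(base * 2 ** attempt, max_delay)


def backoff_with_jitter(attempt: int, base: float = 0.5, max_delay: float = 30.0) -> float:
    # "Full jitter": pick uniformly in [0, capped delay] so consumers that
    # failed together do not all retry at the same instant
    return random.uniform(0, backoff_delay(attempt, base, max_delay))


print([backoff_delay(a) for a in range(8)])
# [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```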
Diagnostic Commands
# Check Kafka consumer group lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-processor
# Check Kafka topic partitions and replicas
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic orders.order.created
# Check RabbitMQ queue depth and consumer count
rabbitmqctl list_queues name messages consumers
# Check RabbitMQ cluster status
rabbitmqctl cluster_status
# Check AWS SQS queue attributes
aws sqs get-queue-attributes --queue-url $QUEUE_URL \
--attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible
# Monitor NATS JetStream stream info
nats stream info ORDERS
# Check NATS consumer pending count
nats consumer info ORDERS order-processor
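The LAG column reported by kafka-consumer-groups.sh is the log-end offset minus the group's committed offset for each partition. A small sketch of that arithmetic and of the threshold alert from the Quick Reference, useful when feeding offsets from your own metrics pipeline; the function names and sample offsets are hypothetical.

```python
def consumer_lag(log_end_offsets: dict, committed_offsets: dict) -> dict:
    # Per-partition lag = log-end offset - committed offset. Assumes offset 0
    # when nothing is committed yet (a simplification: retention may have
    # moved the start of the log).
    return {p: end - committed_offsets.get(p, 0) for p, end in log_end_offsets.items()}


def lag_alerts(lag: dict, threshold: int) -> list:
    # Partitions whose lag exceeds the alert threshold, worst first
    return sorted((p for p, n in lag.items() if n > threshold),
                  key=lambda p: lag[p], reverse=True)


# Offsets shaped like the CURRENT-OFFSET / LOG-END-OFFSET columns printed by
# kafka-consumer-groups.sh --describe (values are made up)
end = {0: 1_500, 1: 1_480, 2: 9_200}
committed = {0: 1_495, 1: 1_480, 2: 3_100}
lag = consumer_lag(end, committed)
print(lag)                    # {0: 5, 1: 0, 2: 6100}
print(lag_alerts(lag, 1000))  # [2]
```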
Version History & Compatibility
| Version | Status | Breaking Changes | Migration Notes |
|---|---|---|---|
| Kafka 4.0+ (2025) | Current | KRaft only; ZooKeeper removed | Migrate from ZooKeeper to KRaft on a 3.x release (3.9 is the last with ZooKeeper) before upgrading |
| Kafka 3.3-3.9 | Stable | KRaft production-ready (3.3); ZooKeeper deprecated (3.5) | Start KRaft migration; both modes supported |
| Kafka 2.x | Legacy | — | Upgrade to 3.x for KRaft, tiered storage |
| RabbitMQ 4.0+ (2024) | Current | Classic queue mirroring removed | Migrate mirrored classic queues to quorum queues (or streams) |
| RabbitMQ 3.12-3.13 | LTS | Classic queue mirroring deprecated | Move replicated queues to quorum queues before upgrading to 4.0 |
| NATS 2.10+ (2024) | Current | JetStream R2 API changes | Update client libraries to latest |
| SQS/SNS | Managed | N/A | AWS manages versioning; FIFO queues added 2016 |
When to Use / When Not to Use
| Use When | Don't Use When | Use Instead |
|---|---|---|
| Services need temporal decoupling (producer and consumer run at different times/speeds) | Request needs synchronous response within 100ms | REST/gRPC with circuit breaker |
| One event triggers actions in multiple independent services (fan-out) | Only two services communicate and both are always available | Direct service-to-service call (simpler) |
| Need audit log / event replay / event sourcing | Data is ephemeral and loss is acceptable | In-memory pub/sub (Redis Pub/Sub, EventEmitter) |
| Throughput exceeds what synchronous calls can handle (>10K req/s) | Low message volume (<100 msg/day) with simple routing | Cron job + database polling |
| Need to buffer during downstream outages | All consumers must process in real-time with no buffering | WebSocket / SSE for real-time push |
| Regulatory requirement for message durability and ordering | Prototyping or MVP where operational overhead must be zero | Firebase, Supabase Realtime, or managed webhooks |
Important Caveats
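- Kafka 4.0+ runs only in KRaft mode: ZooKeeper support was removed in 4.0 (after being deprecated in 3.5), so new deployments should use KRaft and existing ZooKeeper clusters must migrate on a 3.x release before upgrading
- RabbitMQ 4.0 removed classic queue mirroring: classic queues are still available but are no longer replicated, so use quorum queues (or streams) for replicated, durable workloads
- AWS SQS standard queues provide at-least-once delivery with best-effort ordering. FIFO queues give strict ordering within a message group, but throughput is capped at 300 API calls/s per operation (3,000 msg/s with batches of 10) unless high-throughput mode is enabled, whereas standard queues support nearly unlimited throughput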
- NATS JetStream persistence is production-ready but the ecosystem (tooling, client libraries, monitoring) is less mature than Kafka's — evaluate library support for your language before committing
- Message broker selection is a long-term architectural decision — migrating between brokers is a major project involving producer/consumer rewrites, operational tooling changes, and data migration planning
- Cloud-managed offerings (Amazon MSK, CloudAMQP, Confluent Cloud) reduce operational burden but introduce vendor lock-in and can be 3-10x more expensive than self-managed at scale