| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Message Broker | Core message transit and buffering | Kafka, RabbitMQ, NATS, Pulsar | Horizontal partitioning (Kafka), clustering (RabbitMQ), superclusters (NATS) |
| Event Router | Content-based routing and filtering | AWS EventBridge, RabbitMQ exchanges, Kafka Streams | Filter rules at router level to reduce downstream load |
| Schema Registry | Enforce message contracts | Confluent Schema Registry, AWS Glue, Apicurio | Central service, replicated |
| Producer | Publish events/messages | Kafka Producer API, amqplib, AWS SDK, NATS client | Batch writes, async sends, connection pooling |
| Consumer Group | Parallel consumption with offset tracking | Kafka Consumer Groups, RabbitMQ competing consumers | Add consumers up to partition/queue count |
| Dead Letter Queue | Capture failed messages | Kafka DLQ topic, RabbitMQ DLX, SQS DLQ | Monitor and reprocess; alert on growth |
| Stream Processor | Transform, aggregate, join event streams | Kafka Streams, Flink, Spark Streaming | Horizontal scaling by partition count |
| Fan-out Layer | Broadcast one event to multiple consumers | SNS, RabbitMQ fanout exchange, Kafka multi-consumer-group | One topic, N independent subscriber groups |
| Idempotency Store | Deduplicate at-least-once deliveries | Redis, PostgreSQL, DynamoDB | Keyed by message ID + consumer ID |
| Observability | Monitor lag, throughput, error rates | Prometheus + Grafana, Datadog, CloudWatch | Alert on consumer lag > threshold |
| CDC Pipeline | Capture database changes as events | Debezium, AWS DMS, Maxwell | One connector per source table/database |
| API Gateway | Ingest external events into broker | Kong, AWS API Gateway, Envoy | Rate limit + authenticate before enqueue |
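The Idempotency Store row deserves a concrete shape. A minimal in-memory sketch, keyed by message ID + consumer ID as in the table; production would back the set with Redis (`SET NX`), PostgreSQL, or DynamoDB as listed above:

```python
class IdempotencyStore:
    """Tracks (consumer_id, message_id) pairs to deduplicate at-least-once deliveries.

    In-memory sketch only: production would use Redis SET NX with a TTL,
    or a database unique constraint, so state survives restarts.
    """
    def __init__(self):
        self._seen = set()

    def first_delivery(self, consumer_id: str, message_id: str) -> bool:
        """Return True exactly once per (consumer, message) pair."""
        key = (consumer_id, message_id)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True

store = IdempotencyStore()
print(store.first_delivery('order-processor', 'evt-1'))  # True: process it
print(store.first_delivery('order-processor', 'evt-1'))  # False: duplicate, skip
print(store.first_delivery('email-sender', 'evt-1'))     # True: different consumer
```

Keying by consumer as well as message lets two services process the same event independently without suppressing each other.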
START
+-- Need message replay / event sourcing / audit log?
| +-- YES --> Kafka (log-based, configurable retention, compaction)
| +-- NO |
+-- Need complex routing (topic, headers, priority queues)?
| +-- YES --> RabbitMQ (exchanges: direct, topic, fanout, headers)
| +-- NO |
+-- Want zero operational overhead (fully managed)?
| +-- YES --> AWS SQS/SNS (or Google Pub/Sub, Azure Service Bus)
| +-- NO |
+-- Need sub-millisecond latency with minimal footprint?
| +-- YES --> NATS (with JetStream for persistence)
| +-- NO |
+-- Need exactly-once within streaming pipeline?
| +-- YES --> Kafka with transactions (enable.idempotence + transactional.id)
| +-- NO |
+-- Throughput > 500K msg/s?
| +-- YES --> Kafka or NATS JetStream
| +-- NO --> RabbitMQ or SQS (sufficient for most applications)
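The tree above can be encoded as a first-match-wins function. A sketch for documentation purposes; the question order mirrors the tree, and the return labels are the same options, not product endorsements:

```python
def pick_broker(*, replay=False, complex_routing=False, fully_managed=False,
                sub_ms_latency=False, exactly_once=False, high_throughput=False):
    """Walk the decision tree top to bottom; the first YES answer wins."""
    if replay:          return 'Kafka'
    if complex_routing: return 'RabbitMQ'
    if fully_managed:   return 'SQS/SNS'
    if sub_ms_latency:  return 'NATS'
    if exactly_once:    return 'Kafka (transactions)'
    if high_throughput: return 'Kafka or NATS JetStream'
    return 'RabbitMQ or SQS'

print(pick_broker(replay=True))         # Kafka
print(pick_broker(fully_managed=True))  # SQS/SNS
print(pick_broker())                    # RabbitMQ or SQS
```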
Establish schemas before writing any producer or consumer code. Use Avro or Protobuf for binary efficiency, or JSON Schema for human readability. Register schemas in a schema registry to enforce backward/forward compatibility. [src1]
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["event_type", "event_id", "timestamp", "payload"],
  "properties": {
    "event_type": { "type": "string" },
    "event_id": { "type": "string", "format": "uuid" },
    "timestamp": { "type": "string", "format": "date-time" },
    "version": { "type": "integer", "minimum": 1 },
    "payload": { "type": "object" }
  }
}
Verify: Register schema and attempt to produce a message with a missing required field — the registry should reject it.
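The same required-field rule can be checked locally before a message ever reaches the registry. A stdlib-only sketch of the check the schema above encodes:

```python
# Required fields from the envelope schema above
REQUIRED_FIELDS = ("event_type", "event_id", "timestamp", "payload")

def validate_envelope(event: dict) -> None:
    """Reject events missing any field the schema marks as required."""
    missing = [f for f in REQUIRED_FIELDS if f not in event]
    if missing:
        raise ValueError(f"missing required fields: {missing}")

validate_envelope({'event_type': 'order.created', 'event_id': 'e1',
                   'timestamp': '2026-02-23T10:00:00Z', 'payload': {}})  # passes
try:
    validate_envelope({'event_type': 'order.created'})
except ValueError as e:
    print(e)  # missing required fields: ['event_id', 'timestamp', 'payload']
```

This is a pre-flight check only; the registry remains the source of truth for compatibility rules.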
Map business domains to topics (Kafka) or exchanges+queues (RabbitMQ). Use one topic per event type. Avoid god-topics that mix unrelated events. [src5]
# Kafka topic naming convention: {domain}.{entity}.{event}
orders.order.created
orders.order.cancelled
payments.payment.completed
inventory.stock.updated
Verify: List topics/queues and confirm each maps to exactly one business event type.
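The `{domain}.{entity}.{event}` convention can be enforced in CI. A sketch with an assumed regex; adjust the character classes to your own naming rules:

```python
import re

# {domain}.{entity}.{event} -- lowercase segments separated by exactly two dots
TOPIC_NAME = re.compile(r'^[a-z]+\.[a-z]+\.[a-z_]+$')

def valid_topic(name: str) -> bool:
    return TOPIC_NAME.fullmatch(name) is not None

print(valid_topic('orders.order.created'))   # True
print(valid_topic('orders'))                 # False: missing entity/event segments
print(valid_topic('Orders.Order.Created'))   # False: uppercase
```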
Partition count determines maximum consumer parallelism. Start with max(expected_consumers, expected_throughput / per_partition_throughput). For Kafka, a single partition handles ~10 MB/s writes. [src1]
# Kafka: create topic with 12 partitions, replication factor 3
kafka-topics.sh --create \
--topic orders.order.created \
--partitions 12 \
--replication-factor 3 \
--bootstrap-server localhost:9092
Verify: kafka-topics.sh --describe --topic orders.order.created → shows 12 partitions, 3 replicas each.
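The sizing rule above in code form. The 10 MB/s per-partition figure is the rough estimate from the text; measure your own brokers before committing to a count, since partitions are cheap to add but expensive to remove:

```python
import math

def partition_count(expected_consumers: int, throughput_mb_s: float,
                    per_partition_mb_s: float = 10.0) -> int:
    """max(expected_consumers, ceil(expected_throughput / per_partition_throughput))"""
    return max(expected_consumers, math.ceil(throughput_mb_s / per_partition_mb_s))

print(partition_count(expected_consumers=12, throughput_mb_s=100))  # 12
print(partition_count(expected_consumers=4,  throughput_mb_s=200))  # 20
```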
Enable idempotent production to avoid duplicates on retries. Batch messages for throughput, but bound batch size and linger time for latency. [src1]
# Kafka producer with idempotency (Python, confluent-kafka 2.6+)
from confluent_kafka import Producer
from datetime import datetime, timezone
import json, uuid

producer = Producer({
    'bootstrap.servers': 'kafka-broker:9092',
    'enable.idempotence': True,
    'acks': 'all',
    'linger.ms': 10,
    'batch.size': 65536,
    'compression.type': 'lz4',
})

def publish_order_event(order):
    event = {
        'event_type': 'order.created',
        'event_id': str(uuid.uuid4()),
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'version': 1,
        'payload': order,
    }
    producer.produce(
        topic='orders.order.created',
        key=str(order['order_id']),
        value=json.dumps(event).encode(),
        callback=lambda err, msg: print(f'Error: {err}') if err else None,
    )
    producer.flush()  # flushing per call keeps the example simple; batch in production
Verify: Produce 100 messages, consume all — count should be exactly 100, no duplicates.
Use consumer groups for parallel processing. Commit offsets only after successful processing. Implement idempotent handlers for effectively-once semantics. [src1]
# Kafka consumer with manual offset commit (Python, confluent-kafka 2.6+)
from confluent_kafka import Consumer
import json

consumer = Consumer({
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
})
consumer.subscribe(['orders.order.created'])

processed_ids = set()  # Use Redis/DB in production
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue
        event = json.loads(msg.value().decode())
        event_id = event['event_id']
        if event_id not in processed_ids:
            process_order(event['payload'])
            processed_ids.add(event_id)
        consumer.commit(message=msg)
finally:
    consumer.close()
Verify: Stop consumer mid-batch, restart — it should reprocess from last committed offset.
Configure DLQs to capture messages that fail processing after N retries. Monitor consumer lag, DLQ depth, and error rates. [src2] [src3]
# DLQ pattern: retry N times, then route to dead letter topic
MAX_RETRIES = 3

def process_with_dlq(msg, consumer, dlq_producer):
    # confluent-kafka returns headers as a list of (key, bytes) tuples, not a dict
    headers = dict(msg.headers() or [])
    retries = int(headers.get('retry_count', 0))
    try:
        event = json.loads(msg.value().decode())
        process_order(event['payload'])
        consumer.commit(message=msg)
    except Exception as e:
        if retries >= MAX_RETRIES:
            dlq_producer.produce(
                topic='orders.order.created.dlq',
                key=msg.key(), value=msg.value(),
                headers={'error': str(e), 'original_topic': msg.topic()},
            )
        else:
            dlq_producer.produce(
                topic=msg.topic(), key=msg.key(), value=msg.value(),
                headers={'retry_count': str(retries + 1)},
            )
        consumer.commit(message=msg)
Verify: Send a message that always fails — after 3 retries it should appear in the .dlq topic.
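Retries before the DLQ are usually spaced out rather than fired back-to-back. A sketch of exponential backoff with full jitter; the base and cap values are illustrative:

```python
import random

def backoff_delay(attempt: int, base: float = 0.5, max_delay: float = 30.0) -> float:
    """delay = min(base * 2^attempt, max_delay), with full jitter so that
    many consumers retrying at once do not hammer the broker in lockstep."""
    return random.uniform(0, min(base * 2 ** attempt, max_delay))

for attempt in range(4):
    cap = min(0.5 * 2 ** attempt, 30.0)
    print(f'attempt {attempt}: sleep up to {cap}s')  # 0.5, 1.0, 2.0, 4.0
```

Sleeping inside the consumer's poll loop stalls the whole partition; delayed-retry topics (one per delay tier) avoid that at the cost of extra topics.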
# Input: Kafka broker at localhost:9092, topic 'events'
# Output: Produces and consumes JSON events with idempotency
from confluent_kafka import Producer, Consumer
import json, uuid

# Producer
p = Producer({'bootstrap.servers': 'localhost:9092', 'enable.idempotence': True, 'acks': 'all'})
p.produce('events', key='user-1', value=json.dumps({'id': str(uuid.uuid4()), 'action': 'signup'}).encode())
p.flush()

# Consumer
c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'svc', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False})
c.subscribe(['events'])
msg = c.poll(5.0)
if msg and not msg.error():
    print(json.loads(msg.value()))
    c.commit(message=msg)
c.close()
// Input: RabbitMQ at amqp://localhost, exchange 'events', queue 'order-processor'
// Output: Publishes event to fanout exchange, consumes from bound queue
const amqp = require('amqplib');

async function main() {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createChannel();
  await ch.assertExchange('events', 'fanout', { durable: true });
  ch.publish('events', '', Buffer.from(JSON.stringify({ type: 'order.created', id: '123' })),
    { persistent: true, contentType: 'application/json' });
  const q = await ch.assertQueue('order-processor', { durable: true });
  await ch.bindQueue(q.queue, 'events', '');
  await ch.prefetch(10);
  ch.consume(q.queue, (msg) => {
    if (msg) { console.log(JSON.parse(msg.content.toString())); ch.ack(msg); }
  });
}
main().catch(console.error);
# Input: AWS SQS FIFO queue URL, AWS credentials configured
# Output: Sends and receives a message with deduplication
import boto3, json, uuid  # boto3>=1.35.0

sqs = boto3.client('sqs', region_name='us-east-1')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/orders.fifo'  # FIFO queue names must end in .fifo

# Send (FIFO queue)
sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps({'order_id': '456', 'action': 'created'}),
                 MessageGroupId='orders', MessageDeduplicationId=str(uuid.uuid4()))

# Receive + Delete
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20)
for msg in resp.get('Messages', []):
    print(json.loads(msg['Body']))
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])
# BAD -- scanning entire Kafka topic to find one record -- O(n) per lookup
def get_user_profile(user_id):
    consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                         'group.id': f'lookup-{user_id}',
                         'auto.offset.reset': 'earliest'})
    consumer.subscribe(['user.profiles'])
    while True:  # reads every message ever written to the topic
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        if msg.key() and msg.key().decode() == user_id:
            return json.loads(msg.value())

# GOOD -- consume events into a database, query the database for reads
def handle_user_event(event):
    db.execute("INSERT INTO users (id, name, email) VALUES (%s, %s, %s) "
               "ON CONFLICT (id) DO UPDATE SET name=EXCLUDED.name, email=EXCLUDED.email",
               (event['user_id'], event['name'], event['email']))

def get_user_profile(user_id):
    return db.execute("SELECT * FROM users WHERE id = %s", (user_id,))
# BAD -- event name is imperative (a command), creating tight coupling
producer.produce('orders', value=json.dumps({
    'event_type': 'SendEmailToCustomer',  # This is a command, not an event
    'email': 'customer@example.com',
    'template': 'order_confirmation',
}))

# GOOD -- event describes what happened; consumers react independently
producer.produce('orders', value=json.dumps({
    'event_type': 'order.confirmed',  # Past tense = event (a fact)
    'order_id': 'ORD-789',
    'customer_id': 'CUST-123',
    'total': 149.99,
    'confirmed_at': '2026-02-23T10:30:00Z',
}))
// BAD -- no contract between producer and consumer
channel.publish('events', '', Buffer.from(JSON.stringify({
  typ: 'order', // Typo -- no validation catches this
  data: { amt: '19.99' }, // String instead of number -- discovered in production
})));

// GOOD -- schema registry validates every message
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');
const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' });

const schemaId = await registry.getLatestSchemaId('order-value');
const encoded = await registry.encode(schemaId, {
  event_type: 'order.created',
  event_id: '550e8400-e29b-41d4-a716-446655440000',
  timestamp: '2026-02-23T10:00:00Z',
  version: 1,
  payload: { order_id: 'ORD-789', total: 149.99 },
});
await producer.send({ topic: 'orders', messages: [{ value: encoded }] });
# BAD -- auto-commit advances offset before processing finishes
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'svc',
    'enable.auto.commit': True,  # Dangerous: commits before processing
    'auto.commit.interval.ms': 1000,
})

# GOOD -- commit only after business logic completes successfully
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'svc',
    'enable.auto.commit': False,  # Manual commit only
})
# ... poll, process, then:
consumer.commit(message=msg)  # Commit after processing succeeds
- Monitor the `consumer_lag` metric per partition; scale consumers to match partition count. [src1]
- Use the `cooperative-sticky` assignor and stagger pod restarts with `maxUnavailable: 1` to minimize rebalance churn. [src1]
- Retry with exponential backoff: `delay = min(base * 2^attempt, max_delay)`; cap retries, then route to the DLQ. [src6]
- Name events as past-tense facts (`order.completed`), not CRUD operations. [src7]
- Record each `event_id` in an idempotency store and check it before processing. [src5]
- Apply backpressure via `max.block.ms` (Kafka) or publisher confirms (RabbitMQ) so producers block or fail fast. [src2]
# Check Kafka consumer group lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-processor
# Check Kafka topic partitions and replicas
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic orders.order.created
# Check RabbitMQ queue depth and consumer count
rabbitmqctl list_queues name messages consumers
# Check RabbitMQ cluster status
rabbitmqctl cluster_status
# Check AWS SQS queue attributes
aws sqs get-queue-attributes --queue-url $QUEUE_URL \
--attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible
# Monitor NATS JetStream stream info
nats stream info ORDERS
# Check NATS consumer pending count
nats consumer info ORDERS order-processor
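The lag numbers these commands report feed directly into alerting. A sketch of the threshold check behind "alert on consumer lag > threshold"; the threshold value is illustrative:

```python
def partition_lag(end_offset: int, committed_offset: int) -> int:
    """Lag = how far the consumer trails the newest message in the partition."""
    return max(0, end_offset - committed_offset)

def should_alert(lags: list[int], threshold: int = 10_000) -> bool:
    """Fire when any single partition exceeds the threshold; averaging across
    partitions hides one stuck partition behind healthy ones."""
    return any(lag > threshold for lag in lags)

lags = [partition_lag(120_500, 118_000), partition_lag(90_000, 75_000)]
print(lags)                # [2500, 15000]
print(should_alert(lags))  # True: the second partition is 15000 behind
```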
| Version | Status | Breaking Changes | Migration Notes |
|---|---|---|---|
| Kafka 3.7+ (2024) | Current | Last series with ZooKeeper mode; ZooKeeper removed in 4.0 | Migrate from ZooKeeper to KRaft before upgrading to 4.0 |
| Kafka 3.3-3.6 | Stable | KRaft GA, ZooKeeper deprecated | Start KRaft migration; both modes supported |
| Kafka 2.x | Legacy | — | Upgrade to 3.x for KRaft, tiered storage |
| RabbitMQ 4.0+ (2024) | Current | Classic queue mirroring removed | Migrate mirrored classic queues to quorum queues |
| RabbitMQ 3.12-3.13 | LTS | Quorum queues default | — |
| NATS 2.10+ (2024) | Current | JetStream R2 API changes | Update client libraries to latest |
| SQS/SNS | Managed | N/A | AWS manages versioning; FIFO queues added 2016 |
| Use When | Don't Use When | Use Instead |
|---|---|---|
| Services need temporal decoupling (producer and consumer run at different times/speeds) | Request needs synchronous response within 100ms | REST/gRPC with circuit breaker |
| One event triggers actions in multiple independent services (fan-out) | Only two services communicate and both are always available | Direct service-to-service call (simpler) |
| Need audit log / event replay / event sourcing | Data is ephemeral and loss is acceptable | In-memory pub/sub (Redis Pub/Sub, EventEmitter) |
| Throughput exceeds what synchronous calls can handle (>10K req/s) | Low message volume (<100 msg/day) with simple routing | Cron job + database polling |
| Need to buffer during downstream outages | All consumers must process in real-time with no buffering | WebSocket / SSE for real-time push |
| Regulatory requirement for message durability and ordering | Prototyping or MVP where operational overhead must be zero | Firebase, Supabase Realtime, or managed webhooks |