Message Queue & Event-Driven Architecture Design

Type: Software Reference | Confidence: 0.93 | Sources: 7 | Verified: 2026-02-23 | Freshness: stable

TL;DR

Constraints

Quick Reference

| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Message Broker | Core message transit and buffering | Kafka, RabbitMQ, NATS, Pulsar | Horizontal partitioning (Kafka), clustering (RabbitMQ), superclusters (NATS) |
| Event Router | Content-based routing and filtering | AWS EventBridge, RabbitMQ exchanges, Kafka Streams | Filter rules at router level to reduce downstream load |
| Schema Registry | Enforce message contracts | Confluent Schema Registry, AWS Glue, Apicurio | Central service, replicated |
| Producer | Publish events/messages | Kafka Producer API, amqplib, AWS SDK, NATS client | Batch writes, async sends, connection pooling |
| Consumer Group | Parallel consumption with offset tracking | Kafka Consumer Groups, RabbitMQ competing consumers | Add consumers up to partition/queue count |
| Dead Letter Queue | Capture failed messages | Kafka DLQ topic, RabbitMQ DLX, SQS DLQ | Monitor and reprocess; alert on growth |
| Stream Processor | Transform, aggregate, join event streams | Kafka Streams, Flink, Spark Streaming | Horizontal scaling by partition count |
| Fan-out Layer | Broadcast one event to multiple consumers | SNS, RabbitMQ fanout exchange, Kafka multi-consumer-group | One topic, N independent subscriber groups |
| Idempotency Store | Deduplicate at-least-once deliveries | Redis, PostgreSQL, DynamoDB | Keyed by message ID + consumer ID |
| Observability | Monitor lag, throughput, error rates | Prometheus + Grafana, Datadog, CloudWatch | Alert on consumer lag > threshold |
| CDC Pipeline | Capture database changes as events | Debezium, AWS DMS, Maxwell | One connector per source table/database |
| API Gateway | Ingest external events into broker | Kong, AWS API Gateway, Envoy | Rate limit + authenticate before enqueue |

Decision Tree

START
+-- Need message replay / event sourcing / audit log?
|   +-- YES --> Kafka (log-based, configurable retention, compaction)
|   +-- NO  --> next question
+-- Need complex routing (topic, headers, priority queues)?
|   +-- YES --> RabbitMQ (exchanges: direct, topic, fanout, headers)
|   +-- NO  --> next question
+-- Want zero operational overhead (fully managed)?
|   +-- YES --> AWS SQS/SNS (or Google Pub/Sub, Azure Service Bus)
|   +-- NO  --> next question
+-- Need sub-millisecond latency with minimal footprint?
|   +-- YES --> NATS (with JetStream for persistence)
|   +-- NO  --> next question
+-- Need exactly-once within streaming pipeline?
|   +-- YES --> Kafka with transactions (enable.idempotence + transactional.id)
|   +-- NO  --> next question
+-- Throughput > 500K msg/s?
|   +-- YES --> Kafka or NATS JetStream
|   +-- NO  --> RabbitMQ or SQS (sufficient for most applications)

Step-by-Step Guide

1. Define message contracts and schemas

Establish schemas before writing any producer or consumer code. Use Avro or Protobuf for binary efficiency, or JSON Schema for human readability. Register schemas in a schema registry to enforce backward/forward compatibility. [src1]

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["event_type", "event_id", "timestamp", "payload"],
  "properties": {
    "event_type": { "type": "string" },
    "event_id": { "type": "string", "format": "uuid" },
    "timestamp": { "type": "string", "format": "date-time" },
    "version": { "type": "integer", "minimum": 1 },
    "payload": { "type": "object" }
  }
}

Verify: Register schema and attempt to produce a message with a missing required field — the registry should reject it.
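When no registry is available yet, the same required-field contract can be enforced client-side before producing. A minimal sketch of producer-side validation mirroring the schema above (the `validate_event` helper and its field list are illustrative, not a library API):

```python
# Mirror the JSON Schema's "required" list from the contract above
REQUIRED_FIELDS = ("event_type", "event_id", "timestamp", "payload")

def validate_event(event: dict) -> dict:
    """Raise before publishing if a required field is missing."""
    missing = [f for f in REQUIRED_FIELDS if f not in event]
    if missing:
        raise ValueError(f"schema violation, missing fields: {missing}")
    return event

validate_event({
    "event_type": "order.created",
    "event_id": "550e8400-e29b-41d4-a716-446655440000",
    "timestamp": "2026-02-23T10:00:00Z",
    "payload": {},
})  # passes; omitting event_id would raise ValueError
```

This catches contract violations at the producer, the same failure mode the registry verify step exercises, but without cross-team enforcement; it complements rather than replaces a registry.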

2. Design topic/queue topology

Map business domains to topics (Kafka) or exchanges+queues (RabbitMQ). Use one topic per event type. Avoid god-topics that mix unrelated events. [src5]

# Kafka topic naming convention: {domain}.{entity}.{event}
orders.order.created
orders.order.cancelled
payments.payment.completed
inventory.stock.updated

Verify: List topics/queues and confirm each maps to exactly one business event type.
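The `{domain}.{entity}.{event}` convention can also be checked mechanically, for example in CI before topic creation. A small sketch (the exact character set allowed is an assumption; adjust the regex to your naming rules):

```python
import re

# {domain}.{entity}.{event} -- lowercase words, underscores allowed
TOPIC_PATTERN = re.compile(r"^[a-z][a-z_]*\.[a-z][a-z_]*\.[a-z][a-z_]*$")

def is_valid_topic(name: str) -> bool:
    """True if the topic name follows the three-part convention."""
    return bool(TOPIC_PATTERN.match(name))

print(is_valid_topic("orders.order.created"))  # True
print(is_valid_topic("misc-events"))           # False: no domain/entity/event parts
```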

3. Configure partitioning for parallelism

Partition count determines maximum consumer parallelism. Start with max(expected_consumers, expected_throughput / per_partition_throughput). For Kafka, a single partition handles ~10 MB/s writes. [src1]
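The sizing rule reads as a one-line calculation; a sketch using the ~10 MB/s per-partition figure above (the function name is illustrative):

```python
import math

def partition_count(expected_consumers: int, expected_mb_per_s: float,
                    per_partition_mb_per_s: float = 10.0) -> int:
    # max(consumer parallelism, throughput headroom)
    return max(expected_consumers,
               math.ceil(expected_mb_per_s / per_partition_mb_per_s))

print(partition_count(expected_consumers=8, expected_mb_per_s=120))  # 12
```

With 8 consumers and 120 MB/s of expected writes, throughput dominates and yields 12 partitions, matching the topic created in the command that follows.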

# Kafka: create topic with 12 partitions, replication factor 3
kafka-topics.sh --create \
  --topic orders.order.created \
  --partitions 12 \
  --replication-factor 3 \
  --bootstrap-server localhost:9092

Verify: kafka-topics.sh --describe --topic orders.order.created → shows 12 partitions, 3 replicas each.

4. Implement producers with idempotency

Enable idempotent production to avoid duplicates on retries. Batch messages for throughput, but bound batch size and linger time for latency. [src1]

# Kafka producer with idempotency (Python, confluent-kafka 2.6+)
from confluent_kafka import Producer
import json, uuid

producer = Producer({
    'bootstrap.servers': 'kafka-broker:9092',
    'enable.idempotence': True,
    'acks': 'all',
    'linger.ms': 10,
    'batch.size': 65536,
    'compression.type': 'lz4',
})

def publish_order_event(order):
    event = {
        'event_type': 'order.created',
        'event_id': str(uuid.uuid4()),
        'timestamp': '2026-02-23T10:00:00Z',
        'version': 1,
        'payload': order,
    }
    producer.produce(
        topic='orders.order.created',
        key=str(order['order_id']),
        value=json.dumps(event).encode(),
        callback=lambda err, msg: print(f'Error: {err}') if err else None,
    )
    producer.poll(0)  # serve delivery callbacks without blocking
    # Call producer.flush() once at shutdown; flushing per message defeats linger.ms batching

Verify: Produce 100 messages, consume all — count should be exactly 100, no duplicates.

5. Implement consumers with offset management

Use consumer groups for parallel processing. Commit offsets only after successful processing. Implement idempotent handlers for effectively-once semantics. [src1]

# Kafka consumer with manual offset commit (Python, confluent-kafka 2.6+)
from confluent_kafka import Consumer
import json

consumer = Consumer({
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
})
consumer.subscribe(['orders.order.created'])

processed_ids = set()  # Use Redis/DB in production

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue
        event = json.loads(msg.value().decode())
        event_id = event['event_id']
        if event_id not in processed_ids:
            process_order(event['payload'])
            processed_ids.add(event_id)
        consumer.commit(message=msg)
finally:
    consumer.close()

Verify: Stop consumer mid-batch, restart — it should reprocess from last committed offset.
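The in-memory `processed_ids` set above loses state on restart. A durable idempotency store keyed by message ID plus consumer ID, as the Quick Reference suggests, might look like the sketch below (SQLite stands in for the Redis/PostgreSQL/DynamoDB options; the class and method names are assumptions):

```python
import sqlite3

class IdempotencyStore:
    """Deduplicates at-least-once deliveries, keyed by (event_id, consumer_id)."""

    def __init__(self, path: str = "idempotency.db"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS processed ("
            " event_id TEXT, consumer_id TEXT,"
            " PRIMARY KEY (event_id, consumer_id))"
        )

    def mark_if_new(self, event_id: str, consumer_id: str) -> bool:
        # Atomic insert-or-fail: True means first delivery, go process it
        try:
            with self.db:
                self.db.execute("INSERT INTO processed VALUES (?, ?)",
                                (event_id, consumer_id))
            return True
        except sqlite3.IntegrityError:
            return False  # duplicate delivery -> skip processing

store = IdempotencyStore(":memory:")
print(store.mark_if_new("evt-1", "order-processor"))  # True  (process)
print(store.mark_if_new("evt-1", "order-processor"))  # False (duplicate)
```

The primary-key constraint makes the check-and-mark a single atomic operation, which avoids the check-then-set race a plain lookup plus insert would have under concurrent consumers.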

6. Set up dead letter queues and monitoring

Configure DLQs to capture messages that fail processing after N retries. Monitor consumer lag, DLQ depth, and error rates. [src2] [src3]

# DLQ pattern: retry N times, then route to dead letter topic
MAX_RETRIES = 3

def process_with_dlq(msg, consumer, dlq_producer):
    # confluent-kafka returns headers as a list of (key, bytes) tuples, or None
    headers = dict(msg.headers() or [])
    retries = int(headers.get('retry_count', b'0').decode())
    try:
        event = json.loads(msg.value().decode())
        process_order(event['payload'])
    except Exception as e:
        if retries >= MAX_RETRIES:
            dlq_producer.produce(
                topic='orders.order.created.dlq',
                key=msg.key(), value=msg.value(),
                headers={'error': str(e), 'original_topic': msg.topic()},
            )
        else:
            dlq_producer.produce(
                topic=msg.topic(), key=msg.key(), value=msg.value(),
                headers={'retry_count': str(retries + 1)},
            )
    consumer.commit(message=msg)  # commit in every path: success, retry, or DLQ

Verify: Send a message that always fails — after 3 retries it should appear in the .dlq topic.

Code Examples

Python (confluent-kafka): Kafka Producer/Consumer

# Input:  Kafka broker at localhost:9092, topic 'events'
# Output: Produces and consumes JSON events with idempotency

from confluent_kafka import Producer, Consumer
import json, uuid

# Producer
p = Producer({'bootstrap.servers': 'localhost:9092', 'enable.idempotence': True, 'acks': 'all'})
p.produce('events', key='user-1', value=json.dumps({'id': str(uuid.uuid4()), 'action': 'signup'}).encode())
p.flush()

# Consumer
c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'svc', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False})
c.subscribe(['events'])
msg = c.poll(5.0)
if msg and not msg.error():
    print(json.loads(msg.value()))
    c.commit(message=msg)
c.close()

Node.js (amqplib): RabbitMQ Publish/Subscribe

// Input:  RabbitMQ at amqp://localhost, exchange 'events', queue 'order-processor'
// Output: Publishes event to fanout exchange, consumes from bound queue

const amqp = require('amqplib');  // [email protected]

async function main() {
  const conn = await amqp.connect('amqp://localhost');
  const ch = await conn.createChannel();
  await ch.assertExchange('events', 'fanout', { durable: true });
  ch.publish('events', '', Buffer.from(JSON.stringify({ type: 'order.created', id: '123' })),
    { persistent: true, contentType: 'application/json' });
  const q = await ch.assertQueue('order-processor', { durable: true });
  await ch.bindQueue(q.queue, 'events', '');
  await ch.prefetch(10);
  ch.consume(q.queue, (msg) => {
    if (msg) { console.log(JSON.parse(msg.content.toString())); ch.ack(msg); }
  });
}
main().catch(console.error);

Python (boto3): AWS SQS Send/Receive

# Input:  AWS SQS queue URL, AWS credentials configured
# Output: Sends and receives a message with deduplication

import boto3, json, uuid  # boto3>=1.35.0

sqs = boto3.client('sqs', region_name='us-east-1')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/orders.fifo'  # FIFO queues require the .fifo suffix

# Send (MessageGroupId and MessageDeduplicationId are valid only on FIFO queues)
sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps({'order_id': '456', 'action': 'created'}),
    MessageGroupId='orders', MessageDeduplicationId=str(uuid.uuid4()))

# Receive + Delete
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20)
for msg in resp.get('Messages', []):
    print(json.loads(msg['Body']))
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])

Anti-Patterns

Wrong: Using message broker as a database

# BAD -- scanning the entire Kafka topic to find one record -- O(n) per lookup
def get_user_profile(user_id):
    consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                         'group.id': f'lookup-{user_id}',
                         'auto.offset.reset': 'earliest'})
    consumer.subscribe(['user.profiles'])
    while True:                        # replays every message ever produced
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        if msg.key() == user_id.encode():
            return json.loads(msg.value())

Correct: Materialize state into a database, use broker for transit

# GOOD -- consume events into a database, query the database for reads
def handle_user_event(event):
    db.execute("INSERT INTO users (id, name, email) VALUES (%s, %s, %s) "
               "ON CONFLICT (id) DO UPDATE SET name=%s, email=%s",
               (event['user_id'], event['name'], event['email'],
                event['name'], event['email']))

def get_user_profile(user_id):
    return db.execute("SELECT * FROM users WHERE id = %s", (user_id,))

Wrong: Publishing commands disguised as events

# BAD -- event name is imperative (a command), creating tight coupling
producer.produce('orders', value=json.dumps({
    'event_type': 'SendEmailToCustomer',  # This is a command, not an event
    'email': '[email protected]',
    'template': 'order_confirmation',
}))

Correct: Publish facts about what happened

# GOOD -- event describes what happened; consumers react independently
producer.produce('orders', value=json.dumps({
    'event_type': 'order.confirmed',      # Past tense = event (a fact)
    'order_id': 'ORD-789',
    'customer_id': 'CUST-123',
    'total': 149.99,
    'confirmed_at': '2026-02-23T10:30:00Z',
}))

Wrong: No schema validation

// BAD -- no contract between producer and consumer
channel.publish('events', '', Buffer.from(JSON.stringify({
  typ: 'order',           // Typo -- no validation catches this
  data: { amt: '19.99' }, // String instead of number -- discovered in production
})));

Correct: Enforce schemas with a registry

// GOOD -- schema registry validates every message
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');
const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' });

async function publishOrder(producer) {  // await requires an async context in CommonJS
  const schemaId = await registry.getLatestSchemaId('order-value');
  const encoded = await registry.encode(schemaId, {
    event_type: 'order.created',
    event_id: '550e8400-e29b-41d4-a716-446655440000',
    timestamp: '2026-02-23T10:00:00Z',
    version: 1,
    payload: { order_id: 'ORD-789', total: 149.99 },
  });
  await producer.send({ topic: 'orders', messages: [{ value: encoded }] });
}

Wrong: Auto-committing offsets before processing

# BAD -- auto-commit advances offset before processing finishes
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'svc',
    'enable.auto.commit': True,     # Dangerous: commits before processing
    'auto.commit.interval.ms': 1000,
})

Correct: Manual commit after successful processing

# GOOD -- commit only after business logic completes successfully
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'svc',
    'enable.auto.commit': False,    # Manual commit only
})
# ... poll, process, then:
consumer.commit(message=msg)        # Commit after processing succeeds

Common Pitfalls

Diagnostic Commands

# Check Kafka consumer group lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-processor

# Check Kafka topic partitions and replicas
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders.order.created

# Check RabbitMQ queue depth and consumer count
rabbitmqctl list_queues name messages consumers

# Check RabbitMQ cluster status
rabbitmqctl cluster_status

# Check AWS SQS queue attributes
aws sqs get-queue-attributes --queue-url $QUEUE_URL \
  --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible

# Monitor NATS JetStream stream info
nats stream info ORDERS

# Check NATS consumer pending count
nats consumer info ORDERS order-processor

Version History & Compatibility

| Version | Status | Breaking Changes | Migration Notes |
|---|---|---|---|
| Kafka 4.0+ (2025) | Current | ZooKeeper removed, KRaft only | Migrate from ZooKeeper to KRaft before upgrading |
| Kafka 3.3-3.7 | Stable | KRaft GA, ZooKeeper deprecated | Start KRaft migration; both modes supported |
| Kafka 2.x | Legacy | | Upgrade to 3.x for KRaft, tiered storage |
| RabbitMQ 4.0+ (2024) | Current | Classic queue mirroring removed | Migrate mirrored classic queues to quorum queues |
| RabbitMQ 3.12-3.13 | LTS | | Quorum queues default |
| NATS 2.10+ (2024) | Current | JetStream R2 API changes | Update client libraries to latest |
| SQS/SNS | Managed | N/A | AWS manages versioning; FIFO queues added 2016 |

When to Use / When Not to Use

| Use When | Don't Use When | Use Instead |
|---|---|---|
| Services need temporal decoupling (producer and consumer run at different times/speeds) | Request needs synchronous response within 100ms | REST/gRPC with circuit breaker |
| One event triggers actions in multiple independent services (fan-out) | Only two services communicate and both are always available | Direct service-to-service call (simpler) |
| Need audit log / event replay / event sourcing | Data is ephemeral and loss is acceptable | In-memory pub/sub (Redis Pub/Sub, EventEmitter) |
| Throughput exceeds what synchronous calls can handle (>10K req/s) | Low message volume (<100 msg/day) with simple routing | Cron job + database polling |
| Need to buffer during downstream outages | All consumers must process in real-time with no buffering | WebSocket / SSE for real-time push |
| Regulatory requirement for message durability and ordering | Prototyping or MVP where operational overhead must be zero | Firebase, Supabase Realtime, or managed webhooks |

Important Caveats

Related Units