Distributed Job & Task Queue System Design
How do I design a distributed job and task queue system?
TL;DR
- Bottom line: A distributed job queue decouples producers from workers via a broker, enabling asynchronous task processing with retries, dead letter queues, and horizontal scaling — choose Celery for Python, BullMQ for Node.js, Sidekiq for Ruby, or Temporal for long-running workflows.
- Key tool/command: celery -A tasks worker --loglevel=info (Python) / new Worker(queueName, processor) (BullMQ/Node.js)
- Watch out for: Non-idempotent task handlers. At-least-once delivery means a task can run more than once (after a retry or redelivery), causing duplicate side effects.
- Works with: Any language. Core patterns are language-agnostic. Code examples in Python (Celery) and Node.js/TypeScript (BullMQ).
Constraints
- All task handlers MUST be idempotent — at-least-once delivery means duplicate execution is expected, not exceptional [src1]
- Never store large payloads (>64 KB) in queue messages — pass a reference (S3 key, DB row ID) and fetch in the worker [src4]
- Always configure a dead letter queue (DLQ) — without one, poison messages block processing or are silently lost [src7]
- Set explicit task TTLs and max retry counts — unbounded retries cause infinite loops and resource exhaustion [src5]
- Broker persistence must match durability needs — an in-memory Redis with no AOF loses all pending jobs on crash [src2]
- Never let workers auto-acknowledge before processing completes — premature ACK means lost tasks on worker crash [src1]
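The payload-size constraint can be enforced at enqueue time. The following is a minimal sketch, not a framework feature: `BLOB_STORE` is a hypothetical in-memory stand-in for S3 or a database, and `build_message` / `load_payload` are illustrative helper names.

```python
import base64
import json
import uuid

MAX_INLINE_BYTES = 64 * 1024        # the 64 KB limit from the constraints
BLOB_STORE: dict = {}               # hypothetical stand-in for S3 / a database


def build_message(task_name: str, payload: bytes) -> str:
    """Return a JSON queue message; large payloads are stored out of band."""
    if len(payload) > MAX_INLINE_BYTES:
        key = f"blobs/{uuid.uuid4()}"
        BLOB_STORE[key] = payload   # the worker fetches this by key later
        body = {"task": task_name, "payload_ref": key}
    else:
        inline = base64.b64encode(payload).decode("ascii")
        body = {"task": task_name, "payload_b64": inline}
    return json.dumps(body)


def load_payload(message: str) -> bytes:
    """Worker side: resolve the reference (or inline payload) back to bytes."""
    body = json.loads(message)
    if "payload_ref" in body:
        return BLOB_STORE[body["payload_ref"]]
    return base64.b64decode(body["payload_b64"])
```

With this shape, the message that travels through the broker stays small regardless of payload size; only the reference grows the blob store.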
Quick Reference
| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Producer | Enqueues tasks with payload and metadata | Any application code (web server, CLI, cron) | Stateless; scale with application tier |
| Message Broker | Stores and routes tasks to workers | Redis (BullMQ, Sidekiq), RabbitMQ (Celery), PostgreSQL (Temporal), SQS (cloud) | Cluster mode with replication; partition by queue |
| Worker Pool | Pulls and executes tasks | Celery workers, BullMQ Workers, Sidekiq threads, Temporal workers | Horizontal scaling; auto-scale on queue depth |
| Result Backend | Stores task outcomes and return values | Redis, PostgreSQL, MongoDB, S3 | Optional; omit if tasks are fire-and-forget |
| Dead Letter Queue | Isolates poison messages after max retries | Dedicated queue in same broker | Monitor DLQ depth; alert on growth |
| Scheduler | Triggers periodic/delayed tasks | Celery Beat, BullMQ repeatable jobs, cron, CloudWatch Events | Single-leader pattern to prevent duplicate scheduling |
| Rate Limiter | Throttles task processing rate | Token bucket per queue or per worker group | Protects downstream APIs from overload |
| Priority Router | Routes high-priority tasks before low-priority | Weighted queues (Sidekiq), priority levels (BullMQ) | Separate worker pools per priority tier |
| Observability | Monitors queue depth, latency, failure rate | Prometheus + Grafana, Datadog, Flower (Celery) | Alert on queue depth > threshold, DLQ growth |
| Retry Engine | Retries failed tasks with backoff | Built-in (all frameworks), custom exponential backoff | Cap retries (3-5); exponential backoff with jitter |
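The Rate Limiter row mentions a token bucket. As a rough illustration (the class name, parameters, and injectable `clock` are assumptions made for this sketch, not part of Celery, BullMQ, or Sidekiq), a single-process token bucket might look like this:

```python
import time


class TokenBucket:
    """Allow about `rate` tasks per second, with bursts of at most `capacity`."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock              # injectable so tests can control time
        self.tokens = float(capacity)   # start with a full bucket
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Refill based on elapsed time, then take one token if available."""
        now = self.clock()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A worker would call `try_acquire()` before starting a task and requeue or delay the task when it returns `False`. A limit shared across workers needs shared state (for example in the broker), which this sketch does not cover.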
Decision Tree
START: Choose a Task Queue Framework
├── Language: Python?
│ ├── YES → Tasks run < 1 hour?
│ │ ├── YES → Use Celery + Redis/RabbitMQ broker
│ │ └── NO → Use Temporal Python SDK (durable execution)
│ └── NO ↓
├── Language: Node.js / TypeScript?
│ ├── YES → Tasks run < 1 hour?
│ │ ├── YES → Use BullMQ + Redis
│ │ └── NO → Use Temporal TypeScript SDK
│ └── NO ↓
├── Language: Ruby?
│ ├── YES → Use Sidekiq + Redis (default choice for Rails)
│ └── NO ↓
├── Language: Go / Java?
│ ├── YES → Need workflow orchestration?
│ │ ├── YES → Use Temporal (native Go/Java SDKs)
│ │ └── NO → Use cloud-native (SQS + Lambda, Cloud Tasks)
│ └── NO ↓
├── Need long-running workflows (hours/days)?
│ ├── YES → Use Temporal (durable execution, automatic state persistence)
│ └── NO ↓
├── Throughput > 1M tasks/day?
│ ├── YES → Use dedicated broker cluster (RabbitMQ/Redis Cluster) + auto-scaling workers
│ └── NO ↓
└── DEFAULT → Celery (Python) or BullMQ (Node.js) — largest ecosystems, easiest to start
START: Choose Delivery Guarantee
├── Can your system tolerate duplicate processing?
│ ├── YES → At-least-once (default for all frameworks — simplest)
│ └── NO ↓
├── Can you make handlers idempotent (dedup key, upsert, conditional write)?
│ ├── YES → At-least-once + idempotency layer (recommended)
│ └── NO ↓
├── Is losing occasional tasks acceptable?
│ ├── YES → At-most-once (ACK before processing — fast but lossy)
│ └── NO ↓
└── DEFAULT → At-least-once + idempotent handlers (industry standard)
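The difference between the two guarantees comes down to when the message is acknowledged. The toy simulation below uses an in-memory deque instead of a real broker; `CrashingWorker`, `run`, and `make_handler` are names invented for this sketch.

```python
from collections import deque


class CrashingWorker(Exception):
    """Raised to simulate a worker dying mid-task."""


def run(queue: deque, handler, ack_early: bool) -> list:
    """Drain `queue` and return the messages that were lost to crashes."""
    lost = []
    while queue:
        msg = queue.popleft()           # broker hands the message to a worker
        try:
            handler(msg)
        except CrashingWorker:
            if ack_early:
                lost.append(msg)        # at-most-once: already acked, nothing to redeliver
            else:
                queue.append(msg)       # at-least-once: unacked message is redelivered
    return lost


def make_handler():
    """Build a handler that crashes the first time it sees message 'b'."""
    crashed, processed = set(), []

    def handler(msg):
        if msg == "b" and msg not in crashed:
            crashed.add(msg)
            raise CrashingWorker(msg)
        processed.append(msg)

    return handler, processed
```

With `ack_early=True` the crashed message is gone; with `ack_early=False` it is processed on redelivery. If the crash had happened after a side effect rather than before it, redelivery would repeat that side effect, which is why at-least-once is paired with idempotent handlers.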
Step-by-Step Guide
1. Define the task interface and serialization
Every task needs a name, a serializable payload, and metadata (retry count, priority, timeout). Keep payloads small and pass references to large data. [src1]
# Python: Celery task definition
# db, charge_payment, update_inventory, and PaymentError are application-specific placeholders.
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')


@app.task(bind=True, max_retries=3, default_retry_delay=60, acks_late=True)
def process_order(self, order_id: int):
    """Process a single order. Idempotent: checks order status before acting."""
    order = db.get_order(order_id)
    if order.status == 'processed':
        return  # idempotent guard
    try:
        charge_payment(order)
        update_inventory(order)
        order.status = 'processed'
        db.save(order)
    except PaymentError as exc:
        # Re-enqueue with default_retry_delay; gives up after max_retries
        raise self.retry(exc=exc)
Verify: celery -A tasks inspect active → expected: task appears in active list when running
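Independent of any framework, the task interface from this step (name, serializable payload, metadata) can be modeled as a small envelope. This is a sketch under assumptions: `TaskEnvelope` and its field names and defaults are illustrative, not Celery's wire format.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field


@dataclass
class TaskEnvelope:
    name: str                       # which handler to run
    payload: dict                   # small, JSON-serializable arguments or references
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    retries: int = 0                # attempts made so far
    max_retries: int = 3
    priority: int = 0
    timeout_s: int = 300

    def to_json(self) -> str:
        """Serialize for the broker; JSON keeps the message language-agnostic."""
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_json(cls, raw: str) -> "TaskEnvelope":
        """Rebuild the envelope on the worker side."""
        return cls(**json.loads(raw))
```

Keeping the envelope JSON-only means any worker language can decode it, and the `task_id` can double as a deduplication key.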
2. Configure the broker with persistence and DLQ
Set up the message broker with appropriate durability. For Redis, enable AOF persistence. Configure dead letter routing for failed messages. [src7]
# Python: Celery configuration with DLQ and retry policy
app.conf.update(
    task_acks_late=True,              # ACK after processing, not before
    worker_prefetch_multiplier=1,     # fetch one task at a time (fairness)
    task_reject_on_worker_lost=True,  # re-queue if worker crashes mid-task
    task_default_queue='default',
    task_queues={
        'default': {'exchange': 'default', 'routing_key': 'default'},
        'priority': {'exchange': 'priority', 'routing_key': 'priority'},
        'dead_letter': {'exchange': 'dead_letter', 'routing_key': 'dead_letter'},
    },
    task_annotations={
        'tasks.process_order': {'rate_limit': '100/m'},
    },
    broker_transport_options={
        'visibility_timeout': 3600,   # seconds; keep above the longest task duration
    },
)
Verify: redis-cli CONFIG GET appendonly → expected: yes
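The dead letter routing described in this step can be illustrated without a broker. The sketch below is an in-memory model only; `MAX_DELIVERIES`, `drain`, and the message dictionary layout are assumptions chosen for illustration.

```python
from collections import deque

MAX_DELIVERIES = 3  # assumed cap on delivery attempts per message


def drain(main: deque, dead_letter: list, handler) -> None:
    """Process `main`; move a message to `dead_letter` after MAX_DELIVERIES failures."""
    while main:
        msg = main.popleft()
        try:
            handler(msg["body"])
        except Exception as exc:
            msg["deliveries"] += 1
            msg["last_error"] = repr(exc)   # keep context for manual inspection
            if msg["deliveries"] >= MAX_DELIVERIES:
                dead_letter.append(msg)     # isolate the poison message
            else:
                main.append(msg)            # requeue for another attempt
```

Healthy messages keep flowing while the failing one is retried a bounded number of times and then parked with its last error attached.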
3. Implement worker pool with concurrency control
Start workers with appropriate concurrency. Use prefork for CPU-bound tasks (Celery), event loop for I/O-bound tasks (BullMQ). [src1]
# Start Celery workers with concurrency and queue binding
celery -A tasks worker --loglevel=info --concurrency=4 -Q default,priority
# Start a dedicated DLQ processor (manual inspection)
celery -A tasks worker --loglevel=warning --concurrency=1 -Q dead_letter
Verify: celery -A tasks inspect ping → expected: pong from all workers
4. Add retry logic with exponential backoff
Configure exponential backoff with jitter to prevent thundering herd on transient failures. Cap maximum retries. [src5]
# Python: exponential backoff with jitter
import random

import requests


@app.task(bind=True, max_retries=5, acks_late=True)
def call_external_api(self, endpoint: str, payload: dict):
    try:
        response = requests.post(endpoint, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as exc:
        # 30s, 60s, 120s, ... plus up to 30s of random jitter
        backoff = (2 ** self.request.retries) * 30 + random.randint(0, 30)
        raise self.retry(exc=exc, countdown=backoff)
Verify: Check retry count in Flower dashboard or celery -A tasks inspect scheduled (tasks waiting on a retry countdown are listed as scheduled/ETA tasks)
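To see what delays this policy produces, the backoff formula can be isolated into a pure function. This is a sketch: `backoff_delay`, the `cap` parameter, and the injectable `rng` are assumptions added for illustration; the base of 30 seconds mirrors the task above.

```python
import random
from typing import Optional


def backoff_delay(retry: int, base: float = 30.0, cap: float = 3600.0,
                  rng: Optional[random.Random] = None) -> float:
    """Seconds to wait before retry number `retry` (0-based).

    Computes base * 2**retry plus uniform jitter in [0, base], capped at `cap`.
    """
    source = rng if rng is not None else random
    jitter = source.uniform(0, base)
    return min(cap, base * (2 ** retry) + jitter)


if __name__ == "__main__":
    for n in range(5):
        print(n, round(backoff_delay(n), 1))
```

The cap keeps a high retry count from producing multi-hour delays, and the jitter spreads retries from many failed tasks across a window instead of a single instant.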
5. Set up monitoring and alerting
Monitor queue depth, processing latency, failure rate, and DLQ growth. Alert before queues back up. [src6]
# Check queue depth (Redis / BullMQ)
redis-cli LLEN bull:myqueue:wait
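# Check Celery queue depth (Redis broker: the list key is the queue name)
redis-cli LLEN default
# List the queues each Celery worker consumes from
celery -A tasks inspect active_queues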
# Flower web dashboard for Celery
celery -A tasks flower --port=5555
Verify: Navigate to http://localhost:5555 → expected: Flower dashboard with worker and task stats
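Alert rules for these signals can be expressed as a small pure function over periodic samples. The sketch below is illustrative only: `QueueSnapshot`, `evaluate_alerts`, and the default thresholds are assumptions, and collecting the numbers (from Redis, Flower, or Prometheus) is left out.

```python
from dataclasses import dataclass


@dataclass
class QueueSnapshot:
    depth: int        # tasks waiting in the queue
    dlq_depth: int    # tasks parked in the dead letter queue
    failed: int       # failures during the sampling window
    processed: int    # completions (success + failure) during the window


def evaluate_alerts(prev: QueueSnapshot, cur: QueueSnapshot,
                    max_depth: int = 1000,
                    max_failure_rate: float = 0.05) -> list:
    """Return the names of alert conditions that hold for the current sample."""
    alerts = []
    if cur.depth > max_depth:
        alerts.append("queue_depth")
    if cur.dlq_depth > prev.dlq_depth:
        alerts.append("dlq_growth")
    if cur.processed and cur.failed / cur.processed > max_failure_rate:
        alerts.append("failure_rate")
    return alerts
```

Comparing against the previous sample is what turns DLQ depth into a growth signal; an absolute DLQ threshold would stay silent until the backlog is already large.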
6. Implement graceful shutdown and scaling
Workers must finish in-progress tasks before shutting down. Use SIGTERM handling and drain mode. [src4]
# Graceful (warm) shutdown: workers finish current tasks, then exit (same effect as SIGTERM)
celery -A tasks control shutdown
# Scale workers dynamically (Kubernetes HPA). This example scales on CPU;
# scaling on queue depth requires exposing depth as a custom or external metric.
# kubectl autoscale deployment celery-worker --min=2 --max=20 --cpu-percent=70
Verify: Send SIGTERM to worker process → expected: worker finishes current task, then exits cleanly
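For a hand-rolled worker, the same finish-then-exit behavior can be sketched with the standard library. The names `stop_requested`, `request_stop`, `worker_loop`, and `install_handlers` are assumptions for this example; Celery and BullMQ implement their own shutdown handling.

```python
import signal
import threading
from collections import deque

stop_requested = threading.Event()


def request_stop(signum=None, frame=None) -> None:
    """Signal handler: ask the loop to stop after the current task."""
    stop_requested.set()


def worker_loop(tasks: deque, handler) -> int:
    """Run tasks until the queue is empty or a stop is requested."""
    done = 0
    while tasks and not stop_requested.is_set():
        task = tasks.popleft()
        handler(task)       # the in-progress task always runs to completion
        done += 1
    return done


def install_handlers() -> None:
    """Call once from the main thread of a real worker process."""
    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)
```

The stop flag is only checked between tasks, so a task that has started is never interrupted; tasks still in the queue stay there for another worker to pick up.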
Code Examples
Python (Celery): Basic Task Producer and Consumer
# Input: order_id (int), a reference to an order in the database
# Output: task result stored in Redis backend
# db and charge are application-specific placeholders.
from celery import Celery

app = Celery('shop', broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')


@app.task(bind=True, max_retries=3, acks_late=True)
def process_order(self, order_id: int):
    """Idempotent order processor with retry."""
    order = db.get(order_id)
    if order.status == 'done':
        return  # idempotent guard
    try:
        charge(order)
        order.status = 'done'
        db.save(order)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))


# Producer side: enqueue the task
result = process_order.delay(order_id=42)
# Blocks until the result is ready; shown for demonstration, avoid in request handlers
print(result.get(timeout=120))
Node.js (BullMQ): Queue with Worker and Events
// Input: job data object { orderId: number }
// Output: completed job with return value in Redis
// db and charge are application-specific placeholders.
import { Queue, Worker, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';

const connection = new IORedis({ host: '127.0.0.1', port: 6379, maxRetriesPerRequest: null });

// Producer: add jobs
const orderQueue = new Queue('orders', { connection });
await orderQueue.add('process', { orderId: 42 }, {
  attempts: 3,
  backoff: { type: 'exponential', delay: 5000 },
  removeOnComplete: 1000,
  removeOnFail: 5000,
});

// Worker: process jobs
const worker = new Worker('orders', async (job) => {
  const order = await db.get(job.data.orderId);
  if (order.status === 'done') return; // idempotent guard
  await charge(order);
  order.status = 'done';
  await db.save(order);
  return { success: true };
}, { connection, concurrency: 5 });

// Events: monitor completion and failure
const events = new QueueEvents('orders', { connection });
events.on('completed', ({ jobId, returnvalue }) =>
  console.log(`Job ${jobId} completed:`, returnvalue));
events.on('failed', ({ jobId, failedReason }) =>
  console.error(`Job ${jobId} failed:`, failedReason));
Anti-Patterns
Wrong: Storing entire payload in the queue message
# BAD: large payload serialized into Redis, causes memory pressure and slow serialization
@app.task
def process_image(image_bytes: bytes, metadata: dict):
    result = resize(image_bytes)
    save(result)
Correct: Pass a reference, fetch data in the worker
# GOOD: only a lightweight reference travels through the broker
@app.task
def process_image(image_s3_key: str):
    image_bytes = s3.download(image_s3_key)
    result = resize(image_bytes)
    save(result)
Wrong: Acknowledging tasks before processing completes
# BAD — default Celery acks early; if worker crashes mid-task, the task is lost
app.conf.task_acks_late = False # default — ACK on receive, not on completion
Correct: ACK after successful processing
# GOOD — task is re-queued if worker dies before completing
app.conf.task_acks_late = True
app.conf.task_reject_on_worker_lost = True
Wrong: No retry limit — infinite retry loops
# BAD: poison message retries forever, consuming resources indefinitely
@app.task(bind=True, max_retries=None)  # None removes the limit (Celery's default is 3)
def fragile_task(self, data):
    try:
        do_work(data)
    except Exception as exc:
        raise self.retry(exc=exc)  # unbounded: retries forever
Correct: Bounded retries with exponential backoff and DLQ routing
# GOOD: max 5 retries with exponential backoff, then moves to DLQ
# dead_letter_queue is an application-specific placeholder.
@app.task(bind=True, max_retries=5, acks_late=True)
def safe_task(self, data):
    try:
        do_work(data)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            dead_letter_queue.send(data, error=str(exc))
            return
        raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))
Wrong: Non-idempotent task handlers
# BAD: if the task retries, the user gets charged twice
@app.task
def charge_user(user_id, amount):
    payment_service.charge(user_id, amount)  # no dedup: runs again on retry
Correct: Idempotent handler with deduplication key
# GOOD: idempotency key prevents duplicate charges
@app.task
def charge_user(user_id, amount, idempotency_key):
    if payment_service.has_processed(idempotency_key):
        return
    payment_service.charge(user_id, amount, idempotency_key=idempotency_key)
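A check followed by a separate write can still race when two workers receive the same task at once. One way to close that gap is to make the claim itself atomic. The sketch below uses an in-memory SQLite table with a primary key as the deduplication store; the table name, `claim`, and the `charges` list (standing in for the payment side effect) are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (idempotency_key TEXT PRIMARY KEY)")
charges = []  # stands in for the payment provider's side effect


def claim(key: str) -> bool:
    """Atomically record `key`; return True only for the first caller."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed (idempotency_key) VALUES (?)",
            (key,),
        )
    return cur.rowcount == 1  # 0 means the key already existed


def charge_user(user_id: int, amount: int, idempotency_key: str) -> bool:
    """Charge once per key; duplicate deliveries are skipped."""
    if not claim(idempotency_key):
        return False
    charges.append((user_id, amount))
    return True
```

Claiming before the side effect trades duplicates for a possible lost charge if the worker dies between the two steps. Where the downstream service accepts an idempotency key itself, passing the key through (as in the example above) covers that window.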
Wrong: Single queue for all task types
# BAD — slow 10-minute report generation blocks fast 100ms notification sends
app.conf.task_default_queue = 'default'
# Everything goes to 'default': notifications, reports, emails, imports
Correct: Separate queues by priority and task duration
# GOOD — fast tasks are never blocked by slow ones
app.conf.task_routes = {
    'tasks.send_notification': {'queue': 'fast'},
    'tasks.generate_report': {'queue': 'slow'},
    'tasks.process_payment': {'queue': 'critical'},
}
# Run separate worker pools per queue
# celery -A tasks worker -Q fast --concurrency=10
# celery -A tasks worker -Q slow --concurrency=2
# celery -A tasks worker -Q critical --concurrency=4
Common Pitfalls
- Poison messages blocking the queue: A single malformed task that always fails can trap consumers in an endless retry loop and starve all other tasks. Fix: set max_retries and route to a dead letter queue after exhaustion. [src7]
- Thundering herd on retry: When many tasks fail simultaneously (e.g., downstream outage), they all retry at the same time, overwhelming the recovering service. Fix: add random jitter to exponential backoff: delay = base * 2^retry + random(0, base). [src5]
- Worker memory leaks from long-running processes: Celery workers that process millions of tasks accumulate memory. Fix: set worker_max_tasks_per_child=1000 to recycle each worker process after 1,000 tasks. [src1]
- Visibility timeout too short: If a task takes longer than the broker's visibility timeout, the broker re-delivers it to another worker, causing duplicate execution. Fix: set visibility_timeout higher than your longest expected task duration. [src2]
- Premature result backend queries: Calling result.get() immediately after task.delay() blocks the caller, defeating the purpose of async processing. Fix: use callbacks, webhooks, or polling with result.ready(). [src1]
- No monitoring on queue depth: Queues silently back up until workers are overwhelmed and tasks time out. Fix: monitor queue depth (e.g., LLEN bull:queue:wait for BullMQ) and alert at thresholds. [src6]
- Multiple scheduler instances without leader election: Running multiple Celery Beat instances causes duplicate periodic task scheduling. Fix: use django-celery-beat with database lock or a single Beat instance with health checks. [src1]
- Ignoring task serialization format: Using Python pickle for Celery task serialization is a remote code execution vulnerability. Fix: use JSON serialization (task_serializer='json') and never accept pickle from untrusted sources. [src1]
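The visibility-timeout pitfall is easier to see in a toy model. The class below is a deliberately simplified in-memory broker written for this illustration (the `VisibilityQueue` name and its methods are assumptions, not a real broker API); time is passed in explicitly so the behavior is deterministic.

```python
class VisibilityQueue:
    """Toy broker: a received message is hidden for `timeout` seconds,
    then becomes deliverable again unless it has been acked."""

    def __init__(self, timeout: float):
        self.timeout = timeout
        self.messages = {}  # msg_id -> (body, visible_at)

    def send(self, msg_id: str, body: str) -> None:
        self.messages[msg_id] = (body, 0.0)

    def receive(self, now: float):
        """Return the first visible message and hide it, or None."""
        for msg_id, (body, visible_at) in self.messages.items():
            if visible_at <= now:
                self.messages[msg_id] = (body, now + self.timeout)
                return msg_id, body
        return None

    def ack(self, msg_id: str) -> None:
        """Remove the message permanently after successful processing."""
        self.messages.pop(msg_id, None)
```

If a worker receives a message at t=0 with a 30-second timeout and is still working at t=31, a second `receive` hands the same message to another worker: a duplicate execution caused purely by the timeout being shorter than the task.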
Diagnostic Commands
# Check Celery worker status
celery -A tasks inspect ping
# List active tasks across all workers
celery -A tasks inspect active
# Check queue depth (Redis-backed broker)
redis-cli LLEN celery
# Check BullMQ queue depth
redis-cli LLEN bull:myqueue:wait
# Monitor Celery in real-time (Flower)
celery -A tasks flower --port=5555
# Check RabbitMQ queue depth
rabbitmqctl list_queues name messages_ready messages_unacknowledged
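# Count Sidekiq queues, then check one queue's depth (keys assume no Redis namespace)
redis-cli SCARD queues
redis-cli LLEN queue:default
# View failed jobs in BullMQ (failed jobs live in a sorted set, so use ZRANGE, not LRANGE)
redis-cli ZRANGE bull:myqueue:failed 0 10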
# Check broker connection from worker
celery -A tasks inspect report
Version History & Compatibility
| Framework | Current Version | Status | Key Changes | Notes |
|---|---|---|---|---|
| Celery 5.6 (Recovery) | 5.6.x | Current stable | Memory leak fixes (Python 3.11+), credential logging fix | Python 3.9-3.13; Redis or RabbitMQ broker |
| BullMQ 5.x | 5.x | Current stable | Job flows with parent-child dependencies, rate limiting, deduplication | Node.js 18+; requires Redis 6.2+ |
| Temporal 1.x | 1.24+ | GA | Durable execution, versioned workflows, schedules API | Go, Java, TypeScript, Python SDKs |
| Sidekiq 7.x | 7.x | Current stable | Capsules (multiple thread configs), embedding, metrics | Ruby 2.7+; Redis 6.2+; Pro/Enterprise for rate limiting |
| Amazon SQS | — | Managed service | Standard (at-least-once) and FIFO (exactly-once) queues | Max message size 256 KB; 14-day retention |
| Google Cloud Tasks | — | Managed service | HTTP target tasks, rate limiting, scheduling | Max 1 MB payload; auto-retry with backoff |
When to Use / When Not to Use
| Use When | Don't Use When | Use Instead |
|---|---|---|
| Background jobs that don't need immediate response (emails, reports, image processing) | Tasks that must complete within the HTTP request-response cycle (<100 ms) | In-process function call or thread pool |
| Workload spikes that exceed server capacity — queue absorbs bursts | Low, steady throughput that a single server handles easily | Direct function invocation |
| Tasks that call unreliable external APIs and need retry with backoff | Purely in-memory computation with no I/O | Multiprocessing / thread pool |
| Long-running workflows spanning minutes to days (Temporal) | Simple cron-like scheduling with no retries needed | OS cron, systemd timers |
| Fan-out: one event triggers many independent tasks in parallel | Strict sequential processing with strong ordering guarantees | Event streaming (Kafka, Pulsar) |
| Decoupled microservices communicating asynchronously | Real-time bidirectional communication (chat, gaming) | WebSockets, gRPC streaming |
Important Caveats
- At-least-once is the default guarantee for all major frameworks — exactly-once requires idempotent handlers plus a deduplication store (Redis SET NX, database upsert)
- Redis-based brokers (BullMQ, Sidekiq, Celery+Redis) lose pending tasks on crash unless Redis persistence (AOF/RDB) is enabled — RabbitMQ offers stronger durability out of the box
- Celery's pickle serializer is a remote code execution vector. Always use JSON serialization in production (task_serializer = 'json')
- Temporal is fundamentally different from Celery/BullMQ/Sidekiq: it manages workflow state server-side with durable execution, making it the right choice for long-running, multi-step processes but overkill for simple fire-and-forget tasks
- BullMQ requires maxRetriesPerRequest: null in the IORedis connection config for workers; omitting this causes timeout errors under load
- Task queue is not the same as event streaming: if you need publish-subscribe, event replay, or log compaction, use Kafka/Pulsar/NATS instead