celery -A tasks worker --loglevel=info (Python) / new Worker(queueName, processor) (BullMQ/Node.js)

| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Producer | Enqueues tasks with payload and metadata | Any application code (web server, CLI, cron) | Stateless; scale with application tier |
| Message Broker | Stores and routes tasks to workers | Redis (BullMQ, Sidekiq), RabbitMQ (Celery), PostgreSQL (Temporal), SQS (cloud) | Cluster mode with replication; partition by queue |
| Worker Pool | Pulls and executes tasks | Celery workers, BullMQ Workers, Sidekiq threads, Temporal workers | Horizontal scaling; auto-scale on queue depth |
| Result Backend | Stores task outcomes and return values | Redis, PostgreSQL, MongoDB, S3 | Optional; omit if tasks are fire-and-forget |
| Dead Letter Queue | Isolates poison messages after max retries | Dedicated queue in same broker | Monitor DLQ depth; alert on growth |
| Scheduler | Triggers periodic/delayed tasks | Celery Beat, BullMQ repeatable jobs, cron, CloudWatch Events | Single-leader pattern to prevent duplicate scheduling |
| Rate Limiter | Throttles task processing rate | Token bucket per queue or per worker group | Protects downstream APIs from overload |
| Priority Router | Routes high-priority tasks before low-priority | Weighted queues (Sidekiq), priority levels (BullMQ) | Separate worker pools per priority tier |
| Observability | Monitors queue depth, latency, failure rate | Prometheus + Grafana, Datadog, Flower (Celery) | Alert on queue depth > threshold, DLQ growth |
| Retry Engine | Retries failed tasks with backoff | Built-in (all frameworks), custom exponential backoff | Cap retries (3-5); exponential backoff with jitter |
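The token-bucket rate limiter named in the table above can be sketched in a few lines. This is an illustrative, framework-agnostic implementation, not an API from Celery, BullMQ, or Sidekiq; the class name and parameters are hypothetical.

```python
import time

class TokenBucket:
    """Token bucket: allows `rate` tasks/sec on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate              # tokens refilled per second
        self.capacity = capacity     # maximum burst size
        self.tokens = float(capacity)
        self.clock = clock           # injectable for testing
        self.last = clock()

    def try_acquire(self) -> bool:
        now = self.clock()
        # refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # caller should sleep briefly or leave the task queued
```

A worker would call try_acquire() before pulling a task and back off when it returns False; this is what protects a downstream API from a burst of queued work.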
START: Choose a Task Queue Framework
├── Language: Python?
│ ├── YES → Tasks run < 1 hour?
│ │ ├── YES → Use Celery + Redis/RabbitMQ broker
│ │ └── NO → Use Temporal Python SDK (durable execution)
│ └── NO ↓
├── Language: Node.js / TypeScript?
│ ├── YES → Tasks run < 1 hour?
│ │ ├── YES → Use BullMQ + Redis
│ │ └── NO → Use Temporal TypeScript SDK
│ └── NO ↓
├── Language: Ruby?
│ ├── YES → Use Sidekiq + Redis (default choice for Rails)
│ └── NO ↓
├── Language: Go / Java?
│ ├── YES → Need workflow orchestration?
│ │ ├── YES → Use Temporal (native Go/Java SDKs)
│ │ └── NO → Use cloud-native (SQS + Lambda, Cloud Tasks)
│ └── NO ↓
├── Need long-running workflows (hours/days)?
│ ├── YES → Use Temporal (durable execution, automatic state persistence)
│ └── NO ↓
├── Throughput > 1M tasks/day?
│ ├── YES → Use dedicated broker cluster (RabbitMQ/Redis Cluster) + auto-scaling workers
│ └── NO ↓
└── DEFAULT → Celery (Python) or BullMQ (Node.js) — largest ecosystems, easiest to start
START: Choose Delivery Guarantee
├── Can your system tolerate duplicate processing?
│ ├── YES → At-least-once (default for all frameworks — simplest)
│ └── NO ↓
├── Can you make handlers idempotent (dedup key, upsert, conditional write)?
│ ├── YES → At-least-once + idempotency layer (recommended)
│ └── NO ↓
├── Is losing occasional tasks acceptable?
│ ├── YES → At-most-once (ACK before processing — fast but lossy)
│ └── NO ↓
└── DEFAULT → At-least-once + idempotent handlers (industry standard)
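The default above, at-least-once delivery plus idempotent handlers, reduces to a dedup-key guard around each side effect. A minimal sketch: in production the seen-keys store would be Redis (`SET key NX EX ttl`) or a database unique constraint; the in-memory set and all names here are hypothetical, used only to keep the sketch self-contained.

```python
processed_keys = set()  # stand-in for Redis SETNX or a DB unique constraint

def handle_once(dedup_key: str, handler, *args):
    """Run handler at most once per dedup_key, even if the task is redelivered."""
    if dedup_key in processed_keys:
        return None  # duplicate delivery: skip the side effect
    result = handler(*args)
    # mark AFTER success, so a crash mid-handler still allows a safe retry
    processed_keys.add(dedup_key)
    return result
```

A handler would derive the key from the task payload, e.g. `handle_once(f"order:{order_id}:charge", charge, order)`, so redelivered copies of the same task become no-ops.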
Every task needs a name, a serializable payload, and metadata (retry count, priority, timeout). Keep payloads small and pass references to large data. [src1]
# Python — Celery task definition
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task(bind=True, max_retries=3, default_retry_delay=60, acks_late=True)
def process_order(self, order_id: int):
    """Process a single order. Idempotent: checks order status before acting."""
    order = db.get_order(order_id)
    if order.status == 'processed':
        return  # idempotent guard
    try:
        charge_payment(order)
        update_inventory(order)
        order.status = 'processed'
        db.save(order)
    except PaymentError as exc:
        raise self.retry(exc=exc)
Verify: celery -A tasks inspect active → expected: task appears in active list when running
Set up the message broker with appropriate durability. For Redis, enable AOF persistence. Configure dead letter routing for failed messages. [src7]
# Python — Celery configuration with DLQ and retry policy
app.conf.update(
    task_acks_late=True,              # ACK after processing, not before
    worker_prefetch_multiplier=1,     # fetch one task at a time (fairness)
    task_reject_on_worker_lost=True,  # re-queue if worker crashes mid-task
    task_default_queue='default',
    task_queues={
        'default': {'exchange': 'default', 'routing_key': 'default'},
        'priority': {'exchange': 'priority', 'routing_key': 'priority'},
        'dead_letter': {'exchange': 'dead_letter', 'routing_key': 'dead_letter'},
    },
    task_annotations={
        'tasks.process_order': {'rate_limit': '100/m'},
    },
    broker_transport_options={
        'visibility_timeout': 3600,
    },
)
Verify: redis-cli CONFIG GET appendonly → expected: yes
Start workers with appropriate concurrency. Use prefork for CPU-bound tasks (Celery), event loop for I/O-bound tasks (BullMQ). [src1]
# Start Celery workers with concurrency and queue binding
celery -A tasks worker --loglevel=info --concurrency=4 -Q default,priority
# Start a dedicated DLQ processor (manual inspection)
celery -A tasks worker --loglevel=warning --concurrency=1 -Q dead_letter
Verify: celery -A tasks inspect ping → expected: pong from all workers
Configure exponential backoff with jitter to prevent thundering herd on transient failures. Cap maximum retries. [src5]
# Python — exponential backoff with jitter
import random
import requests

@app.task(bind=True, max_retries=5, acks_late=True)
def call_external_api(self, endpoint: str, payload: dict):
    try:
        response = requests.post(endpoint, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as exc:
        backoff = (2 ** self.request.retries) * 30 + random.randint(0, 30)
        raise self.retry(exc=exc, countdown=backoff)
Verify: Check retry count in Flower dashboard or celery -A tasks inspect reserved
Monitor queue depth, processing latency, failure rate, and DLQ growth. Alert before queues back up. [src6]
# Check queue depth (Redis / BullMQ)
redis-cli LLEN bull:myqueue:wait
# List the queues each Celery worker consumes from
celery -A tasks inspect active_queues
# Flower web dashboard for Celery
celery -A tasks flower --port=5555
Verify: Navigate to http://localhost:5555 → expected: Flower dashboard with worker and task stats
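The alerting rule can be kept as a pure function polled on a timer, which makes the threshold logic testable without a live broker. The Redis key names and the 1000-task threshold below are assumptions to adapt to your broker and queues.

```python
def queue_alerts(depths: dict, threshold: int = 1000) -> list:
    """Return an alert message for every queue whose depth exceeds the threshold."""
    return [
        f"ALERT: queue '{name}' depth {depth} > {threshold}"
        for name, depth in depths.items()
        if depth > threshold
    ]

# Polling-loop sketch for a Redis-backed Celery broker (key names are assumptions):
# import redis, time
# r = redis.Redis()
# while True:
#     for alert in queue_alerts({'celery': r.llen('celery')}):
#         print(alert)  # or push to PagerDuty / Slack
#     time.sleep(30)
```

Keeping the check separate from the transport means the same function works for BullMQ (`LLEN bull:<queue>:wait`) or RabbitMQ depths.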
Workers must finish in-progress tasks before shutting down. Use SIGTERM handling and drain mode. [src4]
# Graceful shutdown: send SIGTERM, workers finish current tasks
celery -A tasks control shutdown
# Scale workers dynamically based on queue depth (Kubernetes HPA)
# kubectl autoscale deployment celery-worker --min=2 --max=20 --cpu-percent=70
Verify: Send SIGTERM to worker process → expected: worker finishes current task, then exits cleanly
# Input: order_id (int) — reference to order in database
# Output: task result stored in Redis backend
from celery import Celery

app = Celery('shop', broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task(bind=True, max_retries=3, acks_late=True)
def process_order(self, order_id: int):
    """Idempotent order processor with retry."""
    order = db.get(order_id)
    if order.status == 'done':
        return  # idempotent guard
    try:
        charge(order)
        order.status = 'done'
        db.save(order)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

# Producer side: enqueue the task
result = process_order.delay(order_id=42)
print(result.get(timeout=120))  # blocks until result ready
// Input: job data object { orderId: number }
// Output: completed job with return value in Redis
import { Queue, Worker, QueueEvents } from 'bullmq'; // [email protected]
import IORedis from 'ioredis'; // [email protected]
const connection = new IORedis({ host: '127.0.0.1', port: 6379, maxRetriesPerRequest: null });
// Producer: add jobs
const orderQueue = new Queue('orders', { connection });
await orderQueue.add('process', { orderId: 42 }, {
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
removeOnComplete: 1000,
removeOnFail: 5000,
});
// Worker: process jobs
const worker = new Worker('orders', async (job) => {
const order = await db.get(job.data.orderId);
if (order.status === 'done') return; // idempotent guard
await charge(order);
order.status = 'done';
await db.save(order);
return { success: true };
}, { connection, concurrency: 5 });
// Events: monitor completion and failure
const events = new QueueEvents('orders', { connection });
events.on('completed', ({ jobId, returnvalue }) =>
console.log(`Job ${jobId} completed:`, returnvalue));
events.on('failed', ({ jobId, failedReason }) =>
console.error(`Job ${jobId} failed:`, failedReason));
# BAD — large payload serialized into Redis, causes memory pressure and slow serialization
@app.task
def process_image(image_bytes: bytes, metadata: dict):
    result = resize(image_bytes)
    save(result)

# GOOD — only a lightweight reference travels through the broker
@app.task
def process_image(image_s3_key: str):
    image_bytes = s3.download(image_s3_key)
    result = resize(image_bytes)
    save(result)
# BAD — default Celery acks early; if worker crashes mid-task, the task is lost
app.conf.task_acks_late = False # default — ACK on receive, not on completion
# GOOD — task is re-queued if worker dies before completing
app.conf.task_acks_late = True
app.conf.task_reject_on_worker_lost = True
# BAD — poison message retries forever, consuming resources indefinitely
@app.task(bind=True, max_retries=None)  # None disables the retry cap
def fragile_task(self, data):
    try:
        do_work(data)
    except Exception as exc:
        raise self.retry(exc=exc)  # retries forever

# GOOD — max 5 retries with exponential backoff, then moves to DLQ
@app.task(bind=True, max_retries=5, acks_late=True)
def safe_task(self, data):
    try:
        do_work(data)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            dead_letter_queue.send(data, error=str(exc))
            return
        raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))
# BAD — if task retries, user gets charged twice
@app.task
def charge_user(user_id, amount):
    payment_service.charge(user_id, amount)  # no dedup — runs again on retry

# GOOD — idempotency key prevents duplicate charges
@app.task
def charge_user(user_id, amount, idempotency_key):
    if payment_service.has_processed(idempotency_key):
        return
    payment_service.charge(user_id, amount, idempotency_key=idempotency_key)
# BAD — slow 10-minute report generation blocks fast 100ms notification sends
app.conf.task_default_queue = 'default'
# Everything goes to 'default': notifications, reports, emails, imports

# GOOD — fast tasks are never blocked by slow ones
app.conf.task_routes = {
    'tasks.send_notification': {'queue': 'fast'},
    'tasks.generate_report': {'queue': 'slow'},
    'tasks.process_payment': {'queue': 'critical'},
}
# Run separate worker pools per queue
# celery -A tasks worker -Q fast --concurrency=10
# celery -A tasks worker -Q slow --concurrency=2
# celery -A tasks worker -Q critical --concurrency=4
- Set max_retries and route tasks to a dead letter queue after exhaustion. [src7]
- Use exponential backoff with jitter: delay = base * 2^retry + random(0, base). [src5]
- Set worker_max_tasks_per_child=1000 to recycle worker processes after N tasks. [src1]
- Set the broker visibility_timeout higher than your longest expected task duration. [src2]
- Calling result.get() immediately after task.delay() blocks the caller, defeating the purpose of async processing. Fix: use callbacks, webhooks, or polling with result.ready(). [src1]
- Monitor queue depth (LLEN bull:queue:wait for BullMQ) and alert at thresholds. [src6]
- Run a single scheduler: django-celery-beat with a database lock, or one Beat instance with health checks. [src1]
- Use JSON serialization (task_serializer='json') and never accept pickle from untrusted sources. [src1]
# Check Celery worker status
celery -A tasks inspect ping
# List active tasks across all workers
celery -A tasks inspect active
# Check queue depth (Redis-backed broker)
redis-cli LLEN celery
# Check BullMQ queue depth
redis-cli LLEN bull:myqueue:wait
# Monitor Celery in real-time (Flower)
celery -A tasks flower --port=5555
# Check RabbitMQ queue depth
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# Check Sidekiq queue stats via Redis (each queue is a list named queue:<name>)
redis-cli SCARD queues          # number of queues
redis-cli LLEN queue:default    # depth of the default queue
# View failed jobs in BullMQ (the failed set is a sorted set, not a list)
redis-cli ZRANGE bull:myqueue:failed 0 10
# Check broker connection from worker
celery -A tasks inspect report
| Framework | Current Version | Status | Key Changes | Notes |
|---|---|---|---|---|
| Celery 5.6 (Recovery) | 5.6.x | Current stable | Memory leak fixes (Python 3.11+), credential logging fix | Python 3.9-3.13; Redis or RabbitMQ broker |
| BullMQ 5.x | 5.x | Current stable | Job flows with parent-child dependencies, rate limiting, deduplication | Node.js 18+; requires Redis 6.2+ |
| Temporal 1.x | 1.24+ | GA | Durable execution, versioned workflows, schedules API | Go, Java, TypeScript, Python SDKs |
| Sidekiq 7.x | 7.x | Current stable | Capsules (multiple thread configs), embedding, metrics | Ruby 2.7+; Redis 6.2+; Pro/Enterprise for rate limiting |
| Amazon SQS | — | Managed service | Standard (at-least-once) and FIFO (exactly-once) queues | Max message size 256 KB; 14-day retention |
| Google Cloud Tasks | — | Managed service | HTTP target tasks, rate limiting, scheduling | Max 1 MB payload; auto-retry with backoff |
| Use When | Don't Use When | Use Instead |
|---|---|---|
| Background jobs that don't need immediate response (emails, reports, image processing) | Tasks that must complete within the HTTP request-response cycle (<100 ms) | In-process function call or thread pool |
| Workload spikes that exceed server capacity — queue absorbs bursts | Low, steady throughput that a single server handles easily | Direct function invocation |
| Tasks that call unreliable external APIs and need retry with backoff | Purely in-memory computation with no I/O | Multiprocessing / thread pool |
| Long-running workflows spanning minutes to days (Temporal) | Simple cron-like scheduling with no retries needed | OS cron, systemd timers |
| Fan-out: one event triggers many independent tasks in parallel | Strict sequential processing with strong ordering guarantees | Event streaming (Kafka, Pulsar) |
| Decoupled microservices communicating asynchronously | Real-time bidirectional communication (chat, gaming) | WebSockets, gRPC streaming |
Use JSON serialization in Celery (task_serializer = 'json'). Set maxRetriesPerRequest: null in the IORedis connection config for workers — omitting this causes timeout errors under load.