Batch Processing Patterns
What are the best batch processing patterns?
TL;DR
- Bottom line: Chunk work into resumable units, track progress in durable storage, handle failures idempotently, and route poison messages to a dead letter queue.
- Key tool/command: cursor-based chunking + dead letter queue + idempotent upserts
- Watch out for: Loading the entire dataset into memory -- one of the most common causes of OOM kills in batch jobs.
- Works with: Any language/runtime. Patterns are language-agnostic; examples cover Python, Node.js, and Go.
Constraints
- Every batch item must be idempotent -- re-processing the same item must produce the same result without additional side effects (no duplicate rows, charges, or messages)
- Always persist progress checkpoints to durable storage so jobs can resume after failure
- Never load the entire dataset into memory -- use cursor-based iteration or chunked reads
- Dead letter queues are mandatory for production batches -- failed items must not block the rest
- Chunk size must be tuned per workload: too small = overhead, too large = memory pressure and blast radius
Quick Reference
| Pattern | Throughput | Resumability | Complexity | Best For |
|---|---|---|---|---|
| Sequential | Low | Easy (checkpoint per item) | Minimal | Small datasets (<10K), strict ordering |
| Parallel Chunks | High | Good (checkpoint per chunk) | Moderate | Medium datasets (10K-1M), independent items |
| MapReduce | Very High | Good (per-partition) | High | Huge datasets (>1M), aggregation/analytics |
| Micro-Batching | High | Moderate (per micro-batch) | Moderate | Near-real-time with batch semantics |
| Streaming (Kafka/Flink) | Very High | Built-in (offsets) | High | Continuous unbounded data, sub-second needs |
| Fan-Out/Fan-In | Very High | Good (per worker) | High | Heterogeneous tasks, cloud functions |
| Pipeline (multi-stage) | High | Per-stage checkpoints | Moderate | ETL with distinct transform steps |
| Retry + DLQ | N/A (overlay) | Excellent | Low (add-on) | Any pattern above -- handles poison messages |
Decision Tree
START
|-- Dataset size < 10K items?
| |-- YES -> Sequential processing with per-item checkpoint
| +-- NO v
|-- Items independent (no ordering dependency)?
| |-- YES v
| | |-- Dataset < 1M items?
| | | |-- YES -> Parallel Chunks (worker pool, chunk size 100-1000)
| | | +-- NO v
| | |-- Need aggregation/reduce step?
| | | |-- YES -> MapReduce (partition, map, shuffle, reduce)
| | | +-- NO -> Fan-Out/Fan-In (distribute to workers, collect results)
| +-- NO (ordering required) v
|-- Need near-real-time (<5s latency)?
| |-- YES -> Micro-Batching (buffer 50-500ms, flush as batch)
| +-- NO -> Pipeline with stage-level checkpoints
+-- DEFAULT -> Parallel Chunks + Retry/DLQ overlay
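The Micro-Batching branch buffers items for a short window and then flushes them as one batch. Below is a minimal single-threaded sketch. It assumes a caller-supplied `flush_fn` and uses a hypothetical `MicroBatcher` class, which is not a library API. The batcher flushes when `max_size` items are buffered, or on a `poll()` call once the oldest buffered item has waited `max_wait` seconds. A production version would drive `poll()` from a timer and guard the buffer with a lock.

```python
import time
from typing import Callable, List, Optional


class MicroBatcher:
    """Buffer items and flush them as one batch on size or age (hypothetical helper)."""

    def __init__(self, flush_fn: Callable[[List[object]], None],
                 max_size: int = 100, max_wait: float = 0.2,
                 clock: Callable[[], float] = time.monotonic):
        self.flush_fn = flush_fn   # receives each batch as a list
        self.max_size = max_size   # flush when this many items are buffered
        self.max_wait = max_wait   # ...or when the oldest item is this old (seconds)
        self.clock = clock         # injectable so tests can control time
        self.buffer: List[object] = []
        self.first_at: Optional[float] = None

    def add(self, item: object) -> None:
        if not self.buffer:
            self.first_at = self.clock()  # start the wait window on the first item
        self.buffer.append(item)
        if len(self.buffer) >= self.max_size:
            self.flush()

    def poll(self) -> None:
        """Call periodically; flushes if the oldest buffered item has waited too long."""
        if self.buffer and self.clock() - self.first_at >= self.max_wait:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            batch, self.buffer = self.buffer, []
            self.first_at = None
            self.flush_fn(batch)
```

Injecting `clock` lets a test advance time without sleeping.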
Step-by-Step Guide
1. Define chunk boundaries using cursor-based iteration
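Never use OFFSET/LIMIT for large datasets -- the database must scan and discard every skipped row, so each page gets slower as the offset grows. Instead, paginate on an indexed, unique cursor column: an auto-increment ID, or a timestamp combined with an ID as a tie-breaker. Each page then costs one index seek plus the rows returned, which stays roughly constant however deep into the dataset you are. [src1]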
-- Cursor-based chunking: fetch next 1000 items after last processed ID
SELECT * FROM items
WHERE id > :last_processed_id
ORDER BY id ASC
LIMIT 1000;
Verify: Check that each chunk returns exactly LIMIT rows (except the final chunk), and no items are skipped or duplicated.
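The same cursor loop can be sketched in Python with the standard-library `sqlite3` module. This sketch assumes an `items` table with an integer primary key `id`; the table and the `iter_chunks` helper are illustrative names. Each query seeks past the last ID seen, so memory stays bounded by `chunk_size` however large the table is.

```python
import sqlite3
from typing import Iterator, List, Tuple


def iter_chunks(conn: sqlite3.Connection, chunk_size: int = 1000,
                start_after: int = 0) -> Iterator[List[Tuple[int, str]]]:
    """Yield successive chunks of (id, payload) rows using keyset pagination."""
    last_id = start_after
    while True:
        rows = conn.execute(
            "SELECT id, payload FROM items WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, chunk_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        last_id = rows[-1][0]  # advance the cursor to the last id in this chunk


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, payload TEXT)")
    conn.executemany("INSERT INTO items (payload) VALUES (?)",
                     [(f"item-{i}",) for i in range(2500)])
    print([len(chunk) for chunk in iter_chunks(conn, chunk_size=1000)])  # [1000, 1000, 500]
```

Passing a saved checkpoint as `start_after` resumes the scan from that point.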
2. Implement idempotent processing with upserts
Every write operation must be safe to retry. Use an upsert (INSERT ... ON CONFLICT in PostgreSQL and SQLite; INSERT ... ON DUPLICATE KEY UPDATE in MySQL) so that reprocessing the same item overwrites the existing row rather than duplicating it. [src3]
-- Idempotent upsert: safe to replay
INSERT INTO processed_items (id, result, processed_at)
VALUES (:id, :result, NOW())
ON CONFLICT (id) DO UPDATE SET
result = EXCLUDED.result,
processed_at = EXCLUDED.processed_at;
Verify: Run the same item twice -- the row count should not increase, and the result should be identical.
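To run that check locally, note that SQLite accepts the same `INSERT ... ON CONFLICT ... DO UPDATE` syntax (SQLite 3.24 and later). This minimal sketch assumes the `processed_items` table above, with `id` as the primary key. `upsert_result` is an illustrative helper name, and `CURRENT_TIMESTAMP` stands in for PostgreSQL's `NOW()`.

```python
import sqlite3


def upsert_result(conn: sqlite3.Connection, item_id: int, result: str) -> None:
    """Write a result so that replaying the same item overwrites instead of duplicating."""
    conn.execute(
        """
        INSERT INTO processed_items (id, result, processed_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT (id) DO UPDATE SET
            result = excluded.result,
            processed_at = excluded.processed_at
        """,
        (item_id, result),
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_items (id INTEGER PRIMARY KEY, result TEXT, processed_at TEXT)")
upsert_result(conn, 42, "ok")
upsert_result(conn, 42, "ok")  # replay of the same item
count = conn.execute("SELECT COUNT(*) FROM processed_items").fetchone()[0]
print(count)  # 1
```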
3. Add progress tracking with checkpoints
Store the last successfully processed cursor value in a durable checkpoint table. On restart, read the checkpoint and resume from that point. [src2]
-- Save checkpoint after each successful chunk
INSERT INTO batch_checkpoints (job_id, last_cursor, items_processed, updated_at)
VALUES (:job_id, :last_cursor, :count, NOW())
ON CONFLICT (job_id) DO UPDATE SET
last_cursor = EXCLUDED.last_cursor,
items_processed = batch_checkpoints.items_processed + EXCLUDED.items_processed,
updated_at = EXCLUDED.updated_at;
Verify: Kill the job mid-run, restart it, and confirm it resumes from the checkpoint without reprocessing completed items.
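A resumable loop can be sketched end to end with `sqlite3`. The sketch assumes simplified, illustrative `items`, `results`, and `batch_checkpoints` tables; the checkpoint keeps only `last_cursor`, and "processing" just doubles a value. Each chunk's results and its checkpoint commit in one transaction, so a crash either keeps both or loses both. `crash_after` is a hypothetical test hook that raises partway through a chunk.

```python
import sqlite3


class SimulatedCrash(Exception):
    """Raised by the hypothetical crash_after hook to mimic a mid-chunk failure."""


def run_job(conn: sqlite3.Connection, job_id: str, chunk_size: int = 100,
            crash_after: int = -1) -> int:
    """Process items chunk by chunk from the saved checkpoint; return items done this run."""
    row = conn.execute(
        "SELECT last_cursor FROM batch_checkpoints WHERE job_id = ?", (job_id,)
    ).fetchone()
    cursor = row[0] if row else 0  # resume point, or start from the beginning
    done = chunks = 0
    while True:
        items = conn.execute(
            "SELECT id, value FROM items WHERE id > ? ORDER BY id LIMIT ?",
            (cursor, chunk_size),
        ).fetchall()
        if not items:
            return done
        with conn:  # one transaction: commit results + checkpoint, or roll back both
            conn.executemany(
                "INSERT INTO results (id, doubled) VALUES (?, ?) "
                "ON CONFLICT (id) DO UPDATE SET doubled = excluded.doubled",
                [(item_id, value * 2) for item_id, value in items],
            )
            if chunks == crash_after:
                raise SimulatedCrash(f"crash while processing chunk {chunks}")
            conn.execute(
                "INSERT INTO batch_checkpoints (job_id, last_cursor) VALUES (?, ?) "
                "ON CONFLICT (job_id) DO UPDATE SET last_cursor = excluded.last_cursor",
                (job_id, items[-1][0]),
            )
        cursor = items[-1][0]
        done += len(items)
        chunks += 1
```

Suppose 250 items are run with `chunk_size=100` and `crash_after=1`. The first 100 results and a checkpoint of 100 are committed, and the second chunk is rolled back. A second call then resumes at ID 101 and processes the remaining 150 items instead of starting over.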
4. Route failures to a dead letter queue
When an item fails after N retries, write it to a dead letter table/queue with the error details. The main batch continues processing remaining items. [src4]
-- Dead letter entry for failed items
INSERT INTO dead_letter_queue (job_id, item_id, payload, error_message, retry_count, failed_at)
VALUES (:job_id, :item_id, :payload, :error, :retries, NOW());
Verify: Intentionally corrupt one item in the batch -- confirm the batch completes and the bad item appears in the DLQ.
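The retry-then-dead-letter flow can be sketched as a small helper. The sketch assumes an in-memory list standing in for the `dead_letter_queue` table and an injectable `sleep`, so the backoff can be tested without waiting. `process_with_retry` and the `DLQEntry` fields are illustrative names, not a library API.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple


@dataclass
class DLQEntry:
    job_id: str
    item_id: Any
    payload: dict
    error_message: str
    retry_count: int


def process_with_retry(job_id: str, item: dict, process: Callable[[dict], Any],
                       dlq: List[DLQEntry], max_retries: int = 3,
                       base_delay: float = 1.0,
                       sleep: Callable[[float], None] = time.sleep) -> Tuple[bool, Any]:
    """Attempt process(item) up to max_retries times; dead-letter it if every attempt fails.

    Returns (True, result) on success, or (False, None) once the item is in the DLQ.
    """
    for attempt in range(max_retries):
        try:
            return True, process(item)
        except Exception as exc:
            if attempt == max_retries - 1:
                dlq.append(DLQEntry(job_id, item["id"], item, str(exc), attempt + 1))
                return False, None
            sleep(base_delay * 2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
    return False, None  # only reached when max_retries < 1
```

Passing `sleep=delays.append` records the backoff schedule instead of waiting. For example, an item that succeeds on its third attempt records `[1.0, 2.0]`.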
5. Add monitoring and alerting
Track batch metrics: items processed, items failed, elapsed time, throughput (items/sec). Alert when failure rate exceeds threshold (e.g., >5% of items fail). [src7]
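# Log batch summary (guards avoid ZeroDivisionError on empty or instant runs)
failure_pct = failed / total * 100 if total else 0.0
rate = processed / elapsed if elapsed > 0 else 0.0
logger.info(f"Batch complete: {processed}/{total} items, "
            f"{failed} failed ({failure_pct:.1f}%), "
            f"{elapsed:.1f}s, {rate:.0f} items/sec")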
Verify: Check the logs after a batch run to confirm all five values are present: processed count, failed count, failure rate, elapsed time, and throughput.
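The summary and the alert rule can share one small record. This is a minimal sketch that assumes the 5% failure threshold from the text and uses a hypothetical `BatchMetrics` dataclass. How the alert is delivered (pager, chat, metrics backend) is left out.

```python
from dataclasses import dataclass


@dataclass
class BatchMetrics:
    processed: int    # items attempted, including failures
    failed: int       # items routed to the DLQ
    elapsed_s: float  # wall-clock duration of the run

    @property
    def failure_rate(self) -> float:
        return self.failed / self.processed if self.processed else 0.0

    @property
    def throughput(self) -> float:
        return self.processed / self.elapsed_s if self.elapsed_s > 0 else 0.0


def should_alert(m: BatchMetrics, max_failure_rate: float = 0.05) -> bool:
    """True when the share of failed items exceeds the threshold (default 5%)."""
    return m.failure_rate > max_failure_rate


m = BatchMetrics(processed=2000, failed=120, elapsed_s=40.0)
print(f"{m.failure_rate:.1%} failed, {m.throughput:.0f} items/sec, alert={should_alert(m)}")
# 6.0% failed, 50 items/sec, alert=True
```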
Code Examples
Python: Chunked batch processor with retry and DLQ
# Input: Database table with items to process
# Output: Processed items written back, failures in DLQ
# get_checkpoint, fetch_chunk, process_item, upsert_result, write_to_dlq and
# save_checkpoint are assumed helpers (not shown) that read/write through `conn`.
import time
import psycopg2  # psycopg2==2.9.9; `conn` is a psycopg2 connection
CHUNK_SIZE = 500
MAX_RETRIES = 3
def process_batch(conn, job_id):
    cursor = get_checkpoint(conn, job_id) or 0
    total, failed = 0, 0
    while True:
        items = fetch_chunk(conn, cursor, CHUNK_SIZE)
        if not items:
            break
        for item in items:
            for attempt in range(MAX_RETRIES):
                try:
                    result = process_item(item)  # your logic
                    upsert_result(conn, item["id"], result)
                    break  # success: stop retrying this item
                except Exception as e:
                    if attempt == MAX_RETRIES - 1:
                        write_to_dlq(conn, job_id, item, str(e))
                        failed += 1
                    else:
                        time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...
            total += 1
        cursor = items[-1]["id"]
        save_checkpoint(conn, job_id, cursor, len(items))
    return {"processed": total, "failed": failed}
Node.js: Parallel chunk processor with concurrency control
// Input: Array of item IDs or cursor-based DB query
// Output: Processed results, failures logged to DLQ
// Dependency: p-limit
// getCheckpoint, processItem, upsertResult, writeToDLQ and saveCheckpoint are
// assumed helpers (not shown); db.query() returns { rows } as in node-postgres.
import pLimit from "p-limit";
const CHUNK_SIZE = 500;
const CONCURRENCY = 4;
const MAX_RETRIES = 3;
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
async function processBatch(db, jobId) {
  const limit = pLimit(CONCURRENCY);
  let cursor = (await getCheckpoint(db, jobId)) || 0;
  let total = 0, failed = 0;
  while (true) {
    const items = await db.query(
      "SELECT * FROM items WHERE id > $1 ORDER BY id LIMIT $2",
      [cursor, CHUNK_SIZE]
    );
    if (items.rows.length === 0) break;
    const tasks = items.rows.map((item) =>
      limit(async () => {
        for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
          try {
            const result = await processItem(item);
            await upsertResult(db, item.id, result);
            return;
          } catch (err) {
            if (attempt === MAX_RETRIES - 1) {
              await writeToDLQ(db, jobId, item, err.message);
              failed++;
            } else {
              await sleep(Math.pow(2, attempt) * 1000); // exponential backoff: 1s, 2s, ...
            }
          }
        }
      })
    );
    await Promise.all(tasks); // finish the whole chunk before checkpointing
    cursor = items.rows[items.rows.length - 1].id;
    await saveCheckpoint(db, jobId, cursor, items.rows.length);
    total += items.rows.length;
  }
  return { processed: total, failed };
}
Go: Fan-out/fan-in with worker pool and progress tracking
// Input: Database rows fetched via cursor pagination
// Output: Processed results, failed items written to the DLQ
// Item (with an ID field), DLQEntry, processWithRetry, getCheckpoint, fetchChunk,
// saveCheckpoint and writeToDLQ are assumed to be defined elsewhere (not shown).
import (
	"context"
	"database/sql"
	"sync"
	"sync/atomic"
)
const (
	chunkSize  = 500
	numWorkers = 8
	maxRetries = 3
)
func processBatch(ctx context.Context, db *sql.DB, jobID string) (int, int) {
	items := make(chan Item, chunkSize) // bounded channel gives backpressure
	var workers, inFlight sync.WaitGroup
	var processed, failed int64
	// Fan-out: start worker pool
	for i := 0; i < numWorkers; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for item := range items {
				if err := processWithRetry(ctx, db, item, maxRetries); err != nil {
					// Written synchronously so the entry exists before the chunk is checkpointed
					writeToDLQ(db, DLQEntry{JobID: jobID, Item: item, Err: err})
					atomic.AddInt64(&failed, 1)
				}
				atomic.AddInt64(&processed, 1)
				inFlight.Done()
			}
		}()
	}
	// Producer: cursor-based chunk reads. The checkpoint advances only after every
	// item in the chunk has been handled (fan-in), so a crash never skips items.
	cursor := getCheckpoint(db, jobID)
	for ctx.Err() == nil {
		chunk := fetchChunk(db, cursor, chunkSize)
		if len(chunk) == 0 {
			break
		}
		inFlight.Add(len(chunk))
		for _, item := range chunk {
			items <- item
		}
		inFlight.Wait()
		cursor = chunk[len(chunk)-1].ID
		saveCheckpoint(db, jobID, cursor, len(chunk))
	}
	close(items)
	workers.Wait()
	return int(processed), int(failed)
}
Anti-Patterns
Wrong: Loading entire dataset into memory
# BAD -- loads all rows into memory at once; OOM on large datasets
all_items = db.query("SELECT * FROM items").fetchall()
for item in all_items:
    process(item)
Correct: Cursor-based chunked iteration
# GOOD -- fetches one chunk at a time; memory stays bounded by the chunk size
cursor = 0
while True:
    chunk = db.query("SELECT * FROM items WHERE id > %s ORDER BY id LIMIT 500", (cursor,))
    if not chunk:
        break
    for item in chunk:
        process(item)
    cursor = chunk[-1]["id"]
Wrong: No progress tracking (restart = reprocess everything)
# BAD -- no checkpoint; crash at item 999,999 means starting over
for item in get_all_items():
    process(item)
Correct: Checkpoint after each chunk
# GOOD -- resume from last successful chunk on restart
cursor = load_checkpoint(job_id) or 0
while True:
    chunk = fetch_chunk(cursor, 500)
    if not chunk:
        break
    for item in chunk:
        process(item)
    cursor = chunk[-1]["id"]
    save_checkpoint(job_id, cursor)
Wrong: Non-idempotent processing (duplicates on retry)
# BAD -- INSERT creates duplicates if item is retried
db.execute("INSERT INTO results (item_id, value) VALUES (%s, %s)", (item_id, value))
Correct: Idempotent upsert
# GOOD -- ON CONFLICT prevents duplicates on retry
db.execute("""
INSERT INTO results (item_id, value, updated_at)
VALUES (%s, %s, NOW())
ON CONFLICT (item_id) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
""", (item_id, value))
Wrong: Failed items block the entire batch
# BAD -- one poisoned item stops the whole batch
for item in chunk:
    result = process(item)  # unhandled exception halts everything
    save(result)
Correct: Catch, retry, then dead-letter
# GOOD -- failures go to DLQ; batch continues
for item in chunk:
    try:
        result = process(item)
        save(result)
    except Exception as e:
        if retries_exhausted(item):
            write_to_dlq(item, e)
        else:
            schedule_retry(item)
Common Pitfalls
- OFFSET/LIMIT pagination on large tables: Cost grows linearly with the offset -- `OFFSET 1000000` scans and discards 1M rows before returning anything. Fix: use cursor-based pagination with `WHERE id > :last_id ORDER BY id LIMIT :size`. [src1]
- Unbounded retry loops: Retrying a poisoned message forever wastes resources and blocks progress. Fix: set `MAX_RETRIES` (typically 3-5) and route to the DLQ after exhaustion. [src4]
- Single-threaded bottleneck: Processing independent items sequentially leaves available cores (or I/O concurrency) unused. Fix: use a worker pool with `CONCURRENCY = num_CPUs` for CPU-bound work (in CPython, use processes rather than threads because of the GIL) or `CONCURRENCY = 10-50` for I/O-bound work. [src1]
- Chunk size too large for available memory: A chunk of 100K rows at 10KB each is about 1GB in memory per chunk. Fix: profile memory, start with 100-500 items per chunk, and tune up. [src2]
- No backpressure between producer and consumer: Producer fills an unbounded channel/queue faster than consumers can drain, causing OOM. Fix: use bounded channels/queues. [src6]
- Ignoring transaction boundaries: Processing and checkpointing in separate transactions means a crash between them causes reprocessing. Fix: wrap chunk processing + checkpoint update in a single transaction. [src2]
- Clock-based cursors with duplicate timestamps: Using `WHERE created_at > :last_ts` skips rows that share the last-seen timestamp but did not fit in the previous chunk. Fix: use a monotonic column (auto-increment ID) as the cursor, or a compound cursor `(timestamp, id)`. [src3]
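The compound-cursor fix for duplicate timestamps can be sketched with SQLite, which supports row-value comparisons such as `(created_at, id) > (?, ?)` (SQLite 3.15 and later; PostgreSQL accepts the same syntax). The `events` table and the `iter_by_time` helper are illustrative. Rows that share a timestamp are ordered by `id`, so none are skipped at a chunk boundary.

```python
import sqlite3
from typing import Iterator, List, Tuple


def iter_by_time(conn: sqlite3.Connection, chunk_size: int) -> Iterator[List[Tuple[str, int]]]:
    """Keyset pagination on (created_at, id): ties on created_at are broken by id."""
    last_ts, last_id = "", 0  # sorts before any real timestamp/id pair
    while True:
        rows = conn.execute(
            "SELECT created_at, id FROM events "
            "WHERE (created_at, id) > (?, ?) "
            "ORDER BY created_at, id LIMIT ?",
            (last_ts, last_id, chunk_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        last_ts, last_id = rows[-1]  # both parts of the cursor advance together
```

Consider five events where IDs 2-5 share one timestamp, read with `chunk_size=2`. This version returns all five IDs. A plain `WHERE created_at > ?` cursor stops after IDs 1 and 2, because IDs 3-5 carry the last-seen timestamp.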
When to Use / When Not to Use
| Use When | Don't Use When | Use Instead |
|---|---|---|
| Processing a bounded dataset (files, DB tables, API exports) | Data arrives continuously and needs <1s processing | Stream processing (Kafka Streams, Flink) |
| Items can tolerate minutes/hours of latency | User is waiting for a synchronous response | Request/response API pattern |
| You need atomicity per chunk (all-or-nothing within a chunk) | Each event triggers an independent side effect | Event-driven architecture |
| Cost efficiency matters -- batch infra is idle between runs | 24/7 low-latency processing is required | Always-on streaming consumers |
| Complex multi-step transformations (ETL/ELT) | Simple event routing or pub/sub fan-out | Message broker + consumer groups |
| Regulatory/compliance requires auditable batch runs | Real-time fraud detection or alerting | Stream processing with CEP |
Important Caveats
- Batch and stream are not mutually exclusive. Many production systems use Lambda architecture (batch layer for completeness + stream layer for speed). Choose based on latency requirements, not ideology.
- Cloud-native batch services (AWS Batch, GCP Cloud Run Jobs, Azure Container Apps Jobs) auto-scale workers, but you pay for the compute they consume; the serverless options bill per vCPU-second and memory-second. Profile your chunk processing time to avoid cost surprises on large datasets.
- Spring Batch is the most mature framework for JVM batch processing with built-in chunk-oriented processing, skip/retry policies, and job restartability. If you are on the JVM, use it rather than building from scratch.
- Idempotency requires a natural or synthetic unique key per item. If your data lacks one, generate a deterministic hash from the item's content before processing.
- DLQ reprocessing should be a deliberate manual operation, not automatic -- automatic DLQ replay without fixing the root cause creates infinite loops.
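When items have no unique key, one way to derive a deterministic idempotency key is to hash a canonical serialization of the item. This minimal sketch assumes items are JSON-serializable dicts; `idempotency_key` is an illustrative name. Sorting keys and fixing separators makes the hash independent of field order.

```python
import hashlib
import json
from typing import Any, Dict


def idempotency_key(item: Dict[str, Any]) -> str:
    """Deterministic SHA-256 key derived from the item's content."""
    canonical = json.dumps(item, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = idempotency_key({"sku": "A-1", "qty": 3})
b = idempotency_key({"qty": 3, "sku": "A-1"})  # same content, different field order
print(a == b)  # True
```

Content hashing has one trade-off: two genuinely distinct items with identical content collapse to one key. If such legitimate duplicates can occur, include a source-specific field in the hashed payload, such as file name and line number.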