cursor-based chunking + dead letter queue + idempotent upserts

| Pattern | Throughput | Resumability | Complexity | Best For |
|---|---|---|---|---|
| Sequential | Low | Easy (checkpoint per item) | Minimal | Small datasets (<10K), strict ordering |
| Parallel Chunks | High | Good (checkpoint per chunk) | Moderate | Medium datasets (10K-1M), independent items |
| MapReduce | Very High | Good (per-partition) | High | Huge datasets (>1M), aggregation/analytics |
| Micro-Batching | High | Moderate (per micro-batch) | Moderate | Near-real-time with batch semantics |
| Streaming (Kafka/Flink) | Very High | Built-in (offsets) | High | Continuous unbounded data, sub-second needs |
| Fan-Out/Fan-In | Very High | Good (per worker) | High | Heterogeneous tasks, cloud functions |
| Pipeline (multi-stage) | High | Per-stage checkpoints | Moderate | ETL with distinct transform steps |
| Retry + DLQ | N/A (overlay) | Excellent | Low (add-on) | Any pattern above -- handles poison messages |
START
|-- Dataset size < 10K items?
| |-- YES -> Sequential processing with per-item checkpoint
| +-- NO v
|-- Items independent (no ordering dependency)?
| |-- YES v
| | |-- Dataset < 1M items?
| | | |-- YES -> Parallel Chunks (worker pool, chunk size 100-1000)
| | | +-- NO v
| | |-- Need aggregation/reduce step?
| | | |-- YES -> MapReduce (partition, map, shuffle, reduce)
| | | +-- NO -> Fan-Out/Fan-In (distribute to workers, collect results)
| +-- NO (ordering required) v
|-- Need near-real-time (<5s latency)?
| |-- YES -> Micro-Batching (buffer 50-500ms, flush as batch)
| +-- NO -> Pipeline with stage-level checkpoints
+-- DEFAULT -> Parallel Chunks + Retry/DLQ overlay
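The Parallel Chunks default above can be sketched with a thread pool (a minimal in-memory sketch; fetch_chunk, process_item, and the tiny chunk size are illustrative stand-ins, not part of the patterns above):

```python
from concurrent.futures import ThreadPoolExecutor

CHUNK_SIZE = 4  # illustrative; use 100-1000 in practice

def fetch_chunk(data, cursor, size):
    # Stands in for: SELECT ... WHERE id > :cursor ORDER BY id LIMIT :size
    return [item for item in data if item["id"] > cursor][:size]

def process_item(item):
    return item["id"] * 2  # placeholder transform

def process_parallel_chunks(data, workers=4):
    results, cursor = {}, 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while True:
            chunk = fetch_chunk(data, cursor, CHUNK_SIZE)
            if not chunk:
                break
            # items within a chunk run in parallel; chunks run sequentially
            for item, result in zip(chunk, pool.map(process_item, chunk)):
                results[item["id"]] = result
            cursor = chunk[-1]["id"]  # natural checkpoint per chunk
    return results

data = [{"id": i} for i in range(1, 11)]
out = process_parallel_chunks(data)
```

The chunk boundary doubles as the checkpoint: if the job dies, it restarts from the last completed chunk rather than item zero.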
Never use OFFSET/LIMIT for large datasets -- the database scans and discards every skipped row, so later pages get progressively slower. Use a cursor column (auto-increment ID or timestamp) to paginate; with an index on the cursor column, each page costs a single index seek regardless of dataset size. [src1]
-- Cursor-based chunking: fetch next 1000 items after last processed ID
SELECT * FROM items
WHERE id > :last_processed_id
ORDER BY id ASC
LIMIT 1000;
Verify: Check that each chunk returns exactly LIMIT rows (except the final chunk), and no items are skipped or duplicated.
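One way to run that check is to replay the pagination loop against a known set of ids and assert completeness (a self-contained sketch; fetch_chunk simulates the cursor query in memory):

```python
def fetch_chunk(rows, last_id, limit):
    # Simulates: SELECT * FROM items WHERE id > :last_id ORDER BY id LIMIT :limit
    matching = sorted((r for r in rows if r["id"] > last_id), key=lambda r: r["id"])
    return matching[:limit]

rows = [{"id": i} for i in (3, 1, 7, 5, 9, 2, 8, 4, 6, 10)]
chunks, cursor, limit = [], 0, 3
while True:
    chunk = fetch_chunk(rows, cursor, limit)
    if not chunk:
        break
    chunks.append([r["id"] for r in chunk])
    cursor = chunk[-1]["id"]

assert all(len(c) == limit for c in chunks[:-1])  # only the final chunk may be short
flat = [i for c in chunks for i in c]
assert flat == sorted(r["id"] for r in rows)  # nothing skipped
assert len(flat) == len(set(flat))            # nothing duplicated
```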
Every write operation must be safe to retry. Use UPSERT (INSERT ... ON CONFLICT) so reprocessing the same item overwrites rather than duplicates. [src3]
-- Idempotent upsert: safe to replay
INSERT INTO processed_items (id, result, processed_at)
VALUES (:id, :result, NOW())
ON CONFLICT (id) DO UPDATE SET
result = EXCLUDED.result,
processed_at = EXCLUDED.processed_at;
Verify: Run the same item twice -- the row count should not increase, and the result should be identical.
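This replay test can be run end-to-end against an in-memory SQLite database (SQLite ≥ 3.24 supports the same ON CONFLICT ... DO UPDATE syntax; the table here is a pared-down stand-in for processed_items):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_items (id INTEGER PRIMARY KEY, result TEXT)")

def upsert(item_id, result):
    conn.execute(
        "INSERT INTO processed_items (id, result) VALUES (?, ?) "
        "ON CONFLICT (id) DO UPDATE SET result = excluded.result",
        (item_id, result),
    )

upsert(1, "first")
upsert(1, "first")          # replay the same item: no duplicate row
upsert(1, "reprocessed")    # replay with a new result: row is overwritten

count, = conn.execute("SELECT COUNT(*) FROM processed_items").fetchone()
result, = conn.execute("SELECT result FROM processed_items WHERE id = 1").fetchone()
assert count == 1               # row count did not increase
assert result == "reprocessed"  # latest result wins
```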
Store the last successfully processed cursor value in a durable checkpoint table. On restart, read the checkpoint and resume from that point. [src2]
-- Save checkpoint after each successful chunk
INSERT INTO batch_checkpoints (job_id, last_cursor, items_processed, updated_at)
VALUES (:job_id, :last_cursor, :count, NOW())
ON CONFLICT (job_id) DO UPDATE SET
last_cursor = EXCLUDED.last_cursor,
items_processed = batch_checkpoints.items_processed + EXCLUDED.items_processed,
updated_at = EXCLUDED.updated_at;
Verify: Kill the job mid-run, restart it, and confirm it resumes from the checkpoint without reprocessing completed items.
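The kill-and-restart check can be simulated by injecting a crash mid-run (a self-contained sketch; the dict stands in for the durable checkpoint table, and the ten-item range is illustrative):

```python
checkpoint = {}      # stands in for batch_checkpoints
processed_log = []   # every item ever processed, across runs

def fetch_chunk(cursor, size):
    return [i for i in range(1, 11) if i > cursor][:size]

def run(job_id, crash_after_chunks=None):
    cursor = checkpoint.get(job_id, 0)   # resume from last checkpoint
    chunks_done = 0
    while True:
        chunk = fetch_chunk(cursor, 3)
        if not chunk:
            return
        processed_log.extend(chunk)
        cursor = chunk[-1]
        checkpoint[job_id] = cursor      # save after each successful chunk
        chunks_done += 1
        if crash_after_chunks and chunks_done == crash_after_chunks:
            raise RuntimeError("simulated crash")

try:
    run("job-1", crash_after_chunks=2)   # dies after items 1-6
except RuntimeError:
    pass
run("job-1")                             # restart: picks up at the checkpoint

assert processed_log == list(range(1, 11))  # no item skipped or reprocessed
assert checkpoint["job-1"] == 10
```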
When an item fails after N retries, write it to a dead letter table/queue with the error details. The main batch continues processing remaining items. [src4]
-- Dead letter entry for failed items
INSERT INTO dead_letter_queue (job_id, item_id, payload, error_message, retry_count, failed_at)
VALUES (:job_id, :item_id, :payload, :error, :retries, NOW());
Verify: Intentionally corrupt one item in the batch -- confirm the batch completes and the bad item appears in the DLQ.
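The poison-item check looks like this in miniature (a self-contained sketch; process and the in-memory dlq list are illustrative stand-ins for the real handler and table):

```python
dlq = []

def process(item):
    if item == 5:
        raise ValueError("poison item")  # the intentionally corrupted item
    return item * 2

results = {}
for item in range(1, 11):
    try:
        results[item] = process(item)
    except Exception as e:
        dlq.append({"item": item, "error": str(e)})  # batch keeps going

assert len(results) == 9  # the batch completed despite the failure
assert dlq == [{"item": 5, "error": "poison item"}]
```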
Track batch metrics: items processed, items failed, elapsed time, throughput (items/sec). Alert when failure rate exceeds threshold (e.g., >5% of items fail). [src7]
# Log batch summary
logger.info(f"Batch complete: {processed}/{total} items, "
            f"{failed} failed ({failed/total*100:.1f}%), "
            f"{elapsed:.1f}s, {processed/elapsed:.0f} items/sec")
Verify: Check logs after a batch run to confirm all five metrics are present.
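The alerting threshold can be enforced with a small guard (a sketch; in production this would page or emit an alert rather than just return a boolean):

```python
def failure_rate_exceeded(total, failed, threshold=0.05):
    """True when the batch's failure rate crosses the alert threshold."""
    return total > 0 and failed / total > threshold

assert not failure_rate_exceeded(total=1000, failed=50)  # exactly 5%: no alert
assert failure_rate_exceeded(total=1000, failed=51)      # 5.1%: alert
```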
# Input: Database table with items to process
# Output: Processed items written back, failures in DLQ
import time
import psycopg2 # psycopg2==2.9.9
CHUNK_SIZE = 500
MAX_RETRIES = 3
def process_batch(conn, job_id):
    cursor = get_checkpoint(conn, job_id) or 0
    total, failed = 0, 0
    while True:
        items = fetch_chunk(conn, cursor, CHUNK_SIZE)
        if not items:
            break
        for item in items:
            for attempt in range(MAX_RETRIES):
                try:
                    result = process_item(item)  # your logic
                    upsert_result(conn, item["id"], result)
                    break
                except Exception as e:
                    if attempt == MAX_RETRIES - 1:
                        write_to_dlq(conn, job_id, item, str(e))
                        failed += 1
                    else:
                        time.sleep(2 ** attempt)  # exponential backoff
            total += 1
        cursor = items[-1]["id"]
        save_checkpoint(conn, job_id, cursor, len(items))
    return {"processed": total, "failed": failed}
// Input: Array of item IDs or cursor-based DB query
// Output: Processed results, failures logged to DLQ
// [email protected]
import pLimit from "p-limit";
const CHUNK_SIZE = 500;
const CONCURRENCY = 4;
const MAX_RETRIES = 3;
async function processBatch(db, jobId) {
  const limit = pLimit(CONCURRENCY);
  let cursor = (await getCheckpoint(db, jobId)) || 0;
  let total = 0, failed = 0;
  while (true) {
    const items = await db.query(
      "SELECT * FROM items WHERE id > $1 ORDER BY id LIMIT $2",
      [cursor, CHUNK_SIZE]
    );
    if (items.rows.length === 0) break;
    const tasks = items.rows.map((item) =>
      limit(async () => {
        for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
          try {
            const result = await processItem(item);
            await upsertResult(db, item.id, result);
            return;
          } catch (err) {
            if (attempt === MAX_RETRIES - 1) {
              await writeToDLQ(db, jobId, item, err.message);
              failed++;
            } else {
              await sleep(Math.pow(2, attempt) * 1000);
            }
          }
        }
      })
    );
    await Promise.all(tasks);
    cursor = items.rows[items.rows.length - 1].id;
    await saveCheckpoint(db, jobId, cursor, items.rows.length);
    total += items.rows.length;
  }
  return { processed: total, failed };
}
// Input: Database rows fetched via cursor pagination
// Output: Processed results, errors sent to DLQ channel
const (
chunkSize = 500
numWorkers = 8
maxRetries = 3
)
func processBatch(ctx context.Context, db *sql.DB, jobID string) (int, int) {
	items := make(chan Item, chunkSize)
	dlq := make(chan DLQEntry, 100)
	var wg, dlqWg sync.WaitGroup
	var processed, failed int64
	// Drain DLQ entries as they arrive; without a consumer, workers would
	// block forever once the dlq buffer fills
	dlqWg.Add(1)
	go func() {
		defer dlqWg.Done()
		for entry := range dlq {
			writeToDLQ(db, entry)
		}
	}()
	// Fan-out: start worker pool
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range items {
				if err := processWithRetry(ctx, db, item, maxRetries); err != nil {
					dlq <- DLQEntry{JobID: jobID, Item: item, Err: err}
					atomic.AddInt64(&failed, 1)
				}
				atomic.AddInt64(&processed, 1) // counts attempts, including failures
			}
		}()
	}
	// Producer: cursor-based chunk reads. Note: the checkpoint advances when a
	// chunk is enqueued, not when it finishes, so a crash mid-chunk can skip
	// items; checkpoint only after confirming the chunk's results if that matters.
	go func() {
		defer close(items)
		cursor := getCheckpoint(db, jobID)
		for {
			chunk := fetchChunk(db, cursor, chunkSize)
			if len(chunk) == 0 {
				break
			}
			for _, item := range chunk {
				items <- item
			}
			cursor = chunk[len(chunk)-1].ID
			saveCheckpoint(db, jobID, cursor, len(chunk))
		}
	}()
	wg.Wait()
	close(dlq)
	dlqWg.Wait()
	return int(processed), int(failed)
}
# BAD -- loads all rows into memory at once; OOM on large datasets
all_items = db.query("SELECT * FROM items").fetchall()
for item in all_items:
    process(item)
# GOOD -- fetches one chunk at a time; constant memory usage
cursor = 0
while True:
    chunk = db.query("SELECT * FROM items WHERE id > %s ORDER BY id LIMIT 500", cursor)
    if not chunk:
        break
    for item in chunk:
        process(item)
    cursor = chunk[-1]["id"]
# BAD -- no checkpoint; crash at item 999,999 means starting over
for item in get_all_items():
    process(item)
# GOOD -- resume from last successful chunk on restart
cursor = load_checkpoint(job_id) or 0
while True:
    chunk = fetch_chunk(cursor, 500)
    if not chunk:
        break
    for item in chunk:
        process(item)
    cursor = chunk[-1]["id"]
    save_checkpoint(job_id, cursor)
# BAD -- INSERT creates duplicates if item is retried
db.execute("INSERT INTO results (item_id, value) VALUES (%s, %s)", (item_id, value))
# GOOD -- ON CONFLICT prevents duplicates on retry
db.execute("""
    INSERT INTO results (item_id, value, updated_at)
    VALUES (%s, %s, NOW())
    ON CONFLICT (item_id) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
""", (item_id, value))
# BAD -- one poisoned item stops the whole batch
for item in chunk:
    result = process(item)  # unhandled exception halts everything
    save(result)
# GOOD -- failures go to DLQ; batch continues
for item in chunk:
    try:
        result = process(item)
        save(result)
    except Exception as e:
        if retries_exhausted(item):
            write_to_dlq(item, e)
        else:
            schedule_retry(item)
Key parameters and pitfalls:

- Paginate with WHERE id > :last_id ORDER BY id LIMIT :size. [src1]
- Cap retries at MAX_RETRIES (typically 3-5) and route to the DLQ after exhaustion. [src4]
- Set CONCURRENCY = num_CPUs for CPU-bound work, or CONCURRENCY = 10-50 for I/O-bound work. [src1]
- A timestamp-only cursor (WHERE created_at > :last_ts) skips items created in the same millisecond. Fix: use a monotonic column (auto-increment ID) as cursor, or a compound cursor (timestamp, id). [src3]

| Use When | Don't Use When | Use Instead |
|---|---|---|
| Processing a bounded dataset (files, DB tables, API exports) | Data arrives continuously and needs <1s processing | Stream processing (Kafka Streams, Flink) |
| Items can tolerate minutes/hours of latency | User is waiting for a synchronous response | Request/response API pattern |
| You need atomicity per chunk (all-or-nothing within a chunk) | Each event triggers an independent side effect | Event-driven architecture |
| Cost efficiency matters -- batch infra is idle between runs | 24/7 low-latency processing is required | Always-on streaming consumers |
| Complex multi-step transformations (ETL/ELT) | Simple event routing or pub/sub fan-out | Message broker + consumer groups |
| Regulatory/compliance requires auditable batch runs | Real-time fraud detection or alerting | Stream processing with CEP |
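The compound-cursor fix for the same-timestamp pitfall can be demonstrated in miniature (an in-memory sketch; ts stands in for created_at and the five-row set is illustrative):

```python
rows = [
    {"ts": 100, "id": 1}, {"ts": 100, "id": 2}, {"ts": 100, "id": 3},
    {"ts": 101, "id": 4}, {"ts": 101, "id": 5},
]

def fetch_after_ts(last_ts, limit=2):
    # BAD: WHERE created_at > :last_ts -- drops rows sharing the cursor timestamp
    return [r for r in rows if r["ts"] > last_ts][:limit]

def fetch_after_compound(last_ts, last_id, limit=2):
    # GOOD: WHERE (created_at, id) > (:last_ts, :last_id) -- tuple comparison
    return sorted(
        (r for r in rows if (r["ts"], r["id"]) > (last_ts, last_id)),
        key=lambda r: (r["ts"], r["id"]),
    )[:limit]

# After processing {ts: 100, id: 1}, a timestamp-only cursor of 100
# jumps straight to ts 101, silently skipping ids 2 and 3.
bad = fetch_after_ts(100)
assert [r["id"] for r in bad] == [4, 5]

# The compound cursor resumes correctly inside the shared timestamp.
good = fetch_after_compound(100, 1)
assert [r["id"] for r in good] == [2, 3]
```

PostgreSQL supports this directly via row-value comparison: WHERE (created_at, id) > (:last_ts, :last_id) ORDER BY created_at, id.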