Batch Processing Patterns

Type: Software Reference | Confidence: 0.90 | Sources: 7 | Verified: 2026-02-24 | Freshness: 2026-02-24

TL;DR

Constraints

Quick Reference

| Pattern | Throughput | Resumability | Complexity | Best For |
| --- | --- | --- | --- | --- |
| Sequential | Low | Easy (checkpoint per item) | Minimal | Small datasets (<10K), strict ordering |
| Parallel Chunks | High | Good (checkpoint per chunk) | Moderate | Medium datasets (10K-1M), independent items |
| MapReduce | Very High | Good (per-partition) | High | Huge datasets (>1M), aggregation/analytics |
| Micro-Batching | High | Moderate (per micro-batch) | Moderate | Near-real-time with batch semantics |
| Streaming (Kafka/Flink) | Very High | Built-in (offsets) | High | Continuous unbounded data, sub-second needs |
| Fan-Out/Fan-In | Very High | Good (per worker) | High | Heterogeneous tasks, cloud functions |
| Pipeline (multi-stage) | High | Per-stage checkpoints | Moderate | ETL with distinct transform steps |
| Retry + DLQ | N/A (overlay) | Excellent | Low (add-on) | Any pattern above -- handles poison messages |

Decision Tree

START
|-- Dataset size < 10K items?
|   |-- YES -> Sequential processing with per-item checkpoint
|   +-- NO  v
|-- Items independent (no ordering dependency)?
|   |-- YES v
|   |   |-- Dataset < 1M items?
|   |   |   |-- YES -> Parallel Chunks (worker pool, chunk size 100-1000)
|   |   |   +-- NO  v
|   |   |-- Need aggregation/reduce step?
|   |   |   |-- YES -> MapReduce (partition, map, shuffle, reduce)
|   |   |   +-- NO  -> Fan-Out/Fan-In (distribute to workers, collect results)
|   +-- NO (ordering required) v
|-- Need near-real-time (<5s latency)?
|   |-- YES -> Micro-Batching (buffer 50-500ms, flush as batch)
|   +-- NO  -> Pipeline with stage-level checkpoints
+-- DEFAULT -> Parallel Chunks + Retry/DLQ overlay
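The micro-batching branch above (buffer for 50-500ms, flush as a batch) can be sketched as a small buffer that flushes on whichever limit is hit first. This is an illustrative sketch, not a library API; the `flush` callback and the limits are assumptions.

```python
import time

class MicroBatcher:
    """Buffer items; flush when size or time window is reached, whichever first."""

    def __init__(self, flush, max_items=500, max_wait_s=0.25):
        self.flush = flush          # callback that writes one batch downstream
        self.max_items = max_items
        self.max_wait_s = max_wait_s
        self.buffer = []
        self.first_at = None        # monotonic time of oldest buffered item

    def add(self, item):
        if self.first_at is None:
            self.first_at = time.monotonic()
        self.buffer.append(item)
        if (len(self.buffer) >= self.max_items
                or time.monotonic() - self.first_at >= self.max_wait_s):
            self._flush()

    def _flush(self):
        if self.buffer:
            self.flush(self.buffer)
            self.buffer = []
            self.first_at = None
```

In a real deployment the time-based flush would also need a background timer, since `add` is the only trigger here; this sketch only shows the batching semantics.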

Step-by-Step Guide

1. Define chunk boundaries using cursor-based iteration

Never use OFFSET/LIMIT for large datasets: the database must scan and discard every skipped row, so each page gets slower as the offset grows. Use a cursor column (an indexed auto-increment ID or timestamp) to paginate. This keeps the cost per page roughly constant regardless of dataset size. [src1]

-- Cursor-based chunking: fetch next 1000 items after last processed ID
SELECT * FROM items
WHERE id > :last_processed_id
ORDER BY id ASC
LIMIT 1000;

Verify: Check that each chunk returns exactly LIMIT rows (except the final chunk), and no items are skipped or duplicated.
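A runnable sketch of cursor-based chunking, using SQLite (stdlib) as a stand-in for the real database; the `items` table and `payload` column are illustrative. The `fetch_chunk` signature mirrors the helper used in the Code Examples section.

```python
import sqlite3

def fetch_chunk(conn, last_id, limit=1000):
    # Seek past the last processed ID via the primary-key index -- no row skipping
    return conn.execute(
        "SELECT id, payload FROM items WHERE id > ? ORDER BY id ASC LIMIT ?",
        (last_id, limit),
    ).fetchall()

def iter_chunks(conn, limit=1000):
    cursor = 0
    while True:
        chunk = fetch_chunk(conn, cursor, limit)
        if not chunk:
            break
        yield chunk
        cursor = chunk[-1][0]  # advance cursor to the last ID seen
```

Each chunk is full except possibly the last, and no ID is skipped or repeated, which is exactly the verification condition above.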

2. Implement idempotent processing with upserts

Every write operation must be safe to retry. Use UPSERT (INSERT ... ON CONFLICT) so reprocessing the same item overwrites rather than duplicates. [src3]

-- Idempotent upsert: safe to replay
INSERT INTO processed_items (id, result, processed_at)
VALUES (:id, :result, NOW())
ON CONFLICT (id) DO UPDATE SET
  result = EXCLUDED.result,
  processed_at = EXCLUDED.processed_at;

Verify: Run the same item twice -- the row count should not increase, and the result should be identical.
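The verification above can be automated. A minimal sketch using SQLite (3.24+ supports the same `ON CONFLICT ... DO UPDATE` syntax as PostgreSQL); table and column names follow the SQL above, trimmed to two columns for brevity.

```python
import sqlite3

def upsert_result(conn, item_id, result):
    # Replaying the same item updates in place instead of duplicating
    conn.execute(
        "INSERT INTO processed_items (id, result) VALUES (?, ?) "
        "ON CONFLICT(id) DO UPDATE SET result = excluded.result",
        (item_id, result),
    )
```

Running the upsert twice for the same ID leaves exactly one row holding the latest result.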

3. Add progress tracking with checkpoints

Store the last successfully processed cursor value in a durable checkpoint table. On restart, read the checkpoint and resume from that point. [src2]

-- Save checkpoint after each successful chunk
INSERT INTO batch_checkpoints (job_id, last_cursor, items_processed, updated_at)
VALUES (:job_id, :last_cursor, :count, NOW())
ON CONFLICT (job_id) DO UPDATE SET
  last_cursor = EXCLUDED.last_cursor,
  items_processed = batch_checkpoints.items_processed + EXCLUDED.items_processed,
  updated_at = EXCLUDED.updated_at;

Verify: Kill the job mid-run, restart it, and confirm it resumes from the checkpoint without reprocessing completed items.
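The save/resume pair can be sketched directly from the SQL above; this version uses SQLite as a stand-in database and drops the timestamp column for brevity.

```python
import sqlite3

def save_checkpoint(conn, job_id, last_cursor, count):
    # Upsert: first write inserts, later writes advance cursor and accumulate count
    conn.execute(
        "INSERT INTO batch_checkpoints (job_id, last_cursor, items_processed) "
        "VALUES (?, ?, ?) "
        "ON CONFLICT(job_id) DO UPDATE SET "
        "last_cursor = excluded.last_cursor, "
        "items_processed = batch_checkpoints.items_processed + excluded.items_processed",
        (job_id, last_cursor, count),
    )

def get_checkpoint(conn, job_id):
    row = conn.execute(
        "SELECT last_cursor FROM batch_checkpoints WHERE job_id = ?",
        (job_id,),
    ).fetchone()
    return row[0] if row else None
```

On restart, `get_checkpoint` returns the last committed cursor, so the loop resumes exactly where the previous run left off.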

4. Route failures to a dead letter queue

When an item fails after N retries, write it to a dead letter table/queue with the error details. The main batch continues processing remaining items. [src4]

-- Dead letter entry for failed items
INSERT INTO dead_letter_queue (job_id, item_id, payload, error_message, retry_count, failed_at)
VALUES (:job_id, :item_id, :payload, :error, :retries, NOW());

Verify: Intentionally corrupt one item in the batch -- confirm the batch completes and the bad item appears in the DLQ.

5. Add monitoring and alerting

Track batch metrics: items processed, items failed, elapsed time, throughput (items/sec). Alert when failure rate exceeds threshold (e.g., >5% of items fail). [src7]

# Log batch summary
logger.info(f"Batch complete: {processed}/{total} items, "
            f"{failed} failed ({failed/total*100:.1f}%), "
            f"{elapsed:.1f}s, {processed/elapsed:.0f} items/sec")

Verify: Check logs after a batch run to confirm all five metrics are present.
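The >5% alert threshold above reduces to a one-line check; `should_alert` is a hypothetical helper name, and the threshold is configurable.

```python
def should_alert(processed, failed, threshold=0.05):
    """Return True when the failure rate exceeds the threshold."""
    total = processed + failed
    if total == 0:
        return False  # nothing ran yet -- no signal either way
    return failed / total > threshold
```

Wire this into whatever paging system you use; the point is that the threshold lives in one place rather than being re-derived in each dashboard.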

Code Examples

Python: Chunked batch processor with retry and DLQ

# Input:  Database table with items to process
# Output: Processed items written back, failures in DLQ

import time
import psycopg2  # psycopg2==2.9.9

CHUNK_SIZE = 500
MAX_RETRIES = 3

def process_batch(conn, job_id):
    cursor = get_checkpoint(conn, job_id) or 0
    total, failed = 0, 0

    while True:
        items = fetch_chunk(conn, cursor, CHUNK_SIZE)
        if not items:
            break
        for item in items:
            for attempt in range(MAX_RETRIES):
                try:
                    result = process_item(item)  # your logic
                    upsert_result(conn, item["id"], result)
                    break
                except Exception as e:
                    if attempt == MAX_RETRIES - 1:
                        write_to_dlq(conn, job_id, item, str(e))
                        failed += 1
                    else:
                        time.sleep(2 ** attempt)  # exponential backoff
            total += 1  # counts every attempted item, including DLQ'd ones
        cursor = items[-1]["id"]
        save_checkpoint(conn, job_id, cursor, len(items))
        conn.commit()  # psycopg2 autocommit is off -- commit results + checkpoint per chunk
    return {"processed": total, "failed": failed}

Node.js: Parallel chunk processor with concurrency control

// Input:  Array of item IDs or cursor-based DB query
// Output: Processed results, failures logged to DLQ

// [email protected]
import pLimit from "p-limit";

const CHUNK_SIZE = 500;
const CONCURRENCY = 4;
const MAX_RETRIES = 3;

async function processBatch(db, jobId) {
  const limit = pLimit(CONCURRENCY);
  let cursor = (await getCheckpoint(db, jobId)) || 0;
  let total = 0, failed = 0;

  while (true) {
    const items = await db.query(
      "SELECT * FROM items WHERE id > $1 ORDER BY id LIMIT $2",
      [cursor, CHUNK_SIZE]
    );
    if (items.rows.length === 0) break;

    const tasks = items.rows.map((item) =>
      limit(async () => {
        for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
          try {
            const result = await processItem(item);
            await upsertResult(db, item.id, result);
            return;
          } catch (err) {
            if (attempt === MAX_RETRIES - 1) {
              await writeToDLQ(db, jobId, item, err.message);
              failed++;
            } else {
              await sleep(Math.pow(2, attempt) * 1000);
            }
          }
        }
      })
    );
    await Promise.all(tasks);
    cursor = items.rows[items.rows.length - 1].id;
    await saveCheckpoint(db, jobId, cursor, items.rows.length);
    total += items.rows.length;
  }
  return { processed: total, failed };
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

Go: Fan-out/fan-in with worker pool and progress tracking

// Input:  Database rows fetched via cursor pagination
// Output: Processed results, errors sent to DLQ channel

import (
    "context"
    "database/sql"
    "sync"
    "sync/atomic"
)

const (
    chunkSize  = 500
    numWorkers = 8
    maxRetries = 3
)

func processBatch(ctx context.Context, db *sql.DB, jobID string) (int, int) {
    items := make(chan Item, chunkSize)
    dlq := make(chan DLQEntry, 100)
    var wg sync.WaitGroup
    var processed, failed int64

    // Fan-out: start worker pool
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range items {
                if err := processWithRetry(ctx, db, item, maxRetries); err != nil {
                    dlq <- DLQEntry{JobID: jobID, Item: item, Err: err}
                    atomic.AddInt64(&failed, 1)
                }
                atomic.AddInt64(&processed, 1)
            }
        }()
    }

    // Producer: cursor-based chunk reads
    go func() {
        defer close(items)
        cursor := getCheckpoint(db, jobID)
        for {
            chunk := fetchChunk(db, cursor, chunkSize)
            if len(chunk) == 0 { break }
            for _, item := range chunk {
                items <- item
            }
            cursor = chunk[len(chunk)-1].ID
            // Caveat: this checkpoints items as they are enqueued, not as
            // workers finish them -- a crash can lose in-flight items. For
            // strict resumability, checkpoint only after a chunk completes.
            saveCheckpoint(db, jobID, cursor, len(chunk))
        }
    }()

    // Drain the DLQ concurrently; without a consumer, workers block as soon
    // as the dlq buffer fills. persistDLQ is a helper like the others here.
    go func() {
        for entry := range dlq {
            persistDLQ(db, entry)
        }
    }()

    wg.Wait()
    close(dlq)
    return int(processed), int(failed)
}

Anti-Patterns

Wrong: Loading entire dataset into memory

# BAD -- loads all rows into memory at once; OOM on large datasets
all_items = db.query("SELECT * FROM items").fetchall()
for item in all_items:
    process(item)

Correct: Cursor-based chunked iteration

# GOOD -- fetches one chunk at a time; constant memory usage
cursor = 0
while True:
    chunk = db.query("SELECT * FROM items WHERE id > %s ORDER BY id LIMIT 500", cursor)
    if not chunk:
        break
    for item in chunk:
        process(item)
    cursor = chunk[-1]["id"]

Wrong: No progress tracking (restart = reprocess everything)

# BAD -- no checkpoint; crash at item 999,999 means starting over
for item in get_all_items():
    process(item)

Correct: Checkpoint after each chunk

# GOOD -- resume from last successful chunk on restart
cursor = load_checkpoint(job_id) or 0
while True:
    chunk = fetch_chunk(cursor, 500)
    if not chunk:
        break
    for item in chunk:
        process(item)
    cursor = chunk[-1]["id"]
    save_checkpoint(job_id, cursor)

Wrong: Non-idempotent processing (duplicates on retry)

# BAD -- INSERT creates duplicates if item is retried
db.execute("INSERT INTO results (item_id, value) VALUES (%s, %s)", (item_id, value))

Correct: Idempotent upsert

# GOOD -- ON CONFLICT prevents duplicates on retry
db.execute("""
    INSERT INTO results (item_id, value, updated_at)
    VALUES (%s, %s, NOW())
    ON CONFLICT (item_id) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
""", (item_id, value))

Wrong: Failed items block the entire batch

# BAD -- one poisoned item stops the whole batch
for item in chunk:
    result = process(item)  # unhandled exception halts everything
    save(result)

Correct: Catch, retry, then dead-letter

# GOOD -- failures go to DLQ; batch continues
for item in chunk:
    try:
        result = process(item)
        save(result)
    except Exception as e:
        if retries_exhausted(item):
            write_to_dlq(item, e)
        else:
            schedule_retry(item)

Common Pitfalls

When to Use / When Not to Use

| Use When | Don't Use When | Use Instead |
| --- | --- | --- |
| Processing a bounded dataset (files, DB tables, API exports) | Data arrives continuously and needs <1s processing | Stream processing (Kafka Streams, Flink) |
| Items can tolerate minutes/hours of latency | User is waiting for a synchronous response | Request/response API pattern |
| You need atomicity per chunk (all-or-nothing within a chunk) | Each event triggers an independent side effect | Event-driven architecture |
| Cost efficiency matters -- batch infra is idle between runs | 24/7 low-latency processing is required | Always-on streaming consumers |
| Complex multi-step transformations (ETL/ELT) | Simple event routing or pub/sub fan-out | Message broker + consumer groups |
| Regulatory/compliance requires auditable batch runs | Real-time fraud detection or alerting | Stream processing with CEP |

Important Caveats

Related Units