Bulk Processing Patterns for ERP Integration — Chunking & Parallel Processing Strategies

Type: ERP Integration | Systems: Salesforce, NetSuite, SAP S/4HANA, D365 F&O, Oracle ERP Cloud | Confidence: 0.86 | Sources: 7 | Verified: 2026-03-07 | Freshness: 2026-03-07

TL;DR

System Profile

This is a cross-system architecture pattern card covering bulk data processing strategies across the five most widely integrated ERP platforms. It compares native bulk APIs, chunking approaches, concurrency models, and error handling at the bulk operation level.

| System | Role | Bulk API Surface | Direction Covered |
|---|---|---|---|
| Salesforce | CRM / Platform | Bulk API 2.0 (CSV over REST) | Inbound + Outbound |
| Oracle NetSuite | ERP / Financials | SuiteTalk REST, CSV Import, SuiteQL | Inbound + Outbound |
| SAP S/4HANA | ERP / Manufacturing | OData $batch, BAPI, LSMW, LTMC | Inbound + Outbound |
| Microsoft D365 F&O | ERP / Finance | DMF Recurring Integration, OData batch | Inbound + Outbound |
| Oracle ERP Cloud | ERP / Financials | FBDI + ESS, REST | Inbound + Outbound |

API Surfaces & Capabilities

| ERP System | Bulk API | Protocol | Max Records/Request | Max File Size | Concurrency Limit | Async? | Partial Success? |
|---|---|---|---|---|---|---|---|
| Salesforce | Bulk API 2.0 | REST/CSV | 150M records/job | 150 MB/file | 25 concurrent jobs | Yes | Yes (per-record) |
| NetSuite | CSV Import | UI/Scheduled | 100K+ per file | No hard limit | 5 queues (multi-thread) | Yes | No (entire batch) |
| NetSuite | SuiteTalk REST | REST/JSON | 1 record/request | 104 MB response | 15-55 concurrent | Optional | N/A |
| SAP S/4HANA | OData $batch | REST/JSON | 1,000 changesets | 50 MB | Shared dialog processes | Yes (async mode) | Yes (per-changeset) |
| SAP S/4HANA | LSMW/LTMC | Batch input | No hard limit | File-dependent | 1 session/project | Yes | Partial (error log) |
| D365 F&O | DMF Recurring | REST/Package | Package-dependent | No documented limit | Configurable (Batch Concurrency Control) | Yes | Yes (per-record in log) |
| Oracle ERP Cloud | FBDI | REST/ZIP (CSV) | 100,000 per import | 250 MB/ZIP | 5 concurrent imports | Yes (ESS jobs) | No (entire import) |

Rate Limits & Quotas

Per-Request Limits

| ERP System | Limit Type | Value | Notes |
|---|---|---|---|
| Salesforce | Max file size per upload | 150 MB | Split larger datasets into multiple files |
| Salesforce | Internal batch size | 10,000 records | Auto-created; each takes up to 10 min |
| Salesforce | PK Chunking default | 250,000 records | Configurable via Sforce-PK-Chunking header |
| NetSuite | Records per REST request | 1 | Use CSV Import for true bulk ops |
| NetSuite | Max response payload | 104 MB | Paginate queries exceeding this |
| SAP | Changesets per $batch | 1,000 | Split across multiple $batch calls |
| D365 | Sync mode timeout | 30 minutes | Use batch mode for longer operations |
| Oracle FBDI | Records per import | 100,000 | Split into multiple FBDI submissions |
| Oracle FBDI | ZIP file size | 250 MB | Individual CSV within ZIP can be up to 1 GB |
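
Where a dataset exceeds one of these per-request caps, the split is simple arithmetic. A minimal sketch using the FBDI record cap from the table above (the helper name is illustrative, not part of any Oracle SDK):

import math

FBDI_RECORDS_PER_IMPORT = 100_000  # per-import cap from the table above

def plan_fbdi_submissions(total_records, cap=FBDI_RECORDS_PER_IMPORT):
    """Return (submission_count, records_per_submission) under the cap."""
    count = math.ceil(total_records / cap)
    return count, math.ceil(total_records / count)

# e.g., 430,000 records -> 5 submissions of ~86,000 records each
print(plan_fbdi_submissions(430_000))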

Rolling / Daily Limits

| ERP System | Limit Type | Value | Window | Edition Differences |
|---|---|---|---|---|
| Salesforce | Total records via Bulk API | 150,000,000 | 24h rolling | Same across editions |
| Salesforce | Bulk API batches | 15,000 | 24h rolling | Shared across editions |
| NetSuite | Concurrent threads | 15-55 | Per-moment | SuiteCloud Plus adds 10/license |
| SAP | OData requests | Fair-use throttling | Depends on sizing | On-prem: hardware; Cloud: quota |
| D365 | API calls | Priority-based throttling | Per-minute | Based on license count |
| Oracle FBDI | Concurrent imports | 5 | Per-moment | Same across editions |

Authentication

| ERP System | Recommended Flow | Token Lifetime | Notes |
|---|---|---|---|
| Salesforce | OAuth 2.0 JWT Bearer | 2h (session timeout) | New JWT per bulk job; never use username-password |
| NetSuite | Token-Based Auth (TBA) | Until revoked | Create dedicated integration user |
| SAP S/4HANA | OAuth 2.0 Client Credentials | Configurable (12h default) | Basic auth deprecated |
| D365 F&O | Microsoft Entra ID (OAuth 2.0) | 1h access token | Client credentials flow |
| Oracle ERP Cloud | OAuth 2.0 | Session-based | Via Oracle IDCS |
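
For Salesforce, the JWT bearer exchange is compact enough to sketch inline (uses the PyJWT library; the consumer key, username, and private key are your own connected-app inputs):

import time
import jwt       # PyJWT, with an RSA backend installed
import requests

def get_salesforce_token(consumer_key, username, private_key_pem,
                         login_url="https://login.salesforce.com"):
    claims = {
        'iss': consumer_key,            # connected app consumer key
        'sub': username,                # dedicated integration user
        'aud': login_url,
        'exp': int(time.time()) + 180,  # short-lived assertion
    }
    assertion = jwt.encode(claims, private_key_pem, algorithm='RS256')
    resp = requests.post(f"{login_url}/services/oauth2/token", data={
        'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        'assertion': assertion,
    })
    resp.raise_for_status()
    return resp.json()['access_token']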

Authentication Gotchas

Constraints

Integration Pattern Decision Tree

START — User needs to bulk-load data into an ERP system
├── How many records per batch cycle?
│   ├── < 2,000 records
│   │   └── Use standard REST API with composite/batch requests
│   ├── 2,000 – 100,000 records
│   │   ├── Salesforce → Bulk API 2.0 single job
│   │   ├── NetSuite → CSV Import (multi-thread with SuiteCloud Plus)
│   │   ├── SAP → OData $batch (1,000 changesets/request)
│   │   ├── D365 → DMF Recurring Integration (single package)
│   │   └── Oracle → FBDI single ZIP + ESS job
│   ├── 100,000 – 1,000,000 records
│   │   ├── Salesforce → Bulk API 2.0 with file chunking (10-15 MB files)
│   │   ├── NetSuite → CSV Import with queue distribution (queues 1-5)
│   │   ├── SAP → LTMC / BAPI batch processing
│   │   ├── D365 → DMF + Enhanced Parallel Package Import
│   │   └── Oracle → Multiple FBDI submissions (≤100K per import, ≤5 concurrent)
│   └── > 1,000,000 records
│       ├── Salesforce → Bulk API 2.0 with job queuing (max 25 concurrent)
│       ├── NetSuite → Staged CSV Import + message queue orchestration
│       ├── SAP → LSMW / LTMC with parallel sessions
│       ├── D365 → Multiple DMF packages with batch concurrency control
│       └── Oracle → Sequential FBDI batches (respect 5-concurrent limit)
├── Chunking strategy?
│   ├── Fixed-size chunks → Simplest; use when records are independent
│   ├── Adaptive chunks → Adjust based on response times and error rates
│   └── Dependency-aware chunks → Group parent/child records together
├── Error tolerance?
│   ├── Zero-loss → Per-chunk retry + dead letter queue + idempotency keys
│   └── Best-effort → Log failures, skip, continue
└── Performance priority?
    ├── Throughput → Maximize parallelism up to concurrency limit
    └── Reliability → Reduce parallelism, increase retry budget
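
The volume branch of this tree reduces to a lookup table. A minimal routing sketch (the strategy labels are illustrative shorthand for the branches above, not API identifiers):

def choose_bulk_strategy(erp, record_count):
    """Route to a bulk pattern per the decision tree above."""
    if record_count < 2_000:
        return 'standard_rest_composite'
    strategies = {
        'salesforce': ('bulk_api_2_single_job', 'bulk_api_2_file_chunking', 'bulk_api_2_job_queuing'),
        'netsuite':   ('csv_import', 'csv_import_queue_distribution', 'staged_csv_with_mq'),
        'sap':        ('odata_batch', 'ltmc_bapi_batch', 'lsmw_parallel_sessions'),
        'd365':       ('dmf_single_package', 'dmf_parallel_import', 'dmf_multi_package'),
        'oracle':     ('fbdi_single_zip', 'fbdi_multi_submission', 'fbdi_sequential_batches'),
    }
    # Tier 0: 2K-100K, tier 1: 100K-1M, tier 2: >1M
    tier = 0 if record_count <= 100_000 else (1 if record_count <= 1_000_000 else 2)
    return strategies[erp][tier]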

Quick Reference — Cross-System Bulk Processing Comparison

| Capability | Salesforce | NetSuite | SAP S/4HANA | D365 F&O | Oracle ERP Cloud |
|---|---|---|---|---|---|
| Primary Bulk API | Bulk API 2.0 | CSV Import | LTMC / OData $batch | DMF Recurring Integration | FBDI + ESS |
| Max records/job | 150M/file | 100K+ (practical) | 999/BDC; unlimited LSMW | Package-dependent | 100,000/import |
| Max file size | 150 MB | No hard limit | File-dependent | No documented limit | 250 MB ZIP |
| Max concurrent jobs | 25 | 5 queues | Session-limited | Configurable | 5 imports |
| Optimal chunk size | 10K-50K records/file | 5K-25K records/CSV | 500-1K changesets | 10K-50K records/pkg | 25K-50K records/FBDI |
| Partial success | Yes (per-record) | No (entire batch) | Yes (per-changeset) | Yes (per-record in log) | No (entire import) |
| Built-in retry | Yes (10 retries) | No | No | Yes (PU64+) | No |
| Typical throughput | ~3K-5K rec/sec | ~3 rec/sec (REST); ~500/sec (CSV) | ~200-1K rec/sec | ~500-2K rec/sec | ~300-1K rec/sec |
| Async processing | Yes | Yes (CSV Import) | Yes (async OData) | Yes (batch mode) | Yes (ESS) |

Step-by-Step Integration Guide

1. Assess data volume and select chunking strategy

Determine total record count, record dependencies, and target ERP's limits. Choose fixed-size, adaptive, or dependency-aware chunking. [src7]

Verify: chunk_count = ceil(total_records / chunk_size) → expected: reasonable chunk count for target ERP
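
For example, for 500K records at the 10K chunk size typical for Salesforce:

import math
chunk_count = math.ceil(500_000 / 10_000)  # = 50 chunks, queued in waves under the 25-concurrent-job cap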

2. Implement chunk generation with dependency awareness

Split source data into chunks that respect parent-child relationships. [src7]

def chunk_with_dependencies(records, chunk_size=10000):
    """Split records into chunks preserving parent-child groups."""
    # Group children under their parent; parents are inserted at the
    # front of their group so they load before dependent records.
    groups = {}
    for record in records:
        parent_id = record.get('parent_id')
        if parent_id:
            groups.setdefault(parent_id, []).append(record)
        else:
            groups.setdefault(record['id'], [])
            groups[record['id']].insert(0, record)

    # Pack whole groups into chunks; a group larger than chunk_size
    # becomes its own oversized chunk rather than being split.
    chunks, current_chunk, current_size = [], [], 0
    for group_id, group_records in groups.items():
        group_size = len(group_records)
        if current_size + group_size > chunk_size and current_chunk:
            chunks.append(current_chunk)
            current_chunk, current_size = [], 0
        current_chunk.extend(group_records)
        current_size += group_size
    if current_chunk:
        chunks.append(current_chunk)
    return chunks
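
A usage sketch with hypothetical sales-order records shaped the way the function expects ('id' plus optional 'parent_id'):

records = [
    {'id': 'SO-1'}, {'id': 'LINE-1', 'parent_id': 'SO-1'},
    {'id': 'LINE-2', 'parent_id': 'SO-1'}, {'id': 'SO-2'},
]
for chunk in chunk_with_dependencies(records, chunk_size=2):
    print([r['id'] for r in chunk])
# ['SO-1', 'LINE-1', 'LINE-2'] then ['SO-2'] -- the SO-1 group
# stays together even though it exceeds chunk_size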

3. Implement rate-limit-aware parallel processing

Process chunks in parallel while respecting concurrency limits. [src1, src3]

import asyncio
from asyncio import Semaphore

class RateLimitError(Exception):
    """Raised by process_fn on a rate-limit response; retry_after is optional."""
    def __init__(self, retry_after=None):
        self.retry_after = retry_after

# Conservative per-ERP parallelism, kept below the hard caps documented above
ERP_CONCURRENCY = {
    'salesforce': 10, 'netsuite': 10, 'sap': 5, 'd365': 8, 'oracle_fbdi': 3,
}

async def process_chunks_parallel(chunks, erp_type, process_fn, max_retries=3):
    semaphore = Semaphore(ERP_CONCURRENCY.get(erp_type, 5))
    async def process_with_limit(idx, chunk):
        async with semaphore:
            for attempt in range(max_retries):
                try:
                    result = await process_fn(chunk)
                    return {'chunk': idx, 'status': 'success', 'result': result}
                except RateLimitError as e:
                    # Honor Retry-After when given; otherwise back off exponentially
                    await asyncio.sleep(e.retry_after or 2 ** attempt)
                except Exception as e:
                    return {'chunk': idx, 'status': 'failed', 'error': str(e)}
            return {'chunk': idx, 'status': 'failed', 'error': 'rate-limit retries exhausted'}
    return await asyncio.gather(*[process_with_limit(i, c) for i, c in enumerate(chunks)])
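
A hypothetical driver, with chunks from step 2 and upload_chunk standing in for your ERP-specific coroutine:

async def upload_chunk(chunk):
    ...  # e.g., submit a Bulk API 2.0 job or an OData $batch for this chunk

results = asyncio.run(process_chunks_parallel(chunks, 'salesforce', upload_chunk))
failed = [r for r in results if r['status'] == 'failed']  # feed these to step 4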

4. Handle partial failures per ERP

Each ERP handles partial success differently. [src1, src4, src6]

def handle_partial_failures(results, erp_type):
    if erp_type == 'salesforce':
        # Per-record success/failure: retry only failed records
        return [r for r in results if r.get('sf__Error')]
    elif erp_type == 'netsuite_csv':
        # Entire batch fails: retry all after fixing data
        return results['all_records'] if results.get('status') == 'FAILED' else []
    elif erp_type == 'sap_odata':
        # Per-changeset: retry failed changesets only
        return [cs for cs in results['changesets'] if cs['status'] >= 400]
    elif erp_type == 'd365':
        # Check execution errors via the GetExecutionErrors API
        return results.get('execution_errors', [])
    elif erp_type == 'oracle_fbdi':
        # Entire import fails: fix data and resubmit
        return results['all_records'] if results.get('status') == 'ERROR' else []
    raise ValueError(f"Unknown ERP type: {erp_type}")

Code Examples

Python: Salesforce Bulk API 2.0 Job with Chunking

# Input:  Large CSV file (e.g., 500K contact records)
# Output: Bulk job IDs with success/failure counts per chunk

import requests, time, csv, io

class SalesforceBulkChunker:
    MAX_FILE_SIZE = 100 * 1024 * 1024  # 100 MB target (API cap is 150 MB; leave headroom)
    POLL_INTERVAL = 10  # seconds between job-status polls

    def __init__(self, instance_url, access_token):
        self.base_url = f"{instance_url}/services/data/v62.0"
        self.headers = {'Authorization': f'Bearer {access_token}',
                        'Content-Type': 'text/csv'}

    def chunk_csv_file(self, filepath, target_size_mb=100):
        """Split a CSV into size-bounded chunks, repeating the header in each."""
        chunks = []
        with open(filepath, 'r') as f:
            reader = csv.reader(f)
            header = next(reader)
            current_chunk, current_size = [header], 0
            for row in reader:
                # Approximate serialized size; ignores quoting, hence the headroom
                row_size = len(','.join(row).encode('utf-8')) + 1  # +1 for newline
                if current_size + row_size > target_size_mb * 1024 * 1024:
                    chunks.append(current_chunk)
                    current_chunk, current_size = [header], 0
                current_chunk.append(row)
                current_size += row_size
            if len(current_chunk) > 1:  # more than just the header row
                chunks.append(current_chunk)
        return chunks

    def submit_job(self, sobject, operation, csv_data):
        """Create an ingest job, upload one CSV chunk, and mark it UploadComplete."""
        job_resp = requests.post(f"{self.base_url}/jobs/ingest",
            headers={**self.headers, 'Content-Type': 'application/json'},
            json={'object': sobject, 'operation': operation,
                  'contentType': 'CSV', 'lineEnding': 'LF'})
        job_resp.raise_for_status()
        job_id = job_resp.json()['id']
        buf = io.StringIO()
        # Match the job's declared LF line ending (csv defaults to CRLF)
        csv.writer(buf, lineterminator='\n').writerows(csv_data)
        requests.put(f"{self.base_url}/jobs/ingest/{job_id}/batches",
            headers=self.headers,
            data=buf.getvalue().encode('utf-8')).raise_for_status()
        requests.patch(f"{self.base_url}/jobs/ingest/{job_id}",
            headers={**self.headers, 'Content-Type': 'application/json'},
            json={'state': 'UploadComplete'}).raise_for_status()
        return job_id
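
The class defines POLL_INTERVAL but no poller. A minimal companion sketch, assuming the standard Bulk API 2.0 job states:

def wait_for_job(chunker, job_id, timeout_s=3600):
    """Poll an ingest job until it reaches a terminal state."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        resp = requests.get(f"{chunker.base_url}/jobs/ingest/{job_id}",
                            headers={'Authorization': chunker.headers['Authorization']})
        resp.raise_for_status()
        job = resp.json()
        if job['state'] in ('JobComplete', 'Failed', 'Aborted'):
            return job
        time.sleep(chunker.POLL_INTERVAL)
    raise TimeoutError(f"Job {job_id} still processing after {timeout_s}s")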

Python: Adaptive Chunking with Rate-Limit Backoff

# Input:  Records to process, target ERP API
# Output: All records processed with adaptive chunk sizing

class AdaptiveChunker:
    def __init__(self, initial_chunk_size=5000, min_chunk=500, max_chunk=50000):
        self.chunk_size = initial_chunk_size
        self.min_chunk, self.max_chunk = min_chunk, max_chunk
        self.success_count = 0

    def adjust_chunk_size(self, response_time_ms, had_errors):
        if had_errors:
            # Errors: halve the chunk and restart the growth streak
            self.chunk_size = max(self.min_chunk, self.chunk_size // 2)
            self.success_count = 0
        elif response_time_ms > 30000:
            # Slow but clean: shrink gently and restart the streak
            self.chunk_size = max(self.min_chunk, int(self.chunk_size * 0.75))
            self.success_count = 0
        elif response_time_ms < 5000:
            # Fast and clean: grow only after 3 consecutive fast responses
            self.success_count += 1
            if self.success_count >= 3:
                self.chunk_size = min(self.max_chunk, int(self.chunk_size * 1.25))
                self.success_count = 0
        return self.chunk_size
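
A driver-loop sketch showing how the chunker resizes between uploads; records and api_client are assumed stand-ins, as in the anti-pattern examples below:

import time

chunker = AdaptiveChunker()
i = 0
while i < len(records):
    chunk = records[i:i + chunker.chunk_size]
    start = time.monotonic()
    result = api_client.bulk_upload(chunk)  # assumed client, as in the examples below
    elapsed_ms = (time.monotonic() - start) * 1000
    chunker.adjust_chunk_size(elapsed_ms, had_errors=bool(result.failed))
    i += len(chunk)  # next iteration picks up the adjusted chunk size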

cURL: Salesforce Bulk API 2.0 Job Lifecycle

# Step 1: Create ingest job
curl -X POST https://yourinstance.salesforce.com/services/data/v62.0/jobs/ingest \
  -H "Authorization: Bearer $ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"object":"Contact","operation":"upsert","externalIdFieldName":"External_ID__c","contentType":"CSV"}'

# Step 2: Upload CSV chunk
curl -X PUT https://yourinstance.salesforce.com/services/data/v62.0/jobs/ingest/JOB_ID/batches \
  -H "Authorization: Bearer $ACCESS_TOKEN" \
  -H "Content-Type: text/csv" \
  --data-binary @contacts_chunk_001.csv

# Step 3: Close job to begin processing
curl -X PATCH https://yourinstance.salesforce.com/services/data/v62.0/jobs/ingest/JOB_ID \
  -H "Authorization: Bearer $ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"state":"UploadComplete"}'

# Step 4: Get failed records for retry
curl https://yourinstance.salesforce.com/services/data/v62.0/jobs/ingest/JOB_ID/failedResults \
  -H "Authorization: Bearer $ACCESS_TOKEN" -H "Accept: text/csv"

Data Mapping

Chunking Parameter Reference

| Parameter | Salesforce | NetSuite | SAP S/4HANA | D365 F&O | Oracle FBDI |
|---|---|---|---|---|---|
| Optimal chunk size | 10K-50K records | 5K-25K records | 500-1K changesets | 10K-50K records | 25K-50K records |
| Optimal file size | 10-100 MB | < 50 MB | N/A | Package-dependent | < 100 MB per CSV |
| Compression | Not supported | N/A | N/A | ZIP packages | DEFLATE in ZIP |
| Character encoding | UTF-8 | UTF-8, no BOM | UTF-8 | UTF-8 | UTF-8 |
| Date format | ISO 8601 | MM/DD/YYYY or ISO | ISO 8601 | ISO 8601 | YYYY/MM/DD HH:mm:ss |
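
The date-format row is worth codifying once rather than per pipeline. A minimal sketch mapping the table above to strftime patterns (NetSuite shown with its MM/DD/YYYY variant):

from datetime import datetime

DATE_FORMATS = {                       # from the table above
    'salesforce':  '%Y-%m-%d',         # ISO 8601
    'netsuite':    '%m/%d/%Y',
    'sap':         '%Y-%m-%d',
    'd365':        '%Y-%m-%d',
    'oracle_fbdi': '%Y/%m/%d %H:%M:%S',
}

def format_date(dt: datetime, erp: str) -> str:
    """Render a datetime in the target ERP's expected CSV date format."""
    return dt.strftime(DATE_FORMATS[erp])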

Data Type Gotchas

Error Handling & Failure Points

Common Error Codes

| ERP | Code | Meaning | Cause | Resolution |
|---|---|---|---|---|
| Salesforce | UNABLE_TO_LOCK_ROW | Record locked | Concurrent update | Reduce parallelism; add jitter |
| Salesforce | REQUEST_LIMIT_EXCEEDED | Daily limit hit | Too many bulk jobs/24h | Consolidate into fewer, larger jobs |
| NetSuite | 429 | Rate limit exceeded | Burst or daily limit | Exponential backoff; honor Retry-After |
| NetSuite | CONCURRENT_REQUEST_LIMIT | Thread limit hit | Too many parallel requests | Reduce semaphore count |
| SAP | 501 | $batch not supported | Endpoint limitation | Use individual requests or BAPI batch |
| D365 | ProcessedWithErrors | Partial failure | Validation errors | Call GetExecutionErrors API |
| Oracle | ESS FAILED | Import validation error | Data/constraint violation | Download error report; fix and resubmit |
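
These resolutions can be encoded as a retry policy. A sketch of the table above as code (the retryable/not-retryable split is illustrative; tune it to your tenant):

RETRYABLE = {
    ('salesforce', 'UNABLE_TO_LOCK_ROW'): True,       # retry with less parallelism + jitter
    ('salesforce', 'REQUEST_LIMIT_EXCEEDED'): False,  # consolidate jobs instead
    ('netsuite', '429'): True,                        # back off, honor Retry-After
    ('netsuite', 'CONCURRENT_REQUEST_LIMIT'): True,   # retry with a smaller semaphore
    ('sap', '501'): False,                            # endpoint limitation, not transient
    ('d365', 'ProcessedWithErrors'): False,           # pull GetExecutionErrors, fix data
    ('oracle', 'ESS FAILED'): False,                  # fix data and resubmit
}

def should_retry(erp: str, code: str) -> bool:
    return RETRYABLE.get((erp, code), False)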

Failure Points in Production

Anti-Patterns

Wrong: Processing all records in a single giant batch

# BAD — single-threaded, no chunking, all-or-nothing failure
def bulk_import_bad(records, api_client):
    result = api_client.bulk_upload(records)  # Times out at 150K+
    if result.failed:
        bulk_import_bad(records, api_client)  # Infinite retry loop

Correct: Chunked processing with per-chunk error isolation

# GOOD — chunked, parallel, per-chunk error handling
def bulk_import_good(records, api_client, chunk_size=10000):
    chunks = [records[i:i+chunk_size] for i in range(0, len(records), chunk_size)]
    failed_chunks = []
    for i, chunk in enumerate(chunks):
        try:
            result = api_client.bulk_upload(chunk)
            log_success(i, len(chunk), result.processed)
        except Exception as e:
            failed_chunks.append((i, chunk, str(e)))
    retry_failed_chunks(failed_chunks, api_client)

Wrong: Ignoring partial success results

# BAD — assumes entire job succeeded or failed
job = api_client.get_job_status(job_id)
if job.state == 'JobComplete':
    print("All done!")  # WRONG — can still have failed records

Correct: Checking per-record success/failure

# GOOD — inspect individual record results
job = api_client.get_job_status(job_id)
if job.state == 'JobComplete' and job.number_records_failed > 0:
    failures = api_client.get_failed_results(job_id)
    for record in failures:
        dead_letter_queue.add(record)  # Route to DLQ

Wrong: Maximum parallelism without rate-limit awareness

# BAD — fire all chunks simultaneously
tasks = [api_client.upload(chunk) for chunk in chunks]
await asyncio.gather(*tasks)  # 500 concurrent → instant 429

Correct: Semaphore-controlled parallelism with backoff

# GOOD — respect concurrency limits
sem = asyncio.Semaphore(10)
async def upload_with_limit(chunk):
    # RateLimitError as defined in the parallel-processing helper above
    async with sem:
        for attempt in range(3):
            try:
                return await api_client.upload(chunk)
            except RateLimitError:
                if attempt == 2:
                    raise  # retries exhausted; surface instead of silently returning None
                await asyncio.sleep(2 ** attempt)
await asyncio.gather(*[upload_with_limit(c) for c in chunks])

Common Pitfalls

Diagnostic Commands

# === Salesforce: Check Bulk API job status ===
curl -s "https://$SF_INSTANCE/services/data/v62.0/jobs/ingest/$JOB_ID" \
  -H "Authorization: Bearer $SF_TOKEN" | jq '{state, numberRecordsProcessed, numberRecordsFailed}'

# === Salesforce: Check remaining API limits ===
curl -s "https://$SF_INSTANCE/services/data/v62.0/limits" \
  -H "Authorization: Bearer $SF_TOKEN" | jq '{DailyApiRequests, DailyBulkV2QueryJobs}'

# === D365: Check recurring integration message status ===
curl -X POST "https://$D365_INSTANCE/data/DataManagementDefinitionGroups/Microsoft.Dynamics.DataEntities.GetMessageStatus" \
  -H "Authorization: Bearer $D365_TOKEN" -H "Content-Type: application/json" \
  -d '{"messageId":"MESSAGE_GUID"}'

# === D365: Get execution errors ===
curl -X POST "https://$D365_INSTANCE/data/DataManagementDefinitionGroups/Microsoft.Dynamics.DataEntities.GetExecutionErrors" \
  -H "Authorization: Bearer $D365_TOKEN" -H "Content-Type: application/json" \
  -d '{"executionId":"EXECUTION_ID"}'

# === Oracle FBDI: Check ESS job status ===
curl -s "https://$ORACLE_INSTANCE/fscmRestApi/resources/latest/erpintegrations?finder=ESSJobStatusFinder" \
  -H "Authorization: Bearer $ORACLE_TOKEN"

Version History & Compatibility

| ERP System | Bulk API Version | Release | Key Change | Impact on Chunking |
|---|---|---|---|---|
| Salesforce | Bulk API 2.0 | v62.0 (2026-02) | PK Chunking default raised to 250K | Larger query chunks possible |
| Salesforce | Bulk API 2.0 | v56.0 (2022-10) | Bulk API 2.0 GA | Simpler chunking; no manual batch mgmt |
| NetSuite | SuiteTalk REST | 2024.2 | Idempotency key header support | Safer retries on bulk REST operations |
| SAP | OData v4 | 2408 | Async OData $batch support | Bulk ops no longer block dialogs |
| D365 | DMF | PU63 (10.0.38) | Batch Concurrency Control | Fine-grained parallel task control |
| D365 | DMF | PU64 (10.0.40) | Auto-retry on batch restarts | Improved reliability for long imports |
| Oracle | FBDI | 26A | File size limit increased to 500 MB | Larger chunks possible |

When to Use / When Not to Use

| Use When | Don't Use When | Use Instead |
|---|---|---|
| Data volume > 2,000 records per operation | Real-time, sub-second latency required | REST API with composite requests |
| Nightly/scheduled data sync between systems | Event-driven integration (react to changes) | CDC / Platform Events / Webhooks |
| Master data distribution across ERPs | One-time data migration (initial load) | ETL tools (LTMC, Data Loader, FBDI staging) |
| High-volume transaction processing | Low-volume, high-frequency updates | Standard REST/OData API |
| Historical data backfill | Bidirectional real-time sync | Dual-write / CDC with conflict resolution |

Cross-System Comparison

| Capability | Salesforce | NetSuite | SAP S/4HANA | D365 F&O | Oracle ERP Cloud | Notes |
|---|---|---|---|---|---|---|
| Bulk API maturity | Excellent | Limited | Good | Good | Good | SF most mature |
| Auto-chunking | Yes (10K) | No | No | No | No | SF only |
| Partial success | Per-record | No | Per-changeset | Per-record (logs) | No | SF best |
| Concurrency control | 25 hard cap | Tier-based | Resource-based | Configurable | 5 hard cap | D365 most flexible |
| Built-in retry | 10 retries | None | None | Auto (PU64+) | None | |
| Query chunking | PK Chunking | Pagination (1K) | $skip/$top | $skip/$top | BICC | SF most scalable |
| Idempotent bulk ops | Upsert + ExtID | External ID | Not native | Alternate key | Not native | |
| Compression | No | No | No | ZIP | DEFLATE/ZIP | |
| Async processing | Native | Native (CSV) | Async OData | Batch mode | ESS jobs | All support async |
| Progress API | Job status | Task status | Job log | GetMessageStatus | ESS status | |

Important Caveats

Related Units