Export with mongoexport, design a relational or hybrid (JSONB) schema in PostgreSQL, transform nested documents into rows or JSONB columns, and load with \copy or a migration script — use AWS DMS or CDC pipelines for zero-downtime migrations.

mongoexport --db mydb --collection users --type=json --out users.json

- Preserve the original MongoDB _id (e.g., in a mongo_id column) during migration — you will need it for cross-referencing, debugging, and rollback. [src6]
- Use the COPY protocol (not individual INSERTs) for bulk loading — COPY is 5–10x faster and avoids transaction overhead on large datasets. [src7]
- Disable triggers and FK checks during bulk load (ALTER TABLE ... DISABLE TRIGGER ALL;) and re-enable + validate after — FK violations during load are the #1 cause of failed migrations. [src3]
- Decimal128 must map to PostgreSQL NUMERIC, not FLOAT — using FLOAT silently loses precision on financial data. [src1]
- AWS DMS initially lands each document in a single _doc JSONB column — this requires a second normalization pass and is not a complete migration on its own. [src3]

| MongoDB Concept | PostgreSQL Equivalent | Example |
|---|---|---|
| Collection | Table | CREATE TABLE users (...) |
| Document (BSON) | Row (or JSONB column) | INSERT INTO users (data) VALUES ('{"name":"Alice"}'::jsonb) |
| _id (ObjectId) | UUID or BIGSERIAL primary key | id UUID DEFAULT gen_random_uuid() |
| Embedded subdocument | JSONB column or separate table (FK) | metadata JSONB or CREATE TABLE addresses (user_id UUID REFERENCES users) |
| Array field | JSONB array or junction table | tags JSONB or CREATE TABLE user_tags (user_id, tag_id) |
| $lookup (aggregation) | JOIN | SELECT * FROM orders JOIN users ON orders.user_id = users.id |
| find({status: "active"}) | WHERE clause | SELECT * FROM users WHERE status = 'active' |
| find({"addr.city": "NYC"}) | JSONB path operators -> / ->> | SELECT * FROM users WHERE data->'addr'->>'city' = 'NYC' |
| Compound index | B-tree composite index | CREATE INDEX idx ON users (status, created_at) |
| Text index | tsvector + GIN index | CREATE INDEX idx ON docs USING GIN (to_tsvector('english', body)) |
| $regex query | ~ or LIKE/ILIKE | SELECT * FROM users WHERE name ~ '^Al' |
| Schema-less fields | JSONB with GIN index | CREATE INDEX idx ON users USING GIN (metadata) |
| db.collection.aggregate() | SQL GROUP BY / window functions | SELECT status, COUNT(*) FROM orders GROUP BY status |
| Replica set (read scaling) | Read replicas / streaming replication | pg_basebackup + streaming replication |
| Decimal128 | NUMERIC | price NUMERIC(12,2) |
| ISODate() | TIMESTAMPTZ | created_at TIMESTAMPTZ DEFAULT NOW() |
| JSON_TABLE() (PG 17+) | Explode JSON arrays to rows | SELECT * FROM JSON_TABLE(doc, '$.items[*]' COLUMNS ...) |
| UUID v7 (PG 18+) | Time-ordered UUIDs | id UUID DEFAULT uuidv7() |
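One practical consequence of the _id mapping above: an ObjectId embeds its creation time in its first four bytes, which can backfill a created_at column when documents lack an explicit timestamp. A minimal stdlib-only sketch (the helper name is illustrative, not part of any library):

```python
from datetime import datetime, timezone

def objectid_to_datetime(oid_hex: str) -> datetime:
    """Recover the creation timestamp embedded in a MongoDB ObjectId.

    The first 4 bytes (8 hex chars) of an ObjectId are seconds since
    the Unix epoch; handy for backfilling created_at columns."""
    seconds = int(oid_hex[:8], 16)
    return datetime.fromtimestamp(seconds, tz=timezone.utc)

# ObjectId whose leading bytes encode 2024-01-01T00:00:00Z
print(objectid_to_datetime("6592008013fb5b0c9d2c1f00"))
```

PyMongo exposes the same value as `ObjectId.generation_time`; the manual parse is useful when you only have the hex string from a JSON export.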
START
├── Is downtime acceptable during migration?
│ ├── YES → Use mongoexport + ETL script + COPY (simplest approach)
│ └── NO ↓
├── Need real-time sync during cutover?
│ ├── YES → Use AWS DMS with CDC or Debezium + Kafka pipeline
│ └── NO ↓
├── Data volume > 100 GB?
│ ├── YES → Use parallel export (mongodump --numParallelCollections) + batch COPY
│ └── NO ↓
├── Documents have highly variable schemas?
│ ├── YES → Hybrid approach: relational columns for common fields + JSONB for variable
│ └── NO ↓
├── Deep nesting (>3 levels)?
│ ├── YES → Store as JSONB, add GIN indexes for query paths
│ └── NO ↓
└── DEFAULT → Fully normalize into relational tables with foreign keys
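When migrating many collections it can help to encode the decision tree above as a tiny helper so the strategy choice is scriptable and auditable. A sketch (function name, thresholds, and label strings mirror the tree; purely illustrative):

```python
def choose_strategy(downtime_ok: bool, needs_realtime_sync: bool,
                    volume_gb: float, variable_schema: bool,
                    max_nesting: int) -> str:
    """Walk the migration decision tree top-down and return a strategy label."""
    if downtime_ok:
        return "mongoexport + ETL + COPY"
    if needs_realtime_sync:
        return "AWS DMS with CDC or Debezium + Kafka"
    if volume_gb > 100:
        return "parallel mongodump + batch COPY"
    if variable_schema:
        return "hybrid: relational columns + JSONB"
    if max_nesting > 3:
        return "JSONB with GIN indexes"
    return "fully normalized relational tables"

print(choose_strategy(False, False, 20, False, 2))
# → fully normalized relational tables
```

Feeding each collection's audit results through this function yields a per-collection migration plan you can review before writing any ETL code.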
Analyze every collection's document structure, field types, nesting depth, and access patterns. This determines which fields to normalize into columns vs. store as JSONB. [src4, src5]
# List all collections and their document counts
mongosh --eval 'db.getCollectionNames().forEach(c => print(c, db[c].countDocuments()))'
# Sample a collection's schema
mongosh --eval 'db.users.aggregate([{$sample: {size: 100}}]).forEach(d => printjson(Object.keys(d)))'
# Detect field type variance (critical for schema design)
mongosh --eval '
db.users.aggregate([
{ $sample: { size: 5000 } },
{ $project: { _id: 0 } },
{ $addFields: { _keys: { $objectToArray: "$$ROOT" } } },
{ $unwind: "$_keys" },
{ $group: { _id: { field: "$_keys.k", type: { $type: "$_keys.v" } }, count: { $sum: 1 } } },
{ $sort: { "_id.field": 1 } }
])'
Verify: You have a list of all collections with field names, types, and nesting depth. Create an ER diagram for the target PostgreSQL schema before writing any migration code.
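Nesting depth is one of the audit outputs the decision tree keys on. A small sketch that measures it over sampled documents, assuming the docs are already decoded to Python dicts (via pymongo or json.loads; the helper name is illustrative):

```python
def max_depth(value, depth: int = 0) -> int:
    """Maximum nesting depth of a decoded document.

    Scalars at the top level count as depth 0; each dict or list
    level adds 1. Run over a $sample from each collection to decide
    JSONB storage vs. full normalization."""
    if isinstance(value, dict):
        return max((max_depth(v, depth + 1) for v in value.values()),
                   default=depth + 1)
    if isinstance(value, list):
        return max((max_depth(v, depth + 1) for v in value),
                   default=depth + 1)
    return depth

doc = {"name": "Alice", "addr": {"geo": {"lat": 40.7}}, "tags": ["vip"]}
print(max_depth(doc))  # → 3
```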
Design tables based on your audit. Use a hybrid approach: relational columns for frequently queried fields, JSONB for variable or deeply nested data. [src1, src2]
-- PostgreSQL 18+: uuidv7() available for time-ordered UUIDs
-- PostgreSQL 13+: gen_random_uuid() is built-in
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
mongo_id VARCHAR(24),
email TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
status TEXT DEFAULT 'active',
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ,
metadata JSONB DEFAULT '{}'
);
CREATE TABLE addresses (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
street TEXT, city TEXT, state TEXT, zip TEXT,
country TEXT DEFAULT 'US',
is_primary BOOLEAN DEFAULT false
);
-- no separate index on email: the UNIQUE constraint already creates one
CREATE INDEX idx_users_status ON users (status);
CREATE INDEX idx_users_metadata ON users USING GIN (metadata);
CREATE INDEX idx_users_created ON users (created_at DESC);
CREATE INDEX idx_addresses_user ON addresses (user_id);
Verify: \dt shows all tables. \d users shows correct column types and constraints.
Use mongoexport for small-to-medium datasets or mongodump for large binary exports. JSON format preserves nested structure better than CSV. [src4]
# Export as JSON (one document per line)
mongoexport --db=myapp --collection=users --type=json --out=users.json
# For large collections, export in parallel
mongodump --db=myapp --numParallelCollections=4 --out=./dump/
# Verify export
wc -l users.json
Verify: Line count in exported JSON matches document count in MongoDB.
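Note that mongoexport emits MongoDB extended JSON: ObjectIds, dates, and decimals arrive wrapped in objects like {"$oid": ...} rather than plain strings. A sketch of a recursive unwrapper that flattens these wrappers before loading (stdlib only; the function name is an assumption, and only three wrapper types are handled here):

```python
from decimal import Decimal

def unwrap_extended_json(value):
    """Recursively convert extended-JSON wrappers ({"$oid": ...},
    {"$date": ...}, {"$numberDecimal": ...}) into plain Python values
    suitable as psycopg2 query parameters."""
    if isinstance(value, dict):
        if "$oid" in value:
            return value["$oid"]
        if "$date" in value:
            return value["$date"]  # ISO-8601 string; TIMESTAMPTZ accepts it
        if "$numberDecimal" in value:
            return Decimal(value["$numberDecimal"])  # maps to NUMERIC
        return {k: unwrap_extended_json(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unwrap_extended_json(v) for v in value]
    return value

doc = {"_id": {"$oid": "65a1"}, "price": {"$numberDecimal": "19.99"},
       "createdAt": {"$date": "2026-01-01T00:00:00Z"}}
print(unwrap_extended_json(doc))
```

Run every exported document through such an unwrapper before building INSERT batches, so Decimal128 values reach NUMERIC columns without a detour through float.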
Write a script that reads MongoDB JSON, transforms documents to match the PostgreSQL schema, and inserts using batch operations. [src4, src5]
import json
import psycopg2

conn = psycopg2.connect(host="localhost", dbname="myapp", user="postgres")
cur = conn.cursor()
BATCH_SIZE = 1000
INSERT_SQL = """
    INSERT INTO users (mongo_id, email, name, status, created_at, metadata)
    VALUES (%s, %s, %s, %s, %s, %s::jsonb)
    ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name, metadata = EXCLUDED.metadata
"""
batch = []
with open("users.json", "r") as f:
    for i, line in enumerate(f, 1):
        doc = json.loads(line)
        # mongoexport extended JSON wraps ObjectIds and dates: {"$oid": ...}, {"$date": ...}
        mongo_id = doc["_id"]["$oid"] if isinstance(doc["_id"], dict) else str(doc["_id"])
        created = doc.get("createdAt")
        if isinstance(created, dict):
            created = created.get("$date")
        metadata = {k: v for k, v in doc.items()
                    if k not in ("_id", "email", "name", "status", "createdAt")}
        batch.append((mongo_id, doc.get("email", ""), doc.get("name", ""),
                      doc.get("status", "active"), created, json.dumps(metadata)))
        if i % BATCH_SIZE == 0:
            cur.executemany(INSERT_SQL, batch)
            batch.clear()
            conn.commit()
if batch:  # flush the final partial batch with the same upsert logic
    cur.executemany(INSERT_SQL, batch)
    conn.commit()
cur.close()
conn.close()
Verify: SELECT COUNT(*) FROM users; matches the MongoDB document count.
Recreate MongoDB indexes as PostgreSQL equivalents. Test every critical query path against the new schema. [src1, src2]
-- Full-text search (replaces MongoDB text index)
ALTER TABLE products ADD COLUMN search_vector tsvector
GENERATED ALWAYS AS (to_tsvector('english', coalesce(name,'') || ' ' || coalesce(description,''))) STORED;
CREATE INDEX idx_products_search ON products USING GIN (search_vector);
-- GIN index on JSONB (replaces MongoDB flexible queries)
CREATE INDEX idx_events_metadata ON events USING GIN (metadata);
-- PostgreSQL 17+: Use JSON_TABLE for analytics on JSONB arrays
SELECT jt.sku, SUM(jt.qty) AS total
FROM orders, JSON_TABLE(items, '$[*]' COLUMNS (sku TEXT PATH '$.sku', qty INT PATH '$.qty')) AS jt
GROUP BY jt.sku;
Verify: EXPLAIN ANALYZE on your most common queries confirms index usage.
For production systems, implement a dual-write pattern: writes go to both databases, reads shift gradually to PostgreSQL. [src3, src4, src8]
// Node.js dual-write middleware
const { MongoClient } = require('mongodb');
const { Pool } = require('pg');

const pg = new Pool({ connectionString: 'postgresql://postgres:secret@localhost/myapp' });
const mongoClient = new MongoClient('mongodb://localhost:27017');
const mongoDb = mongoClient.db('myapp');

async function dualWrite(collection, pgTable, document, pgTransform) {
  const pgRow = pgTransform(document);
  // PostgreSQL is the source of truth — its failure aborts the write
  const pgResult = await pg.query(
    `INSERT INTO ${pgTable} (${Object.keys(pgRow).join(', ')})
     VALUES (${Object.keys(pgRow).map((_, i) => `$${i + 1}`).join(', ')})
     RETURNING id`,
    Object.values(pgRow)
  );
  try {
    // Shadow write to MongoDB; log but don't fail the request
    await mongoDb.collection(collection).insertOne({
      ...document, _pg_id: pgResult.rows[0].id
    });
  } catch (err) {
    console.error(`MongoDB shadow write failed: ${err.message}`);
  }
  return pgResult.rows[0];
}
Verify: Spot-check 100 records — data in both databases should match.
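The spot-check can be automated by diffing field values between the two stores. A minimal sketch, assuming you have already fetched the MongoDB document and the PostgreSQL row as plain dicts (the helper name and field_map shape are illustrative):

```python
def diff_records(mongo_doc: dict, pg_row: dict, field_map: dict) -> dict:
    """Return {field: (mongo_value, pg_value)} for every mapped field
    whose values differ between the source document and migrated row.

    field_map maps MongoDB field names to PostgreSQL column names."""
    return {f: (mongo_doc.get(f), pg_row.get(col))
            for f, col in field_map.items()
            if mongo_doc.get(f) != pg_row.get(col)}

mismatches = diff_records({"email": "[email protected]", "name": "Alice"},
                          {"email": "[email protected]", "name": "Alicia"},
                          {"email": "email", "name": "name"})
print(mismatches)  # → {'name': ('Alice', 'Alicia')}
```

Run this over a random sample of 100 _id values fetched from both sides; any non-empty result flags a record to investigate before cutover.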
Run full data validation before decommissioning MongoDB. Compare record counts, checksums on critical fields, and test every query path. [src5, src8]
-- Count comparison
SELECT 'users' AS table_name, COUNT(*) FROM users
UNION ALL SELECT 'orders', COUNT(*) FROM orders;
-- Checksum critical fields
SELECT MD5(STRING_AGG(email || name || status, '' ORDER BY mongo_id)) AS checksum FROM users;
-- Verify no FK orphans
SELECT COUNT(*) FROM addresses a
LEFT JOIN users u ON a.user_id = u.id
WHERE u.id IS NULL; -- Should return 0
Verify: All counts match. FK orphan check returns 0. Application test suite passes against PostgreSQL.
# Input: MongoDB connection string + collection name
# Output: Data migrated to PostgreSQL table with JSONB hybrid schema
import json, time, psycopg2
from pymongo import MongoClient  # pymongo==4.8.0

mongo = MongoClient("mongodb://localhost:27017")
pg_conn = psycopg2.connect("dbname=myapp user=postgres host=localhost")
pg_cur = pg_conn.cursor()
BATCH_SIZE = 5000

def migrate_collection(db_name, coll_name, pg_table, field_map):
    coll = mongo[db_name][coll_name]
    total = coll.estimated_document_count()
    start = time.time()
    batch = []
    columns = [pg_col for pg_col, _ in field_map.values()] + ["metadata"]
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f"INSERT INTO {pg_table} ({', '.join(columns)}) VALUES ({placeholders}) ON CONFLICT DO NOTHING"
    for i, doc in enumerate(coll.find().batch_size(BATCH_SIZE), 1):
        row = [transform(doc.get(field)) for field, (_, transform) in field_map.items()]
        extra = {k: v for k, v in doc.items() if k not in set(field_map) | {"_id"}}
        row.append(json.dumps(extra, default=str))
        batch.append(tuple(row))
        if len(batch) >= BATCH_SIZE:
            pg_cur.executemany(sql, batch)
            pg_conn.commit()
            print(f"  {i}/{total} ({i*100//total}%) — {i/(time.time()-start):.0f} docs/sec")
            batch.clear()
    if batch:
        pg_cur.executemany(sql, batch)
        pg_conn.commit()
    print(f"Done: {total} docs in {time.time()-start:.1f}s")

migrate_collection("myapp", "users", "users", {
    "email": ("email", lambda v: v),
    "name": ("name", lambda v: v or ""),
    "status": ("status", lambda v: v or "active"),
    "createdAt": ("created_at", lambda v: v.isoformat() if v else None),
})
// Input: MongoDB collection with millions of documents
// Output: Data streamed into PostgreSQL using COPY protocol
const { MongoClient } = require('mongodb'); // [email protected]
const { Pool } = require('pg'); // [email protected]
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
const copyFrom = require('pg-copy-streams').from; // [email protected]

async function streamMigrate(dbName, collName, pgTable, transformDoc) {
  const mongo = new MongoClient('mongodb://localhost:27017');
  const pg = new Pool({ connectionString: 'postgresql://postgres:secret@localhost/myapp' });
  await mongo.connect();
  const collection = mongo.db(dbName).collection(collName);
  let count = 0;
  const pgClient = await pg.connect();
  const copyStream = pgClient.query(
    copyFrom(`COPY ${pgTable} FROM STDIN WITH (FORMAT csv, NULL '\\N')`)
  );
  const mongoStream = collection.find().stream();
  const transformer = new Transform({
    objectMode: true,
    transform(doc, _enc, cb) {
      count++;
      const row = transformDoc(doc);
      // Quote fields containing commas, quotes, or newlines; \N marks NULL
      const csv = row.map(f => {
        if (f == null) return '\\N';
        const s = String(f);
        return s.includes(',') || s.includes('"') || s.includes('\n')
          ? `"${s.replace(/"/g, '""')}"` : s;
      }).join(',') + '\n';
      cb(null, csv);
    }
  });
  await pipeline(mongoStream, transformer, copyStream);
  console.log(`Migrated ${count} rows into ${pgTable}`);
  pgClient.release(); await pg.end(); await mongo.close();
}

streamMigrate('myapp', 'orders', 'orders', (doc) => [
  doc._id.toString(), doc.userId?.toString(), doc.total || 0,
  doc.status || 'pending', doc.createdAt?.toISOString(), JSON.stringify(doc.items)
]);
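The same CSV quoting rules used by the Node.js streaming transform can be expressed in Python, if you prefer to drive COPY ... FROM STDIN WITH (FORMAT csv, NULL '\N') from a Python loader. A sketch with illustrative helper names:

```python
def csv_field(value) -> str:
    """Escape one value for PostgreSQL COPY CSV input; None becomes \\N."""
    if value is None:
        return r"\N"
    s = str(value)
    # Quote fields containing a delimiter, quote, or newline; double inner quotes
    if any(c in s for c in ',"\n'):
        return '"' + s.replace('"', '""') + '"'
    return s

def to_copy_line(row) -> str:
    """Serialize one row as a line of COPY-compatible CSV."""
    return ",".join(csv_field(v) for v in row) + "\n"

print(to_copy_line(["65a1", None, 'say "hi"', 42]), end="")
# → 65a1,\N,"say ""hi""",42
```

With psycopg2, feed these lines to `cursor.copy_expert("COPY orders FROM STDIN WITH (FORMAT csv, NULL '\\N')", file_like)` where `file_like` yields the serialized rows.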
-- MongoDB: db.orders.find({status: "shipped", total: {$gte: 100}}).sort({createdAt: -1}).limit(20)
SELECT * FROM orders WHERE status = 'shipped' AND total >= 100 ORDER BY created_at DESC LIMIT 20;
-- MongoDB: db.orders.aggregate([{$group: {_id: "$status", count: {$sum: 1}, avg: {$avg: "$total"}}}])
SELECT status, COUNT(*) AS count, AVG(total) AS avg_total FROM orders GROUP BY status;
-- MongoDB: db.users.find({"address.city": "New York"})
-- Normalized:
SELECT u.* FROM users u JOIN addresses a ON a.user_id = u.id WHERE a.city = 'New York';
-- JSONB:
SELECT * FROM users WHERE metadata->>'city' = 'New York';
-- MongoDB: db.users.find({tags: {$in: ["premium", "vip"]}})
-- JSONB array:
SELECT * FROM users WHERE metadata->'tags' ?| ARRAY['premium', 'vip'];
-- MongoDB: $unwind + $group (average of nested array)
SELECT id, AVG((review->>'rating')::numeric) AS avg_rating
FROM products, jsonb_array_elements(reviews) AS review GROUP BY id;
-- ❌ BAD — stores entire MongoDB document as one JSONB blob
CREATE TABLE users (id SERIAL PRIMARY KEY, doc JSONB);
INSERT INTO users (doc) VALUES ('{"email":"[email protected]","name":"Alice","status":"active"}'::jsonb);
-- No constraints, no type safety, every query needs JSONB operators
-- ✅ GOOD — frequently queried fields are real columns with types and constraints
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
status TEXT DEFAULT 'active' CHECK (status IN ('active','inactive','suspended')),
created_at TIMESTAMPTZ NOT NULL,
metadata JSONB DEFAULT '{}' -- only variable fields here
);
-- ❌ BAD — ObjectIds make poor PostgreSQL primary keys
CREATE TABLE users (id VARCHAR(24) PRIMARY KEY, email TEXT);
-- 24-char string comparisons are slower than UUID, no built-in generation
-- ✅ GOOD — native UUID primary key + reference column for traceability
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
mongo_id VARCHAR(24), -- for cross-referencing during migration
email TEXT NOT NULL UNIQUE
);
CREATE INDEX idx_users_mongo_id ON users (mongo_id); -- drop post-migration
-- ❌ BAD — over-normalization of rarely-queried nested data
CREATE TABLE user_preferences (id UUID, user_id UUID, key TEXT, value TEXT);
CREATE TABLE user_notification_settings (id UUID, user_id UUID, channel TEXT, enabled BOOLEAN);
CREATE TABLE user_ui_settings (id UUID, user_id UUID, theme TEXT, language TEXT);
-- 4+ JOINs to reconstruct a single user profile
-- ✅ GOOD — one JSONB column for settings, no extra tables
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email TEXT NOT NULL UNIQUE,
preferences JSONB DEFAULT '{"notifications":{"email":true},"ui":{"theme":"light"}}'
);
-- Still queryable: SELECT * FROM users WHERE preferences->'ui'->>'theme' = 'dark';
# ❌ BAD — CSV strings lose structure and queryability
tags_str = ",".join(doc.get("tags", []))
cur.execute("INSERT INTO users (tags) VALUES (%s)", (tags_str,))
# ✅ GOOD — preserve array structure in JSONB
import json
tags = doc.get("tags", [])
cur.execute("INSERT INTO users (tags) VALUES (%s::jsonb)", (json.dumps(tags),))
# Query: SELECT * FROM users WHERE tags ? 'premium'
- Dates: mongoexport outputs dates as {"$date": "2026-01-01T00:00:00Z"} — not a plain ISO string. Fix: Parse with val["$date"] before inserting into TIMESTAMPTZ columns. [src4]
- ObjectIds: exported as {"$oid": "65a..."} in extended JSON, not a plain string. Fix: Extract with doc["_id"]["$oid"] or use str(doc["_id"]) with PyMongo. [src6]
- Type variance: the same field can hold different BSON types across documents. Fix: Run a $type aggregation to detect type variance per field. [src5]
- Missing fields: keys absent from a document become null. Fix: Use .get(field, default) in migration scripts and set DEFAULT values in PostgreSQL. [src7]
- FK violations during bulk load: Fix: Disable triggers for the load (ALTER TABLE addresses DISABLE TRIGGER ALL;), re-enable after. [src3]
- Slow bulk inserts: Fix: Use the COPY protocol for 10x throughput. [src7]
- Forgetting the _id index: Every MongoDB collection has an index on _id. Fix: Ensure your PostgreSQL PRIMARY KEY is in place — it creates a B-tree index automatically. [src1]
- Incomplete DMS migrations: AWS DMS lands documents in a single _doc JSONB by default, giving a false sense of completion. Fix: Always plan a second normalization pass after DMS full-load completes. [src3]

# Check MongoDB collection stats before migration
mongosh --eval 'db.users.stats()' | grep -E 'count|size|storageSize|nindexes'
# Export MongoDB collection to JSON
mongoexport --db=myapp --collection=users --type=json --out=users.json
# Verify export line count
wc -l users.json
# Check PostgreSQL table after migration
psql -c "SELECT COUNT(*) AS row_count FROM users;"
psql -c "SELECT pg_size_pretty(pg_total_relation_size('users')) AS table_size;"
# Verify JSONB data is valid
psql -c "SELECT COUNT(*) FROM users WHERE metadata IS NOT NULL AND jsonb_typeof(metadata) = 'object';"
# Check index usage
psql -c "SELECT indexrelname, idx_scan, idx_tup_read FROM pg_stat_user_indexes WHERE schemaname = 'public' ORDER BY idx_scan DESC;"
# Compare record counts
mongosh --eval 'db.users.countDocuments()'
psql -c "SELECT COUNT(*) FROM users;"
# Detect orphaned foreign keys
psql -c "SELECT COUNT(*) FROM addresses a LEFT JOIN users u ON a.user_id = u.id WHERE u.id IS NULL;"
# Check for NULL in required fields post-migration
psql -c "SELECT COUNT(*) FROM users WHERE email IS NULL OR name IS NULL;"
| Component | Status | Support | Migration Notes |
|---|---|---|---|
| MongoDB 8.0 | Current | AWS DMS 3.5.4+ | Full mongoexport support; queryable encryption not supported by DMS |
| MongoDB 7.0 | Supported | AWS DMS 3.5.4+ | Standard migration path |
| MongoDB 6.0 | Supported | AWS DMS 3.5.2+ | Timeseries collections not supported by DMS |
| MongoDB 5.0 | Supported | AWS DMS 3.5.1+ | Live resharding not supported by DMS |
| MongoDB 4.x | Legacy | AWS DMS 3.4.5+ | Most documented migration path |
| PostgreSQL 18 | Current (Sep 2025) | All features | Async I/O (2–3x perf), UUID v7, enhanced RETURNING clause |
| PostgreSQL 17 | Supported | All features | JSON_TABLE() for analytics, improved VACUUM |
| PostgreSQL 16 | Supported | All features | Improved JSONB performance |
| PostgreSQL 14–15 | Supported | JSONB subscripting (PG 14+) | Use jsonb_set() for older versions |
| Use When | Don't Use When | Use Instead |
|---|---|---|
| Need ACID transactions across related documents | Schema is truly dynamic with no common fields | Keep MongoDB or use DynamoDB |
| Want SQL JOINs for complex reporting/analytics | Sub-millisecond reads on simple key-value lookups | Redis or MongoDB with read replicas |
| Regulatory requirements demand strong data integrity | Horizontal sharding across 10+ nodes is required | MongoDB Atlas, CockroachDB, or Citus |
| Team knows SQL better than MongoDB aggregation | Data is primarily geospatial with GeoJSON | MongoDB or PostGIS |
| PostgreSQL JSONB covers your schema flexibility needs | Write volume exceeds 100K inserts/sec sustained | MongoDB or TimescaleDB |
| Want a single database for relational + document data | Application is stable on MongoDB with no pain points | Don't migrate for migration's sake |
- Field types can vary across documents in the same collection (e.g., age stored as string in some docs, number in others). Always audit field types across a large sample before designing PostgreSQL columns.
- AWS DMS lands each document in a single _doc JSONB column — useful for initial migration but requires a second pass to normalize into relational columns.
- JSONB alone enforces no schema; add CHECK constraints or application-level validation for fields you rely on.
- The COPY protocol is 5–10x faster than individual INSERT statements for bulk loading. Always use COPY for large migrations.
- MongoDB Decimal128 maps to PostgreSQL NUMERIC — verify precision is preserved, especially for financial data.
- PostgreSQL 17's JSON_TABLE() function significantly simplifies analytics queries on migrated JSONB data — consider using it for reporting pipelines.