mosquitto_pub -h broker.example.com -t sensors/temp -m '{"device":"d1","value":23.5,"ts":1708700000}'

| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Device Gateway | Accepts device connections, authenticates, terminates TLS | AWS IoT Core, Azure IoT Hub, EMQX, HiveMQ, Mosquitto | Horizontal broker clustering; 1M+ connections per cluster |
| MQTT Broker | Pub/sub message routing with QoS levels | EMQX, HiveMQ, VerneMQ, Mosquitto, NanoMQ | Cluster with shared subscriptions; partition by topic namespace |
| Protocol Adapter | Translates CoAP/HTTP/AMQP to internal format | AWS IoT Rules, Azure IoT Edge, custom bridge | Stateless; scale with load balancer |
| Stream Buffer | Decouples ingestion from processing; absorbs bursts | Apache Kafka, Amazon Kinesis, Azure Event Hubs, Redpanda | Add partitions; increase retention for replay |
| Schema Registry | Validates and evolves message schemas | Confluent Schema Registry, AWS Glue Schema Registry | Stateless reads; single-leader writes |
| Stream Processor | Transforms, enriches, aggregates in real time | Apache Flink, Kafka Streams, AWS Lambda, Azure Stream Analytics | Parallel consumers per partition |
| Rules Engine | Routes messages based on content/topic patterns | AWS IoT Rules, Node-RED, custom filter layer | Stateless; scale horizontally |
| Time-Series DB | Stores telemetry with time-based partitioning | TimescaleDB, InfluxDB, Amazon Timestream, QuestDB | Hypertable chunking (TimescaleDB), sharding (InfluxDB) |
| Object Storage | Long-term raw data archive | S3, Azure Blob, GCS | Lifecycle policies; tiered storage classes |
| Edge Processor | Pre-filters and aggregates at the edge | AWS IoT Greengrass, Azure IoT Edge, Telegraf | Deploy per site/gateway |
| Monitoring Stack | Pipeline health, throughput, latency dashboards | Grafana + Prometheus, CloudWatch, Datadog | Federated Prometheus for multi-region |
| Device Registry | Tracks device identity, firmware, status | AWS IoT Device Defender, Azure DPS, custom DB | Read replicas; eventual consistency acceptable |
START: Choose your ingestion protocol
├── Devices are battery-powered / bandwidth-constrained?
│ ├── YES → Use MQTT 5.0 (2-byte header, minimal overhead)
│ │ ├── Need QoS 2 (exactly-once delivery)?
│ │ │ ├── YES → MQTT QoS 2 (higher latency, 4-packet handshake)
│ │ │ └── NO → MQTT QoS 1 (at-least-once, best throughput)
│ │ └── Continue to buffer selection below
│ └── NO ↓
├── Devices send < 1 msg/min and are on reliable networks?
│ ├── YES → HTTP POST with batch payloads is acceptable
│ └── NO ↓
├── Need complex routing (exchanges, queues, priorities)?
│ ├── YES → AMQP 1.0 (RabbitMQ) — enterprise messaging features
│ └── NO ↓
└── DEFAULT → MQTT 5.0 (industry standard for IoT)
BUFFER SELECTION:
├── > 100K messages/sec sustained throughput?
│ ├── YES → Apache Kafka / Redpanda (partitioned log, replay)
│ └── NO ↓
├── Prefer managed service, < 100K msg/sec?
│ ├── YES → Amazon Kinesis / Azure Event Hubs
│ └── NO ↓
└── DEFAULT → Kafka (most flexible, largest ecosystem)
STORAGE SELECTION:
├── Need SQL queries on time-series data?
│ ├── YES → TimescaleDB (full PostgreSQL compatibility)
│ └── NO ↓
├── Need sub-second query latency on recent data?
│ ├── YES → InfluxDB or QuestDB
│ └── NO ↓
├── AWS-native, serverless preferred?
│ ├── YES → Amazon Timestream
│ └── NO ↓
└── DEFAULT → TimescaleDB (SQL + compression + ecosystem)
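The buffer- and storage-selection branches above can be captured as plain predicates, which is useful for design reviews. A sketch (function and return-value names are ours, not from any library):

```python
def choose_buffer(msgs_per_sec, prefer_managed=False):
    """Encode the BUFFER SELECTION tree above (illustrative only)."""
    if msgs_per_sec > 100_000:
        return "kafka"  # partitioned log with replay; Redpanda is API-compatible
    if prefer_managed:
        return "kinesis-or-event-hubs"
    return "kafka"      # default: most flexible, largest ecosystem

def choose_storage(need_sql=False, need_subsecond=False, aws_serverless=False):
    """Encode the STORAGE SELECTION tree above (illustrative only)."""
    if need_sql:
        return "timescaledb"        # full PostgreSQL compatibility
    if need_subsecond:
        return "influxdb-or-questdb"
    if aws_serverless:
        return "timestream"
    return "timescaledb"            # default: SQL + compression + ecosystem
```

The point is not the code itself but that the decision is deterministic: writing it down this way forces each branch condition to be stated explicitly.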
Deploy a clustered MQTT broker as the device-facing ingestion endpoint. Use MQTT 5.0 for shared subscriptions and topic aliases. Configure TLS with mutual authentication (mTLS) for production. [src3]
# Deploy EMQX cluster via Docker (mount the cert directory the env vars reference)
docker run -d --name emqx \
  -p 1883:1883 \
  -p 8883:8883 \
  -p 8083:8083 \
  -p 18083:18083 \
  -v "$PWD/certs:/certs:ro" \
  -e EMQX_LISTENERS__SSL__DEFAULT__SSL_OPTIONS__CERTFILE=/certs/server.pem \
  -e EMQX_LISTENERS__SSL__DEFAULT__SSL_OPTIONS__KEYFILE=/certs/server.key \
  emqx/emqx:5.5.1
Verify: mosquitto_pub -h localhost -p 1883 -t test/hello -m "ping" while subscribed to test/hello (mosquitto_sub -t test/hello) to confirm receipt.
Establish a hierarchical, lowercase topic structure before connecting any devices. This cannot easily be changed later. [src3]
# Topic structure: {org}/{site}/{device_type}/{device_id}/{measurement}
acme/factory-01/temperature/sensor-4a3b/reading
acme/factory-01/vibration/motor-7c2d/reading
acme/factory-01/+/+/alert # Wildcard subscription for all alerts
Verify: Subscribe to acme/factory-01/# and confirm messages arrive with correct topic structure.
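Because the hierarchy is fixed at five levels, downstream consumers can recover device metadata from the topic string alone. A minimal parser (our own helper, not part of any library):

```python
def parse_topic(topic):
    """Split an {org}/{site}/{device_type}/{device_id}/{measurement} topic
    into named parts; reject anything that does not fit the hierarchy."""
    parts = topic.split("/")
    if len(parts) != 5:
        raise ValueError(f"expected 5 topic levels, got {len(parts)}: {topic}")
    keys = ("org", "site", "device_type", "device_id", "measurement")
    return dict(zip(keys, parts))
```

Rejecting malformed topics at parse time is cheap insurance against a device with misconfigured firmware polluting downstream partitions.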
Connect the MQTT broker to Apache Kafka using a Kafka connector or built-in bridge. This decouples device ingestion from downstream processing. [src1] [src7]
# Using the Confluent MQTT Source Connector (POST to the Kafka Connect REST API;
# note Kafka Connect and EMQX's WebSocket listener both default to port 8083 --
# remap one of them if they share a host)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" -d '{
    "name": "mqtt-source",
    "config": {
      "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
      "mqtt.server.uri": "tcp://emqx:1883",
      "mqtt.topics": "acme/+/+/+/reading",
      "kafka.topic": "iot-telemetry-raw",
      "tasks.max": "4"
    }
  }'
Verify: kafka-console-consumer --bootstrap-server localhost:9092 --topic iot-telemetry-raw --from-beginning shows MQTT messages.
Implement a stream processor that validates schemas, normalizes timestamps to UTC, enriches with device metadata, and routes to sinks. [src1]
# Faust stream processor (Python)
import faust, json
from datetime import datetime, timezone

app = faust.App('iot-processor', broker='kafka://localhost:9092')
raw_topic = app.topic('iot-telemetry-raw', value_type=bytes)
clean_topic = app.topic('iot-telemetry-clean', value_type=bytes)

def normalize_ts(ts_ms):
    # Devices publish epoch milliseconds; store ISO-8601 UTC alongside
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).isoformat()

@app.agent(raw_topic)
async def process_telemetry(stream):
    async for event in stream:
        data = json.loads(event)
        if not all(k in data for k in ('device_id', 'value', 'ts')):
            continue  # Drop malformed
        data['ts_utc'] = normalize_ts(data['ts'])
        await clean_topic.send(value=json.dumps(data).encode())
Verify: Check iot-telemetry-clean topic for enriched messages with ts_utc field.
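A variant worth considering: instead of silently dropping malformed events, route them to a dead-letter topic so misbehaving firmware stays debuggable. A pure-Python sketch of the routing decision (the `iot-telemetry-dlq` topic name is our own suggestion, not from the source):

```python
import json

REQUIRED = ("device_id", "value", "ts")

def route(raw):
    """Pick the destination topic for a raw payload: clean messages move on,
    unparseable or incomplete ones go to a dead-letter topic for inspection."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return ("iot-telemetry-dlq", raw)
    if not all(k in data for k in REQUIRED):
        return ("iot-telemetry-dlq", raw)
    return ("iot-telemetry-clean", raw)
```

Keeping the decision a pure function also makes it trivial to unit-test outside the Faust runtime.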
Configure a Kafka sink to write processed telemetry into your time-series database with compression and retention policies. [src4]
-- TimescaleDB: Create hypertable for IoT telemetry
CREATE TABLE telemetry (
  time TIMESTAMPTZ NOT NULL,
  device_id TEXT NOT NULL,
  location TEXT,
  metric TEXT NOT NULL,
  value DOUBLE PRECISION NOT NULL
);
SELECT create_hypertable('telemetry', 'time');
-- Compression must be enabled on the table before a policy can be added
ALTER TABLE telemetry SET (timescaledb.compress, timescaledb.compress_segmentby = 'device_id');
SELECT add_compression_policy('telemetry', INTERVAL '7 days');
SELECT add_retention_policy('telemetry', INTERVAL '1 year');
Verify: SELECT count(*) FROM telemetry WHERE time > now() - interval '5 minutes'; should show increasing rows.
Set up end-to-end pipeline observability covering broker metrics, Kafka consumer lag, processing throughput, and database write latency. [src3]
# Prometheus scrape config
scrape_configs:
  - job_name: 'emqx'
    static_configs:
      - targets: ['emqx:18083']
    metrics_path: '/api/v5/prometheus/stats'
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka:9308']
Verify: Open Grafana at http://localhost:3000 and confirm dashboards show broker connections and Kafka lag.
# Input: Sensor readings from GPIO/serial
# Output: JSON messages published to MQTT broker
import paho.mqtt.client as mqtt # paho-mqtt==1.6.1
import json, time, ssl
BROKER = "broker.example.com"
PORT = 8883 # TLS
TOPIC = "acme/site-01/temperature/sensor-001/reading"
client = mqtt.Client(protocol=mqtt.MQTTv5)
client.tls_set(
    ca_certs="/certs/ca.pem",
    certfile="/certs/device.pem",
    keyfile="/certs/device.key",
    tls_version=ssl.PROTOCOL_TLS_CLIENT
)
client.connect(BROKER, PORT)
client.loop_start()
def read_sensor():
    # Placeholder: replace with your GPIO/serial read; returns degrees Celsius
    return 22.0

while True:
    payload = {
        "device_id": "sensor-001",
        "metric": "temperature_c",
        "value": read_sensor(),
        "ts": int(time.time() * 1000)
    }
    client.publish(TOPIC, json.dumps(payload), qos=1)
    time.sleep(10)
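On flaky cellular links the device should back off on reconnect rather than hammer the broker. paho-mqtt has this built in via client.reconnect_delay_set(min_delay, max_delay); for intuition, the schedule it approximates looks like this (our own helper, with full jitter added, which paho does not do):

```python
import random

def backoff_delays(base=1.0, cap=120.0, n=8):
    """Exponential backoff with full jitter: each retry waits a random time
    in [0, current_delay], and current_delay doubles up to a cap."""
    delay = base
    out = []
    for _ in range(n):
        out.append(random.uniform(0, delay))
        delay = min(cap, delay * 2)
    return out
```

Jitter matters at fleet scale: without it, thousands of devices that lost connectivity together will all reconnect in the same second.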
# Input: List of telemetry dicts from stream processor
# Output: Batch-inserted rows in TimescaleDB hypertable
import psycopg2 # psycopg2-binary==2.9.9
from psycopg2.extras import execute_values
conn = psycopg2.connect(
    host="timescale.example.com", dbname="iot",
    user="writer", password="secret", sslmode="require"
)

def insert_batch(records):
    sql = """INSERT INTO telemetry (time, device_id, location, metric, value)
             VALUES %s ON CONFLICT DO NOTHING"""
    with conn.cursor() as cur:
        execute_values(cur, sql, records, page_size=1000)
    conn.commit()
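insert_batch expects 5-tuples matching the telemetry columns. A small adapter from the cleaned JSON dicts produced upstream (illustrative; the key names follow the processor above, and `location` is assumed optional):

```python
def to_row(rec):
    """Map one cleaned telemetry dict to the
    (time, device_id, location, metric, value) tuple insert_batch expects."""
    return (
        rec["ts_utc"],
        rec["device_id"],
        rec.get("location"),   # optional; inserts NULL when absent
        rec["metric"],
        float(rec["value"]),   # coerce in case firmware sent a string
    )
```

Usage would then be insert_batch([to_row(r) for r in batch]) inside the consumer loop.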
// Input: MQTT messages from broker
// Output: Messages forwarded to Kafka topic
const mqtt = require("mqtt"); // [email protected]
const { Kafka } = require("kafkajs"); // [email protected]
const fs = require("fs");
const mqttClient = mqtt.connect("mqtts://broker.example.com:8883", {
  ca: fs.readFileSync("/certs/ca.pem"),
  protocolVersion: 5,
});
const kafka = new Kafka({ brokers: ["kafka:9092"] });
const producer = kafka.producer();
async function start() {
  await producer.connect();
  mqttClient.subscribe("acme/+/+/+/reading", { qos: 1 });
  mqttClient.on("message", async (topic, message) => {
    const key = topic.split("/")[3]; // device_id
    await producer.send({
      topic: "iot-telemetry-raw",
      messages: [{ key, value: message.toString() }],
    });
  });
}
start().catch(console.error);
# BAD -- No buffer layer; DB outage = data loss, connection exhaustion at scale
def on_sensor_read(value):
    conn = psycopg2.connect(host="db.example.com", dbname="iot")
    cur = conn.cursor()
    cur.execute("INSERT INTO telemetry VALUES (now(), %s)", (value,))
    conn.commit()
    conn.close()  # Connection per write = catastrophic at 10K devices
# GOOD -- MQTT broker absorbs bursts; Kafka buffers for processing
def on_sensor_read(value):
    client.publish("sensors/temp", json.dumps({"value": value}), qos=1)
# Kafka consumer writes to DB in batches, handles backpressure
# BAD -- No topic hierarchy; impossible to subscribe selectively
client.publish("all-data", json.dumps({"device": "s1", "type": "temp", "value": 23}))
# Every consumer gets every message; no filtering at broker level
# GOOD -- Granular subscriptions; broker-level filtering
client.publish("acme/factory-01/temperature/sensor-001/reading", payload)
# Subscribe to specific: "acme/factory-01/temperature/+/reading"
# Subscribe to all alerts: "acme/+/+/+/alert"
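To build intuition for why granular topics enable broker-level filtering, here is a simplified reimplementation of MQTT wildcard matching ('+' matches exactly one level, '#' the remainder). This is for illustration only; real brokers also handle $-prefixed system topics and shared-subscription prefixes:

```python
def topic_matches(pattern, topic):
    """Simplified MQTT subscription matching:
    '+' matches exactly one topic level, '#' matches all remaining levels."""
    p, t = pattern.split("/"), topic.split("/")
    for i, seg in enumerate(p):
        if seg == "#":
            return True          # '#' must be the last pattern level
        if i >= len(t):
            return False         # pattern is longer than the topic
        if seg != "+" and seg != t[i]:
            return False
    return len(p) == len(t)      # no trailing unmatched topic levels
```

Because this check runs inside the broker, a consumer subscribed to acme/+/+/+/alert never receives (or pays bandwidth for) ordinary readings.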
# BAD -- Wastes resources; high latency; lock contention at scale
while True:
    rows = db.query("SELECT * FROM telemetry WHERE processed = false LIMIT 100")
    for row in rows:
        process(row)
        db.execute("UPDATE telemetry SET processed = true WHERE id = %s", row.id)
    time.sleep(1)
# GOOD -- Push-based; partitioned parallelism; no database polling
@app.agent(telemetry_topic)
async def process(stream):
    async for batch in stream.take(100, within=5.0):
        results = [transform(msg) for msg in batch]
        await sink_to_db(results)
-- BAD -- Unbounded table growth; queries slow within months
CREATE TABLE telemetry (time TIMESTAMPTZ, device_id TEXT, value FLOAT);
-- No partitioning, no compression, no retention = storage bomb
-- GOOD -- Automatic chunking, compression, and cleanup
SELECT create_hypertable('telemetry', 'time');
ALTER TABLE telemetry SET (timescaledb.compress);
SELECT add_compression_policy('telemetry', INTERVAL '7 days');
SELECT add_retention_policy('telemetry', INTERVAL '1 year');
-- 10-20x compression ratio; old data automatically dropped
Key Kafka messages by device_id or device_type so that readings from one device stay ordered within a partition; use at least num_consumers * 2 partitions. [src7]

# Check MQTT broker connected clients
mosquitto_sub -v -h broker.example.com -t '$SYS/broker/clients/connected'
# Check Kafka consumer group lag (detect bottlenecks)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group iot-processor --describe
# Verify Kafka topic throughput (latest offset per partition; Kafka 3.x tooling)
kafka-get-offsets.sh --bootstrap-server kafka:9092 --topic iot-telemetry-raw --time -1
# Check TimescaleDB chunk status
psql -c "SELECT hypertable_name, chunk_name, range_start, range_end, is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'telemetry' ORDER BY range_start DESC LIMIT 10;"
# Check InfluxDB write health
curl -s http://localhost:8086/health | jq .
| Version | Status | Breaking Changes | Migration Notes |
|---|---|---|---|
| MQTT 5.0 | Current (2019+) | Shared subscriptions, user properties, topic aliases | From 3.1.1: update client libs; no wire-level breaks |
| MQTT 3.1.1 | Widely deployed | --- | Still supported; lacks shared subscriptions |
| Kafka 3.x (KRaft) | Current | ZooKeeper deprecated; removed in 4.0 | Rolling upgrade; follow the documented ZooKeeper-to-KRaft migration |
| Kafka 2.8-2.x | Legacy | --- | Upgrade to 3.x for KRaft mode |
| TimescaleDB 2.x | Current | Continuous aggregates v2 | From 1.x: use pre/post restore functions |
| InfluxDB 3.x | Current (2024+) | New storage engine (Arrow/DataFusion) | From 2.x: data migration required; API changes |
| InfluxDB 2.x | Maintained | Flux deprecated in 3.x | Use InfluxQL for forward compatibility |
| Use When | Don't Use When | Use Instead |
|---|---|---|
| Thousands of devices sending periodic telemetry (temp, pressure, GPS) | Fewer than 10 devices sending data once per hour | Direct HTTP POST to REST API + PostgreSQL |
| You need sub-second ingestion latency for real-time dashboards | Data freshness of minutes/hours is acceptable | Batch file upload (CSV/Parquet) to object storage + scheduled ETL |
| Devices are bandwidth-constrained (cellular, LoRa, satellite) | All devices on reliable LAN/Wi-Fi with unlimited bandwidth | gRPC streaming or WebSocket connections |
| You need message replay for reprocessing or debugging | Messages are fire-and-forget with no replay requirement | Simple pub/sub (Redis Streams, Amazon SNS) |
| Multi-region deployment with edge processing requirements | Single-site deployment with local network only | Local MQTT broker + direct DB writes |
Exactly-once delivery through Kafka requires enable.idempotence=true on producers and isolation.level=read_committed on consumers; misconfiguration silently falls back to at-least-once.
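In properties-file form, the relevant client settings are (note that enabling idempotence implies acks=all; true end-to-end exactly-once additionally requires transactional producers):

```ini
# producer.properties
enable.idempotence=true
acks=all

# consumer.properties
isolation.level=read_committed
```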