How to Design an Analytics and Metrics Pipeline
How do I design an analytics and metrics pipeline?
TL;DR
- Bottom line: An analytics/metrics pipeline consists of four layers — ingestion (collect events), processing (transform/aggregate), storage (OLAP-optimized), and serving (query/alert) — connected by a durable message bus like Kafka.
- Key tool/command: Kafka (ingestion) → Flink (processing) → ClickHouse (storage) → Grafana (serving) is the most battle-tested open-source stack for high-throughput metrics.
- Watch out for: Unbounded cardinality in metric labels. A single high-cardinality tag (like user_id) can multiply storage costs 100x and destroy query performance.
- Works with: Any cloud or on-premises environment; Kafka 3.x-4.x, Flink 1.18-2.x, ClickHouse 23.x-24.x, or managed equivalents (Confluent, Amazon MSK, Kinesis, BigQuery).
Constraints
- Never ingest raw unbounded-cardinality labels (user IDs, session IDs) into a metrics time-series store — aggregate or hash them first
- Always deploy a dead-letter queue (DLQ) for malformed or failed events — silent drops create invisible data loss that corrupts dashboards
- Kafka topic partition count cannot be decreased — over-partitioning wastes resources; under-partitioning limits parallelism. Plan partitions = 2x expected peak consumer parallelism
- ClickHouse MergeTree deduplication is eventual (happens during background merges), so design write paths to be idempotent or use FINAL in queries
- Stream processing checkpoints (Flink savepoints, Kafka consumer offsets) must be persisted to durable storage (S3, HDFS), because in-memory state is lost on failure
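The first constraint says to aggregate or hash unbounded labels before they reach the metrics store. A minimal sketch of the hashing option follows; the function names, the 64-bucket default, and the `_bucket` suffix are illustrative assumptions, not part of any library.

```python
import hashlib

def bucket_label(value: str, buckets: int = 64) -> str:
    """Map an unbounded identifier to one of `buckets` stable bucket names."""
    if buckets <= 0:
        raise ValueError("buckets must be positive")
    digest = hashlib.sha256(value.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % buckets
    return f"b{index:03d}"

def bound_tags(tags: dict, high_cardinality_keys=("user_id", "session_id"),
               buckets: int = 64) -> dict:
    """Replace high-cardinality tag values with a bounded bucket tag."""
    bounded = {}
    for key, value in tags.items():
        if key in high_cardinality_keys:
            # Hypothetical naming: user_id -> user_id_bucket
            bounded[f"{key}_bucket"] = bucket_label(str(value), buckets)
        else:
            bounded[key] = value
    return bounded

print(bound_tags({"user_id": "u-839201", "page": "/dashboard"}))
```

The bucket tag caps the number of distinct values at `buckets` while keeping a given user in the same bucket across events, which is usually enough for sampling or sharded comparisons. Per-user analysis still belongs in a separate events table.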
Quick Reference
| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Event Producers | Emit structured events/metrics from apps | OpenTelemetry SDK, StatsD, custom emitters | Horizontal — add producers independently |
| Message Bus | Durable, ordered event transport | Apache Kafka, Amazon MSK, Redpanda, Pulsar | Partition-based — add partitions + brokers |
| Schema Registry | Enforce event schema contracts | Confluent Schema Registry, AWS Glue | Single cluster — scales with broker count |
| Stream Processor | Transform, aggregate, enrich events in-flight | Apache Flink, Kafka Streams, Spark Structured Streaming | Task-parallelism — add TaskManagers/executors |
| Pre-Aggregation | Reduce cardinality before storage | Flink windowed aggregation, ClickHouse Materialized Views | Tied to processor or storage scaling |
| OLAP Storage | Columnar analytics store for fast queries | ClickHouse, Apache Druid, Apache Pinot, TimescaleDB | Shard-based — add shards for writes, replicas for reads |
| Object Storage | Raw event archive / data lake tier | S3, GCS, MinIO | Virtually unlimited — cost scales linearly |
| Batch Processor | Backfill, reprocess, train models on historical data | Apache Spark, dbt, Apache Airflow (orchestration) | Cluster auto-scaling — elastic resize |
| Query Engine | SQL interface for analysts and dashboards | ClickHouse native, Trino/Presto, Grafana direct | Read replicas or caching layer |
| Serving / Visualization | Dashboards, alerts, API endpoints | Grafana, Superset, custom REST APIs | Horizontal — stateless query proxies |
| Alerting | Threshold and anomaly detection on metrics | Grafana Alerting, Prometheus Alertmanager, PagerDuty | Stateless evaluators — scale horizontally |
| Dead-Letter Queue | Capture failed/malformed events for reprocessing | Kafka DLQ topic, S3 error bucket | Same as message bus scaling |
Decision Tree
START: What is your event volume and latency requirement?
├── <10K events/sec AND minutes-latency OK?
│ ├── YES → Simple stack: producers → Kafka → batch consumer → PostgreSQL/TimescaleDB → Grafana
│ └── NO ↓
├── 10K-100K events/sec OR sub-second latency needed?
│ ├── YES → Standard streaming: Kafka → Flink (windowed aggregation) → ClickHouse → Grafana
│ │ (Kappa architecture — single stream processing path)
│ └── NO ↓
├── >100K events/sec OR complex event processing (joins, patterns)?
│ ├── YES → Full pipeline: Kafka → Flink (CEP + aggregation) → ClickHouse cluster (sharded)
│ │ Add pre-aggregation in Flink to reduce ClickHouse write amplification
│ └── NO ↓
├── >1M events/sec (hyperscale)?
│ ├── YES → Multi-tier: Kafka (tiered storage) → Flink (multi-stage DAG) → ClickHouse Cloud
│ │ Add sampling at ingestion, rollup tables, read replicas for queries
│ └── NO ↓
├── Need both real-time dashboards AND accurate historical backfill?
│ ├── YES → Lambda architecture: speed layer (Flink → ClickHouse) + batch layer (Spark → S3)
│ │ Merge results at query time. Higher operational cost but guarantees accuracy.
│ └── NO ↓
└── DEFAULT → Kappa architecture with Kafka + Flink + ClickHouse
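One possible reading of the tree as code is sketched below. It assumes the volume tiers are checked from largest to smallest (so the hyperscale branch is reachable) and that the backfill question is an independent choice between Lambda and Kappa; both are interpretations, and the function name and return shape are hypothetical.

```python
def choose_pipeline(events_per_sec: int,
                    sub_second_latency: bool = False,
                    complex_processing: bool = False,
                    needs_accurate_backfill: bool = False):
    """Return (stack, architecture) for the given workload."""
    if events_per_sec > 1_000_000:
        stack = "multi-tier: Kafka (tiered storage) -> Flink (multi-stage DAG) -> ClickHouse Cloud"
    elif events_per_sec > 100_000 or complex_processing:
        stack = "full: Kafka -> Flink (CEP + aggregation) -> sharded ClickHouse cluster"
    elif events_per_sec >= 10_000 or sub_second_latency:
        stack = "standard: Kafka -> Flink (windowed aggregation) -> ClickHouse -> Grafana"
    else:
        stack = "simple: Kafka -> batch consumer -> PostgreSQL/TimescaleDB -> Grafana"
    # Lambda adds a batch layer (Spark -> S3) next to the speed layer.
    architecture = "lambda" if needs_accurate_backfill else "kappa"
    return stack, architecture

print(choose_pipeline(50_000, needs_accurate_backfill=True))
```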
Step-by-Step Guide
1. Define event schema and set up Schema Registry
Start with a well-defined event schema using Avro or Protobuf. Register it in a Schema Registry to enforce contracts between producers and consumers. Schema evolution (adding optional fields) is safe; removing or renaming fields requires a new schema version. [src2]
syntax = "proto3";
package analytics;
message MetricEvent {
string event_id = 1; // UUID for deduplication
string metric_name = 2; // e.g., "http_request_duration_ms"
double value = 3;
int64 timestamp_ms = 4; // Unix epoch milliseconds
map<string, string> tags = 5; // bounded-cardinality labels only
string source = 6; // producing service name
}
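Verify: curl http://schema-registry:8081/subjects → should list metrics-events-value (the subject name under the default TopicNameStrategy for the metrics-events topic created in step 2)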
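A producer-side check can reject bad events before they reach the bus. The sketch below mirrors the fields of the MetricEvent message in plain Python; the allow-list of tag keys and the `validate` helper are assumptions for illustration and do not replace Schema Registry enforcement.

```python
from dataclasses import dataclass, field
import time
import uuid

# Hypothetical allow-list of bounded-cardinality tag keys.
ALLOWED_TAG_KEYS = {"page", "region", "device_type"}

@dataclass(frozen=True)
class MetricEvent:
    metric_name: str
    value: float
    source: str
    tags: dict = field(default_factory=dict)
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp_ms: int = field(default_factory=lambda: int(time.time() * 1000))

def validate(event: MetricEvent) -> list:
    """Return a list of problems; an empty list means the event is acceptable."""
    errors = []
    if not event.metric_name:
        errors.append("metric_name is empty")
    if event.timestamp_ms <= 0:
        errors.append("timestamp_ms must be positive")
    unknown = sorted(set(event.tags) - ALLOWED_TAG_KEYS)
    if unknown:
        errors.append(f"tag keys not in allow-list: {unknown}")
    return errors

event = MetricEvent("http_request_duration_ms", 12.5, "api-gateway",
                    {"region": "us-east-1"})
print(validate(event))
```

Events that fail validation are candidates for the dead-letter queue rather than a silent drop.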
2. Deploy Kafka as the central message bus
Configure Kafka topics with appropriate partition counts and replication. Use min.insync.replicas=2 with acks=all for durability. Partition by a low-cardinality key (metric_name, service_name). [src1]
# Create metrics topic — 12 partitions, replication factor 3
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic metrics-events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config min.insync.replicas=2
Verify: kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic metrics-events → 12 partitions, each with 3 in-sync replicas (ISR)
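The numbers in the topic command can be derived rather than guessed. This sketch applies the 2x-parallelism rule from the Constraints section and shows where 604800000 comes from; the helper names are hypothetical.

```python
import math

def plan_partitions(peak_consumer_parallelism: int, headroom: float = 2.0) -> int:
    """Partition count = headroom x expected peak consumer parallelism."""
    if peak_consumer_parallelism < 1:
        raise ValueError("peak_consumer_parallelism must be >= 1")
    return math.ceil(peak_consumer_parallelism * headroom)

def retention_ms(days: float) -> int:
    """Convert a retention period in days to the retention.ms topic setting."""
    return int(days * 24 * 60 * 60 * 1000)

def tolerated_broker_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """Brokers that can be down while acks=all writes still succeed."""
    return replication_factor - min_insync_replicas

print(plan_partitions(6))               # 12 partitions for 6 parallel consumers
print(retention_ms(7))                  # 604800000 (7 days)
print(tolerated_broker_failures(3, 2))  # 1
```

Because partition counts cannot be decreased later, it is worth running this against the expected peak rather than current load.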
3. Implement stream processing with Flink
Deploy a Flink job that consumes from Kafka, applies windowed aggregation (1-minute tumbling windows), and writes results to ClickHouse. Enable checkpointing to S3/HDFS for fault tolerance. [src3]
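// MetricEvent, MetricEventDeserializer, MetricAggregator and ClickHouseSink are application classes (not shown)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/metrics-pipeline");
KafkaSource<MetricEvent> source = KafkaSource.<MetricEvent>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("metrics-events")
    .setGroupId("metrics-processor")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new MetricEventDeserializer())
    .build();
// Event-time windows need timestamps and watermarks; the 10 s out-of-orderness bound is an assumption
WatermarkStrategy<MetricEvent> watermarks = WatermarkStrategy
    .<MetricEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, recordTs) -> event.getTimestampMs())
    .withIdleness(Duration.ofMinutes(1));
DataStream<MetricEvent> events = env.fromSource(source, watermarks, "metrics-events");
events.keyBy(MetricEvent::getMetricName)
    .window(TumblingEventTimeWindows.of(Time.minutes(1))) // on Flink 2.x pass Duration.ofMinutes(1)
    .aggregate(new MetricAggregator())
    .addSink(new ClickHouseSink("jdbc:clickhouse://clickhouse:8123/metrics"));
env.execute("metrics-pipeline");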
Verify: Flink Web UI at http://flink-jobmanager:8081 → job shows "RUNNING" with checkpoints succeeding
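To make the windowing step concrete, here is a plain-Python sketch of what a 1-minute tumbling aggregation keyed by metric name computes. It runs over a finite list and ignores watermarks and late events, so it illustrates the arithmetic only, not Flink's runtime behavior; the accumulator fields are assumptions.

```python
from collections import defaultdict

WINDOW_MS = 60_000  # 1-minute tumbling windows

def window_start(timestamp_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Align a timestamp to the start of its tumbling window."""
    return timestamp_ms - (timestamp_ms % size_ms)

def aggregate_tumbling(events, size_ms: int = WINDOW_MS) -> dict:
    """Group events by (metric_name, window start) and keep count/sum/min/max."""
    acc = defaultdict(lambda: {"count": 0, "sum": 0.0,
                               "min": float("inf"), "max": float("-inf")})
    for e in events:
        key = (e["metric_name"], window_start(e["timestamp_ms"], size_ms))
        a = acc[key]
        a["count"] += 1
        a["sum"] += e["value"]
        a["min"] = min(a["min"], e["value"])
        a["max"] = max(a["max"], e["value"])
    return dict(acc)

events = [
    {"metric_name": "cpu_usage", "value": 10.0, "timestamp_ms": 0},
    {"metric_name": "cpu_usage", "value": 30.0, "timestamp_ms": 59_999},
    {"metric_name": "cpu_usage", "value": 50.0, "timestamp_ms": 60_000},
]
for key, agg in aggregate_tumbling(events).items():
    print(key, agg)
```

Each output row replaces every raw event that fell into its window, which is where the reduction in ClickHouse write volume comes from.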
4. Configure ClickHouse storage with rollup tables
Create a raw events table using a MergeTree-family engine (ReplacingMergeTree here, so retried inserts collapse on merge) and materialized views for pre-aggregated rollups. Use ClickHouse's built-in Kafka engine for direct ingestion if Flink is not needed, or write from Flink via JDBC. [src1] [src5]
CREATE TABLE metrics_raw (
event_id String,
metric_name LowCardinality(String),
value Float64,
timestamp_ms DateTime64(3),
tags Map(LowCardinality(String), String),
source LowCardinality(String)
) ENGINE = ReplacingMergeTree(timestamp_ms)
PARTITION BY toYYYYMM(timestamp_ms)
ORDER BY (metric_name, source, timestamp_ms, event_id)
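-- toDateTime() keeps the TTL expression valid on releases that reject DateTime64 in TTL
TTL toDateTime(timestamp_ms) + INTERVAL 90 DAY;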
Verify: SELECT count() FROM metrics_raw → should increase as events flow
5. Set up serving layer with Grafana
Connect Grafana to ClickHouse using the official ClickHouse data source plugin. Create dashboards querying rollup tables (not raw) for sub-second response. [src5]
Verify: Grafana → Explore → run SELECT count() FROM metrics_1m WHERE ts > now() - INTERVAL 1 HOUR → returns data
6. Deploy monitoring for the pipeline itself
Instrument every pipeline component. Alert on consumer lag, checkpoint failures, and ClickHouse merge queue depth. [src2] [src6]
# Check Kafka consumer group lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group metrics-processor
# ClickHouse merge health
clickhouse-client -q "SELECT table, count() as merges FROM system.merges GROUP BY table"
Verify: Kafka LAG < 1000; Flink checkpoints completing; ClickHouse merges progressing
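The lag check can be automated by parsing the tabular output of kafka-consumer-groups.sh. The sketch below assumes whitespace-separated columns with a header row containing TOPIC, PARTITION, and LAG; the sample text is made up for illustration, and the 1000 threshold comes from the verification line above.

```python
def parse_lag(describe_output: str) -> dict:
    """Return {(topic, partition): lag} from `--describe --group` output."""
    lags = {}
    header = None
    for line in describe_output.splitlines():
        cols = line.split()
        if not cols:
            continue
        if "LAG" in cols and "PARTITION" in cols:
            header = cols
            continue
        if header is None or len(cols) < len(header):
            continue
        row = dict(zip(header, cols))
        if row["LAG"].isdigit():  # skips "-" when no offset is committed
            lags[(row["TOPIC"], int(row["PARTITION"]))] = int(row["LAG"])
    return lags

def lagging_partitions(lags: dict, threshold: int = 1000) -> list:
    """Partitions whose lag is at or above the alert threshold."""
    return sorted(k for k, v in lags.items() if v >= threshold)

# Illustrative sample, not captured from a real cluster.
SAMPLE = """
GROUP             TOPIC          PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG  CONSUMER-ID HOST      CLIENT-ID
metrics-processor metrics-events 0         1500           1520           20   consumer-1  /10.0.0.5 client-1
metrics-processor metrics-events 1         900            2100           1200 consumer-2  /10.0.0.6 client-2
"""
print(lagging_partitions(parse_lag(SAMPLE)))
```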
Code Examples
Python: Kafka Producer with Bounded-Cardinality Tags
# Input: Application metrics (HTTP request durations, counters)
# Output: Structured events published to Kafka topic
from confluent_kafka import Producer # confluent-kafka==2.6.1
import json, uuid, time
producer = Producer({
"bootstrap.servers": "kafka:9092",
"acks": "all",
"linger.ms": 5,
"compression.type": "zstd",
"enable.idempotence": True,
})
def emit_metric(name: str, value: float, tags: dict):
    event = {
        "event_id": str(uuid.uuid4()),
        "metric_name": name,
        "value": value,
        "timestamp_ms": int(time.time() * 1000),
        "tags": tags,
        "source": "api-gateway",
    }
    producer.produce(
        topic="metrics-events",
        key=name.encode(),
        value=json.dumps(event).encode(),
        # Delivery callback: this only logs; route failed events to the DLQ in production
        callback=lambda err, msg: err and print(f"Delivery failed: {err}"),
    )
    producer.poll(0)  # serve delivery callbacks; call producer.flush() before shutdown
SQL: ClickHouse Analytical Queries on Rollup Tables
-- Input: Pre-aggregated 1-minute rollup table
-- Output: Dashboard-ready time series with percentiles
SELECT ts, metric_name, source,
quantileMerge(0.99)(val_p99) AS p99_latency,
sumMerge(val_sum) / countMerge(cnt) AS avg_latency,
countMerge(cnt) AS request_count
FROM metrics_1m
WHERE metric_name = 'http_request_duration_ms'
AND ts > now() - INTERVAL 24 HOUR
GROUP BY ts, metric_name, source
ORDER BY ts;
Anti-Patterns
Wrong: High-cardinality fields as metric labels
# BAD — user_id as a tag creates millions of unique time series
emit_metric("page_load_time_ms", 320.0, {
"user_id": "u-839201", # unbounded cardinality!
"session_id": "s-28d7f3a", # unbounded cardinality!
"page": "/dashboard",
})
Correct: Bounded-cardinality tags only
# GOOD — only bounded-cardinality tags in metrics
emit_metric("page_load_time_ms", 320.0, {
"page": "/dashboard", # bounded: ~100 pages
"region": "us-east-1", # bounded: ~20 regions
"device_type": "mobile", # bounded: 3 options
})
# High-cardinality data goes to a separate events/logs table
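The cost of a tag set can be estimated before shipping it. The sketch below computes an upper bound on the number of distinct time series for one metric as the product of per-tag cardinalities, assuming every combination can occur; the one-million-user figure is an illustrative assumption.

```python
from math import prod

def series_count(tag_cardinalities: dict) -> int:
    """Upper bound on distinct series: product of distinct values per tag."""
    return prod(tag_cardinalities.values())

bounded = {"page": 100, "region": 20, "device_type": 3}
unbounded = {**bounded, "user_id": 1_000_000}  # assumed user count

print(series_count(bounded))    # 6000
print(series_count(unbounded))  # 6000000000
```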
Wrong: No backpressure handling
# BAD — producer fires and forgets without flow control
for event in infinite_event_stream():
    producer.produce("metrics-events", value=serialize(event))
# No flush, no error callback — BufferError → data loss
Correct: Backpressure with bounded buffers
# GOOD — respect producer buffer limits and handle errors
for event in infinite_event_stream():
    try:
        producer.produce("metrics-events", value=serialize(event),
                         callback=delivery_report)
        producer.poll(0)
    except BufferError:
        producer.poll(1.0)  # backpressure: wait for buffer space
        producer.produce("metrics-events", value=serialize(event),
                         callback=delivery_report)
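The snippet above leaves delivery_report undefined. One way it might look is sketched here, using the (err, msg) callback form that confluent-kafka passes to delivery callbacks. The in-memory dead_letters list and the FakeMessage stub are hypothetical stand-ins so the example runs without a broker; a real pipeline would publish failures to a DLQ topic or error bucket.

```python
dead_letters = []  # hypothetical in-memory DLQ buffer

def delivery_report(err, msg):
    """Record failed deliveries instead of dropping them silently."""
    if err is None:
        return  # delivered successfully
    dead_letters.append({
        "topic": msg.topic(),
        "key": msg.key(),
        "value": msg.value(),
        "error": str(err),
    })

class FakeMessage:
    """Stub exposing the accessor methods used above, for local testing only."""
    def topic(self):
        return "metrics-events"
    def key(self):
        return b"cpu_usage"
    def value(self):
        return b'{"value": 1.0}'

delivery_report(None, FakeMessage())         # success: nothing recorded
delivery_report("timed out", FakeMessage())  # failure: captured for reprocessing
print(dead_letters)
```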
Wrong: Querying raw table for dashboards
-- BAD — scanning months of raw data for a dashboard panel
SELECT toStartOfMinute(timestamp_ms) AS ts, avg(value)
FROM metrics_raw
WHERE metric_name = 'cpu_usage'
AND timestamp_ms > now() - INTERVAL 30 DAY -- billions of rows
GROUP BY ts ORDER BY ts;
-- Result: 30+ second query
Correct: Query pre-aggregated rollup tables
-- GOOD — query the 1-minute materialized view rollup
SELECT ts, avgMerge(val_avg) AS avg_cpu
FROM metrics_1m
WHERE metric_name = 'cpu_usage'
AND ts > now() - INTERVAL 30 DAY -- only rollup rows
GROUP BY ts ORDER BY ts;
-- Result: sub-second query, 100-1000x fewer rows scanned
Common Pitfalls
- Consumer lag spiraling: Consumers fall behind producers, lag grows unbounded. Fix: increase partition count and consumer instances; for non-critical metrics, set auto.offset.reset=latest so a consumer group with no committed offsets starts from the newest events. [src2]
- Schema evolution breaking consumers: Adding a required field breaks all downstream consumers. Fix: always use backward-compatible evolution (add optional fields only). Use Schema Registry BACKWARD mode. [src7]
- ClickHouse merge pressure: Too many small inserts cause thousands of parts, triggering "Too many parts" errors. Fix: batch inserts in chunks of 10K-100K rows; use Buffer engine to absorb bursts. [src1]
- Missing watermarks in Flink: When a partition goes idle, its watermark stops advancing and event-time windows never close. Fix: configure withIdleness(Duration.ofMinutes(1)) on the watermark strategy. [src3]
- No data retention policy: Raw events accumulate forever, storage costs grow unbounded. Fix: set TTL on ClickHouse tables and Kafka topic retention. [src1]
- Alerting on raw metrics: Querying raw tables for alerts adds unpredictable latency. Fix: alerts should query rollup tables or use a dedicated alerting pipeline. [src6]
- Single Kafka cluster for all environments: Dev/staging/prod sharing leads to resource contention and data leaks. Fix: separate Kafka clusters per environment. [src2]
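The merge-pressure fix (batch inserts into large chunks) can be sketched as a small buffer that flushes on size or age. The class name, the defaults, and the flush_fn callback are assumptions; flush_fn stands in for whatever actually writes a batch to ClickHouse.

```python
import time

class InsertBatcher:
    """Buffer rows and hand them to flush_fn in large batches."""

    def __init__(self, flush_fn, max_rows=10_000, max_wait_s=1.0,
                 clock=time.monotonic):
        self._flush_fn = flush_fn
        self._max_rows = max_rows
        self._max_wait_s = max_wait_s
        self._clock = clock
        self._rows = []
        self._first_row_at = None

    def add(self, row):
        if not self._rows:
            self._first_row_at = self._clock()
        self._rows.append(row)
        too_big = len(self._rows) >= self._max_rows
        too_old = self._clock() - self._first_row_at >= self._max_wait_s
        if too_big or too_old:
            self.flush()

    def flush(self):
        """Write out whatever is buffered; call this on shutdown as well."""
        if self._rows:
            batch, self._rows = self._rows, []
            self._flush_fn(batch)

batches = []
batcher = InsertBatcher(batches.append, max_rows=3, max_wait_s=3600)
for i in range(7):
    batcher.add(i)
batcher.flush()
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```

The age check only runs when a row arrives, so a quiet stream needs a periodic flush() call from a timer to bound latency.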
Diagnostic Commands
# Monitor consumer group lag (key health metric)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group metrics-processor
# Flink job status and checkpoint health
curl -s http://flink-jobmanager:8081/jobs | jq '.jobs[] | {id, status}'
# ClickHouse insert rate and merge health
clickhouse-client -q "SELECT database, table, elapsed, progress FROM system.merges"
# ClickHouse table sizes and part counts
clickhouse-client -q "SELECT table, formatReadableSize(sum(bytes_on_disk)) AS size, count() AS parts FROM system.parts WHERE active GROUP BY table ORDER BY sum(bytes_on_disk) DESC"
Version History & Compatibility
| Technology | Current Version | Key Changes | Migration Notes |
|---|---|---|---|
| Apache Kafka | 4.0 (2025) | ZooKeeper removed — KRaft-only | Migrate ZK to KRaft before upgrading past 3.x |
| Apache Flink | 2.0 (2025) | Unified DataStream + Table API | 1.x DataStream still works; new Table API preferred |
| ClickHouse | 24.x (2024) | Improved Kafka engine, new JSON type | Backward-compatible; new features opt-in |
| Kafka Streams | 3.8 (2024) | Improved state store backends | No breaking changes from 3.x |
| Redpanda | 24.x (2024) | Kafka API-compatible, no JVM | Drop-in Kafka replacement |
| Grafana | 11.x (2024) | ClickHouse plugin v4, unified alerting | Dashboard JSON forward-compatible |
When to Use / When Not to Use
| Use When | Don't Use When | Use Instead |
|---|---|---|
| >10K events/sec needing sub-second dashboards | <1K events/sec with daily reporting | Managed APM (Datadog, New Relic) or PostgreSQL |
| Custom aggregation logic (percentiles, funnels, cohorts) | Standard infra metrics (CPU, memory, disk) | Prometheus + Grafana |
| Multi-source data unified in one store | Single-source application logs only | ELK stack or Grafana Loki |
| Need to replay/reprocess historical data | One-time data migration or ETL | Apache Spark batch job or dbt |
| Cost-sensitive at scale (>100TB/month) | Small team without data engineering | SaaS observability platform |
Important Caveats
- Kafka 4.0 removed ZooKeeper entirely — all new deployments must use KRaft mode. Existing clusters must migrate before upgrading past 3.x.
- ClickHouse's ReplacingMergeTree and AggregatingMergeTree perform deduplication/aggregation during background merges, not at insert time. Use the FINAL modifier for guaranteed correctness at the cost of query speed.
- Flink exactly-once semantics require transactional sinks. ClickHouse lacks XA transactions, so use idempotent writes (dedup by event_id) instead.
- Lambda architecture (separate batch + stream) doubles operational complexity. Prefer Kappa (single stream) unless you need batch reprocessing accuracy.
- At hyperscale (>1M events/sec), network bandwidth between Kafka and ClickHouse often becomes the bottleneck. Enable zstd compression on both.
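The first two caveats interact: a retried write leaves duplicate rows until a merge runs, and a read sees them unless it deduplicates. The sketch below models that in plain Python, keeping the row with the highest version per sorting key (the last one written wins ties), which is roughly what a merge or a FINAL read returns. It is a simplified model, not ClickHouse's implementation.

```python
def merge_replacing(rows, key_fields, version_field):
    """Keep one row per sorting key: the highest version, last written on ties."""
    latest = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        current = latest.get(key)
        if current is None or row[version_field] >= current[version_field]:
            latest[key] = row
    return list(latest.values())

# A Flink retry after a failed checkpoint writes event "a" twice.
rows = [
    {"event_id": "a", "metric_name": "cpu_usage", "value": 1.0, "timestamp_ms": 1000},
    {"event_id": "a", "metric_name": "cpu_usage", "value": 1.0, "timestamp_ms": 1000},
    {"event_id": "b", "metric_name": "cpu_usage", "value": 2.0, "timestamp_ms": 1000},
]
merged = merge_replacing(rows, ("metric_name", "timestamp_ms", "event_id"), "timestamp_ms")

print(len(rows))    # 3 rows visible before a merge (or without FINAL)
print(len(merged))  # 2 rows after deduplication
```

Including event_id in the sorting key is what makes the retry idempotent: the duplicate collapses onto the original instead of counting twice.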