Kafka (ingestion) → Flink (processing) → ClickHouse (storage) → Grafana (serving) is the most battle-tested open-source stack for high-throughput metrics.

| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Event Producers | Emit structured events/metrics from apps | OpenTelemetry SDK, StatsD, custom emitters | Horizontal — add producers independently |
| Message Bus | Durable, ordered event transport | Apache Kafka, Amazon MSK, Redpanda, Pulsar | Partition-based — add partitions + brokers |
| Schema Registry | Enforce event schema contracts | Confluent Schema Registry, AWS Glue | Single cluster — scales with broker count |
| Stream Processor | Transform, aggregate, enrich events in-flight | Apache Flink, Kafka Streams, Spark Structured Streaming | Task-parallelism — add TaskManagers/executors |
| Pre-Aggregation | Reduce cardinality before storage | Flink windowed aggregation, ClickHouse Materialized Views | Tied to processor or storage scaling |
| OLAP Storage | Columnar analytics store for fast queries | ClickHouse, Apache Druid, Apache Pinot, TimescaleDB | Shard-based — add shards for writes, replicas for reads |
| Object Storage | Raw event archive / data lake tier | S3, GCS, MinIO | Virtually unlimited — cost scales linearly |
| Batch Processor | Backfill, reprocess, train models on historical data | Apache Spark, dbt, Apache Airflow (orchestration) | Cluster auto-scaling — elastic resize |
| Query Engine | SQL interface for analysts and dashboards | ClickHouse native, Trino/Presto, Grafana direct | Read replicas or caching layer |
| Serving / Visualization | Dashboards, alerts, API endpoints | Grafana, Superset, custom REST APIs | Horizontal — stateless query proxies |
| Alerting | Threshold and anomaly detection on metrics | Grafana Alerting, Prometheus Alertmanager, PagerDuty | Stateless evaluators — scale horizontally |
| Dead-Letter Queue | Capture failed/malformed events for reprocessing | Kafka DLQ topic, S3 error bucket | Same as message bus scaling |
START: What is your event volume and latency requirement?
├── <10K events/sec AND minutes-latency OK?
│ ├── YES → Simple stack: producers → Kafka → batch consumer → PostgreSQL/TimescaleDB → Grafana
│ └── NO ↓
├── 10K-100K events/sec OR sub-second latency needed?
│ ├── YES → Standard streaming: Kafka → Flink (windowed aggregation) → ClickHouse → Grafana
│ │ (Kappa architecture — single stream processing path)
│ └── NO ↓
├── >100K events/sec OR complex event processing (joins, patterns)?
│ ├── YES → Full pipeline: Kafka → Flink (CEP + aggregation) → ClickHouse cluster (sharded)
│ │ Add pre-aggregation in Flink to reduce ClickHouse write amplification
│ └── NO ↓
├── >1M events/sec (hyperscale)?
│ ├── YES → Multi-tier: Kafka (tiered storage) → Flink (multi-stage DAG) → ClickHouse Cloud
│ │ Add sampling at ingestion, rollup tables, read replicas for queries
│ └── NO ↓
├── Need both real-time dashboards AND accurate historical backfill?
│ ├── YES → Lambda architecture: speed layer (Flink → ClickHouse) + batch layer (Spark → S3)
│ │ Merge results at query time. Higher operational cost but guarantees accuracy.
│ └── NO ↓
└── DEFAULT → Kappa architecture with Kafka + Flink + ClickHouse
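The decision tree above can be sketched as a small helper. This is an illustrative function (the name `recommend_stack` and the returned labels are not from the source); the thresholds mirror the branches of the tree.

```python
def recommend_stack(events_per_sec: int, latency_seconds: float,
                    needs_cep: bool = False, needs_backfill: bool = False) -> str:
    """Return a rough architecture label; thresholds mirror the decision tree."""
    if needs_backfill:
        return "lambda"       # speed layer + batch layer, merged at query time
    if events_per_sec > 1_000_000:
        return "hyperscale"   # tiered Kafka, multi-stage Flink, sampling, rollups
    if events_per_sec > 100_000 or needs_cep:
        return "full"         # Flink CEP + sharded ClickHouse
    if events_per_sec >= 10_000 or latency_seconds < 1:
        return "standard"     # Kappa: Kafka -> Flink -> ClickHouse -> Grafana
    if latency_seconds >= 60:
        return "simple"       # batch consumer into PostgreSQL/TimescaleDB
    return "kappa-default"
```

For example, 50K events/sec with sub-second dashboards maps to the standard Kappa path, while any requirement for accurate historical backfill routes to Lambda first, matching the tree's ordering.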
Start with a well-defined event schema using Avro or Protobuf. Register it in a Schema Registry to enforce contracts between producers and consumers. Schema evolution (adding optional fields) is safe; removing or renaming fields requires a new schema version. [src2]
syntax = "proto3";

package analytics;

message MetricEvent {
  string event_id = 1;            // UUID for deduplication
  string metric_name = 2;         // e.g., "http_request_duration_ms"
  double value = 3;
  int64 timestamp_ms = 4;         // Unix epoch milliseconds
  map<string, string> tags = 5;   // bounded-cardinality labels only
  string source = 6;              // producing service name
}
Verify: curl http://schema-registry:8081/subjects → should list metrics-events-value
Configure Kafka topics with appropriate partition counts and replication. Use min.insync.replicas=2 with acks=all for durability. Partition by a low-cardinality key (metric_name, service_name). [src1]
# Create metrics topic — 12 partitions, replication factor 3
kafka-topics.sh --create \
--bootstrap-server kafka:9092 \
--topic metrics-events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config min.insync.replicas=2
Verify: kafka-topics.sh --describe --topic metrics-events → 12 partitions, ISR=3
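To see why a stable, low-cardinality key spreads load while preserving per-key ordering, here is a hedged sketch of key-based partitioning. The hash used is illustrative only; Kafka's default partitioner actually uses murmur2, not MD5.

```python
import hashlib

def partition_for(key: str, num_partitions: int = 12) -> int:
    # Stable hash of the message key modulo the partition count.
    # Illustrative only: Kafka's default partitioner uses murmur2, not MD5.
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Every event keyed by the same metric name lands on the same partition,
# so per-metric ordering is preserved while 12 partitions share the load.
p = partition_for("http_request_duration_ms")
assert p == partition_for("http_request_duration_ms")
```

Because the mapping is deterministic, all events for one metric stay ordered within a single partition, and adding metrics (not users or sessions) as keys keeps the key space small and the load balanced.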
Deploy a Flink job that consumes from Kafka, applies windowed aggregation (1-minute tumbling windows), and writes results to ClickHouse. Enable checkpointing to S3/HDFS for fault tolerance. [src3]
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/metrics-pipeline");

KafkaSource<MetricEvent> source = KafkaSource.<MetricEvent>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("metrics-events")
    .setGroupId("metrics-processor")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new MetricEventDeserializer()) // custom Protobuf deserializer
    .build();

// Attach event-time watermarks so the tumbling windows below can fire
DataStream<MetricEvent> events = env.fromSource(
    source,
    WatermarkStrategy.<MetricEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30)),
    "metrics-kafka-source");

events.keyBy(MetricEvent::getMetricName)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new MetricAggregator())
    .addSink(new ClickHouseSink("jdbc:clickhouse://clickhouse:8123/metrics")); // custom JDBC sink
Verify: Flink Web UI at http://flink-jobmanager:8081 → job shows "RUNNING" with checkpoints succeeding
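The windowing semantics are easy to check in isolation. A minimal sketch of 1-minute tumbling-window aggregation (pure Python, not Flink's API; the tuple layout and field names are assumptions for illustration):

```python
from collections import defaultdict

WINDOW_MS = 60_000  # 1-minute tumbling windows

def window_start(ts_ms: int) -> int:
    # Tumbling windows: every timestamp maps to exactly one window.
    return ts_ms - (ts_ms % WINDOW_MS)

def aggregate(events):
    """events: iterable of (metric_name, timestamp_ms, value) tuples."""
    windows = defaultdict(list)
    for name, ts, value in events:
        windows[(name, window_start(ts))].append(value)
    return {key: {"count": len(v), "sum": sum(v), "max": max(v)}
            for key, v in windows.items()}

rows = aggregate([("latency", 5_000, 120.0),
                  ("latency", 59_999, 80.0),    # same minute as the first event
                  ("latency", 61_000, 200.0)])  # next minute
```

Each key produces one row per window, which is exactly the volume reduction that keeps ClickHouse write amplification down: two raw events in the first minute collapse into a single aggregate row.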
Create a raw events table using MergeTree and materialized views for pre-aggregated rollups. Use ClickHouse's built-in Kafka engine for direct ingestion if Flink is not needed, or write from Flink via JDBC. [src1] [src5]
CREATE TABLE metrics_raw (
    event_id String,
    metric_name LowCardinality(String),
    value Float64,
    timestamp_ms DateTime64(3),
    tags Map(LowCardinality(String), String),
    source LowCardinality(String)
) ENGINE = ReplacingMergeTree(timestamp_ms)
PARTITION BY toYYYYMM(timestamp_ms)
ORDER BY (metric_name, source, timestamp_ms, event_id)
TTL timestamp_ms + INTERVAL 90 DAY;
Verify: SELECT count() FROM metrics_raw → should increase as events flow
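The `metrics_1m` rollup table that the dashboard queries read from is referenced later but never defined. One possible definition is sketched below, assuming an AggregatingMergeTree fed by a materialized view; the column names (`val_p99`, `val_avg`, `val_sum`, `cnt`) are chosen to match the `quantileMerge`/`avgMerge`/`sumMerge`/`countMerge` calls used in the later queries.

```sql
CREATE TABLE metrics_1m (
    ts DateTime,
    metric_name LowCardinality(String),
    source LowCardinality(String),
    val_p99 AggregateFunction(quantile(0.99), Float64),
    val_avg AggregateFunction(avg, Float64),
    val_sum AggregateFunction(sum, Float64),
    cnt AggregateFunction(count)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(ts)
ORDER BY (metric_name, source, ts);

-- Populate the rollup automatically on every insert into metrics_raw
CREATE MATERIALIZED VIEW metrics_1m_mv TO metrics_1m AS
SELECT
    toStartOfMinute(timestamp_ms) AS ts,
    metric_name,
    source,
    quantileState(0.99)(value) AS val_p99,
    avgState(value) AS val_avg,
    sumState(value) AS val_sum,
    countState() AS cnt
FROM metrics_raw
GROUP BY ts, metric_name, source;
```

Storing `-State` aggregates rather than finalized values is what lets shards and partitions be combined correctly with `-Merge` functions at query time.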
Connect Grafana to ClickHouse using the official ClickHouse data source plugin. Create dashboards querying rollup tables (not raw) for sub-second response. [src5]
Verify: Grafana → Explore → run SELECT count() FROM metrics_1m WHERE ts > now() - INTERVAL 1 HOUR → returns data
Instrument every pipeline component. Alert on consumer lag, checkpoint failures, and ClickHouse merge queue depth. [src2] [src6]
# Check Kafka consumer group lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group metrics-processor
# ClickHouse merge health
clickhouse-client -q "SELECT table, count() as merges FROM system.merges GROUP BY table"
Verify: Kafka LAG < 1000; Flink checkpoints completing; ClickHouse merges progressing
# Input: Application metrics (HTTP request durations, counters)
# Output: Structured events published to Kafka topic
from confluent_kafka import Producer # confluent-kafka==2.6.1
import json, uuid, time
producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "acks": "all",
    "linger.ms": 5,
    "compression.type": "zstd",
    "enable.idempotence": True,
})

def emit_metric(name: str, value: float, tags: dict):
    event = {
        "event_id": str(uuid.uuid4()),
        "metric_name": name,
        "value": value,
        "timestamp_ms": int(time.time() * 1000),
        "tags": tags,
        "source": "api-gateway",
    }
    producer.produce(
        topic="metrics-events",
        key=name.encode(),
        value=json.dumps(event).encode(),
        callback=lambda err, msg: err and print(f"DLQ: {err}"),
    )
    producer.poll(0)  # serve delivery callbacks without blocking
-- Input: Pre-aggregated 1-minute rollup table
-- Output: Dashboard-ready time series with percentiles
SELECT ts, metric_name, source,
quantileMerge(0.99)(val_p99) AS p99_latency,
sumMerge(val_sum) / countMerge(cnt) AS avg_latency,
countMerge(cnt) AS request_count
FROM metrics_1m
WHERE metric_name = 'http_request_duration_ms'
AND ts > now() - INTERVAL 24 HOUR
GROUP BY ts, metric_name, source
ORDER BY ts;
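The reason the query uses `-Merge` functions is that mergeable partial states compose correctly across shards and time windows, while finalized values do not. A minimal sketch with assumed helper names (`partial_state`, `merge`) illustrating the sum/count pair behind `avgMerge`:

```python
def partial_state(values):
    """Per-shard partial aggregate: (sum, count), like sumState/countState."""
    return (sum(values), len(values))

def merge(states):
    """Combine partials, like sumMerge/countMerge at query time."""
    total = sum(s for s, _ in states)
    count = sum(c for _, c in states)
    return total, count

shard_a = partial_state([100.0, 200.0])  # 2 events on one shard
shard_b = partial_state([300.0])         # 1 event on another
total, count = merge([shard_a, shard_b])
assert total / count == 200.0  # correct global average
# A naive "average of averages" would give (150 + 300) / 2 = 225, which is wrong.
```

Percentiles need richer mergeable states (ClickHouse uses sampling/sketch structures for `quantileState`), but the principle is the same: store states, merge at read time.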
# BAD — user_id as a tag creates millions of unique time series
emit_metric("page_load_time_ms", 320.0, {
    "user_id": "u-839201",      # unbounded cardinality!
    "session_id": "s-28d7f3a",  # unbounded cardinality!
    "page": "/dashboard",
})

# GOOD — only bounded-cardinality tags in metrics
emit_metric("page_load_time_ms", 320.0, {
    "page": "/dashboard",       # bounded: ~100 pages
    "region": "us-east-1",      # bounded: ~20 regions
    "device_type": "mobile",    # bounded: 3 options
})
# High-cardinality data goes to a separate events/logs table
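The blow-up is multiplicative: the worst-case series count is the product of each tag's distinct-value count. A quick sketch (the function name `series_count` is illustrative):

```python
from math import prod

def series_count(tag_cardinalities: dict) -> int:
    """Worst-case number of distinct time series one metric can produce."""
    return prod(tag_cardinalities.values(), start=1)

bad = series_count({"user_id": 1_000_000, "session_id": 5_000_000, "page": 100})
good = series_count({"page": 100, "region": 20, "device_type": 3})
# bad allows up to 5e14 potential series; good caps out at 6,000
```

One unbounded tag is enough to dominate the product, which is why user and session identifiers belong in an events table keyed by event, not in metric labels.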
# BAD — producer fires and forgets without flow control
for event in infinite_event_stream():
    producer.produce("metrics-events", value=serialize(event))
# No flush, no error callback — BufferError → data loss

# GOOD — respect producer buffer limits and handle errors
for event in infinite_event_stream():
    try:
        producer.produce("metrics-events", value=serialize(event),
                         callback=delivery_report)
        producer.poll(0)
    except BufferError:
        producer.poll(1.0)  # backpressure: wait for buffer space
        producer.produce("metrics-events", value=serialize(event),
                         callback=delivery_report)
-- BAD — scanning months of raw data for a dashboard panel
SELECT toStartOfMinute(timestamp_ms) AS ts, avg(value)
FROM metrics_raw
WHERE metric_name = 'cpu_usage'
AND timestamp_ms > now() - INTERVAL 30 DAY -- billions of rows
GROUP BY ts ORDER BY ts;
-- Result: 30+ second query
-- GOOD — query the 1-minute materialized view rollup
SELECT ts, avgMerge(val_avg) AS avg_cpu
FROM metrics_1m
WHERE metric_name = 'cpu_usage'
AND ts > now() - INTERVAL 30 DAY -- only rollup rows
GROUP BY ts ORDER BY ts;
-- Result: sub-second query, 100-1000x fewer rows scanned
Set auto.offset.reset=latest for non-critical metrics so a new consumer group does not replay the full retention window. [src2] Configure Schema Registry compatibility in BACKWARD mode so consumers on the old schema can read events written with the new one. [src7] If some Kafka partitions can sit idle, add withIdleness(Duration.ofMinutes(1)) on the watermark strategy so stalled partitions do not hold back window firing. [src3]
# Monitor consumer group lag (key health metric)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group metrics-processor
# Flink job status and checkpoint health
curl -s http://flink-jobmanager:8081/jobs | jq '.jobs[] | {id, status}'
# ClickHouse insert rate and merge health
clickhouse-client -q "SELECT database, table, elapsed, progress FROM system.merges"
# ClickHouse table sizes and part counts
clickhouse-client -q "SELECT table, formatReadableSize(sum(bytes_on_disk)) AS size, count() AS parts FROM system.parts WHERE active GROUP BY table ORDER BY sum(bytes_on_disk) DESC"
| Technology | Current Version | Key Changes | Migration Notes |
|---|---|---|---|
| Apache Kafka | 4.0 (2025) | ZooKeeper removed — KRaft-only | Migrate ZK to KRaft before upgrading past 3.x |
| Apache Flink | 2.0 (2025) | Unified DataStream + Table API | 1.x DataStream still works; new Table API preferred |
| ClickHouse | 24.x (2024) | Improved Kafka engine, new JSON type | Backward-compatible; new features opt-in |
| Kafka Streams | 3.8 (2024) | Improved state store backends | No breaking changes from 3.x |
| Redpanda | 24.x (2024) | Kafka API-compatible, no JVM | Drop-in Kafka replacement |
| Grafana | 11.x (2024) | ClickHouse plugin v4, unified alerting | Dashboard JSON forward-compatible |
| Use When | Don't Use When | Use Instead |
|---|---|---|
| >10K events/sec needing sub-second dashboards | <1K events/sec with daily reporting | Managed APM (Datadog, New Relic) or PostgreSQL |
| Custom aggregation logic (percentiles, funnels, cohorts) | Standard infra metrics (CPU, memory, disk) | Prometheus + Grafana |
| Multi-source data unified in one store | Single-source application logs only | ELK stack or Grafana Loki |
| Need to replay/reprocess historical data | One-time data migration or ETL | Apache Spark batch job or dbt |
| Cost-sensitive at scale (>100TB/month) | Small team without data engineering | SaaS observability platform |
When querying a ReplacingMergeTree table before background merges complete, add the FINAL modifier for guaranteed correctness at the cost of query speed. Enable zstd compression on both the Kafka and ClickHouse sides to cut network and storage cost.