How to Design an Analytics and Metrics Pipeline

Type: Software Reference · Confidence: 0.91 · Sources: 7 · Verified: 2026-02-23 · Freshness: quarterly

TL;DR

Constraints

Quick Reference

| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Event Producers | Emit structured events/metrics from apps | OpenTelemetry SDK, StatsD, custom emitters | Horizontal — add producers independently |
| Message Bus | Durable, ordered event transport | Apache Kafka, Amazon MSK, Redpanda, Pulsar | Partition-based — add partitions + brokers |
| Schema Registry | Enforce event schema contracts | Confluent Schema Registry, AWS Glue | Single cluster — scales with broker count |
| Stream Processor | Transform, aggregate, enrich events in-flight | Apache Flink, Kafka Streams, Spark Structured Streaming | Task-parallelism — add TaskManagers/executors |
| Pre-Aggregation | Reduce cardinality before storage | Flink windowed aggregation, ClickHouse Materialized Views | Tied to processor or storage scaling |
| OLAP Storage | Columnar analytics store for fast queries | ClickHouse, Apache Druid, Apache Pinot, TimescaleDB | Shard-based — add shards for writes, replicas for reads |
| Object Storage | Raw event archive / data lake tier | S3, GCS, MinIO | Virtually unlimited — cost scales linearly |
| Batch Processor | Backfill, reprocess, train models on historical data | Apache Spark, dbt, Apache Airflow (orchestration) | Cluster auto-scaling — elastic resize |
| Query Engine | SQL interface for analysts and dashboards | ClickHouse native, Trino/Presto, Grafana direct | Read replicas or caching layer |
| Serving / Visualization | Dashboards, alerts, API endpoints | Grafana, Superset, custom REST APIs | Horizontal — stateless query proxies |
| Alerting | Threshold and anomaly detection on metrics | Grafana Alerting, Prometheus Alertmanager, PagerDuty | Stateless evaluators — scale horizontally |
| Dead-Letter Queue | Capture failed/malformed events for reprocessing | Kafka DLQ topic, S3 error bucket | Same as message bus scaling |

Decision Tree

START: What is your event volume and latency requirement?
├── <10K events/sec AND minutes-latency OK?
│   ├── YES → Simple stack: producers → Kafka → batch consumer → PostgreSQL/TimescaleDB → Grafana
│   └── NO ↓
├── 10K-100K events/sec OR sub-second latency needed?
│   ├── YES → Standard streaming: Kafka → Flink (windowed aggregation) → ClickHouse → Grafana
│   │         (Kappa architecture — single stream processing path)
│   └── NO ↓
├── >100K events/sec OR complex event processing (joins, patterns)?
│   ├── YES → Full pipeline: Kafka → Flink (CEP + aggregation) → ClickHouse cluster (sharded)
│   │         Add pre-aggregation in Flink to reduce ClickHouse write amplification
│   └── NO ↓
├── >1M events/sec (hyperscale)?
│   ├── YES → Multi-tier: Kafka (tiered storage) → Flink (multi-stage DAG) → ClickHouse Cloud
│   │         Add sampling at ingestion, rollup tables, read replicas for queries
│   └── NO ↓
├── Need both real-time dashboards AND accurate historical backfill?
│   ├── YES → Lambda architecture: speed layer (Flink → ClickHouse) + batch layer (Spark → S3)
│   │         Merge results at query time. Higher operational cost but guarantees accuracy.
│   └── NO ↓
└── DEFAULT → Kappa architecture with Kafka + Flink + ClickHouse
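The branching above can be codified for documentation or automation. A minimal sketch, one reasonable first-match reading of the tree; the function name and return strings are illustrative, thresholds come from the tree itself:

```python
def choose_architecture(events_per_sec: int,
                        sub_second_latency: bool,
                        needs_accurate_backfill: bool) -> str:
    """Map event volume and latency needs to a pipeline shape (mirrors the tree above)."""
    if events_per_sec < 10_000 and not sub_second_latency:
        return "simple: producers -> Kafka -> batch consumer -> PostgreSQL/TimescaleDB -> Grafana"
    if events_per_sec > 1_000_000:
        return "hyperscale: tiered Kafka -> multi-stage Flink -> ClickHouse Cloud (+ sampling, rollups)"
    if events_per_sec > 100_000:
        return "full: Kafka -> Flink (CEP + pre-aggregation) -> sharded ClickHouse"
    if needs_accurate_backfill:
        return "lambda: Flink speed layer + Spark batch layer, merged at query time"
    return "kappa: Kafka -> Flink (windowed aggregation) -> ClickHouse"
```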

Step-by-Step Guide

1. Define event schema and set up Schema Registry

Start with a well-defined event schema using Avro or Protobuf. Register it in a Schema Registry to enforce contracts between producers and consumers. Schema evolution (adding optional fields) is safe; removing or renaming fields requires a new schema version. [src2]

syntax = "proto3";
package analytics;

message MetricEvent {
  string event_id = 1;       // UUID for deduplication
  string metric_name = 2;    // e.g., "http_request_duration_ms"
  double value = 3;
  int64 timestamp_ms = 4;    // Unix epoch milliseconds
  map<string, string> tags = 5; // bounded-cardinality labels only
  string source = 6;         // producing service name
}

Verify: curl http://schema-registry:8081/subjects → should list metrics-events-value (default TopicNameStrategy: topic name + -value)
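Registration goes through the Schema Registry REST API (POST /subjects/{subject}/versions). A sketch that builds the request; actually POSTing it requires an HTTP client and a reachable registry, and the subject name assumes the default topic-name strategy:

```python
import json

def registration_request(subject: str, proto_schema: str) -> tuple[str, dict]:
    """Build the (path, JSON body) for registering a Protobuf schema
    with Confluent Schema Registry."""
    path = f"/subjects/{subject}/versions"
    body = {"schemaType": "PROTOBUF", "schema": proto_schema}
    return path, body

path, body = registration_request("metrics-events-value", 'syntax = "proto3"; ...')
# POST json.dumps(body) to http://schema-registry:8081{path}
# with Content-Type: application/vnd.schemaregistry.v1+json
```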

2. Deploy Kafka as the central message bus

Configure Kafka topics with appropriate partition counts and replication. Use min.insync.replicas=2 with acks=all for durability. Partition by a low-cardinality key (metric_name, service_name). [src1]

# Create metrics topic — 12 partitions, replication factor 3
kafka-topics.sh --create \
  --bootstrap-server kafka:9092 \
  --topic metrics-events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config min.insync.replicas=2

Verify: kafka-topics.sh --describe --topic metrics-events → 12 partitions, ISR=3
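The "appropriate partition count" can be sized from throughput rather than guessed. A back-of-envelope sketch; the per-partition throughput and headroom defaults are assumptions, so measure your own cluster before relying on them:

```python
import math

def required_partitions(target_mb_per_sec: float,
                        per_partition_mb_per_sec: float = 10.0,
                        headroom: float = 2.0) -> int:
    """Partitions needed = target throughput / per-partition throughput,
    with headroom for traffic spikes and consumer rebalances."""
    return math.ceil(target_mb_per_sec * headroom / per_partition_mb_per_sec)

# 100K events/sec at ~600 bytes/event is ~60 MB/s,
# which yields the 12 partitions used in the topic above.
```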

3. Implement stream processing with Flink

Deploy a Flink job that consumes from Kafka, applies windowed aggregation (1-minute tumbling windows), and writes results to ClickHouse. Enable checkpointing to S3/HDFS for fault tolerance. [src3]

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints/metrics-pipeline");

KafkaSource<MetricEvent> source = KafkaSource.<MetricEvent>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("metrics-events")
    .setGroupId("metrics-processor")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new MetricEventDeserializer())  // custom DeserializationSchema, not shown
    .build();

// Event-time windows require watermarks derived from the event timestamp
DataStream<MetricEvent> events = env.fromSource(
    source,
    WatermarkStrategy.<MetricEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((event, ts) -> event.getTimestampMs()),
    "kafka-metrics-source");

events.keyBy(MetricEvent::getMetricName)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new MetricAggregator())
    .addSink(new ClickHouseSink("jdbc:clickhouse://clickhouse:8123/metrics"));

env.execute("metrics-pipeline");

Verify: Flink Web UI at http://flink-jobmanager:8081 → job shows "RUNNING" with checkpoints succeeding
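The windowed aggregation the Flink job performs can be illustrated in isolation. A minimal Python sketch of 1-minute tumbling windows keyed by metric name, illustrative only: it has none of the Flink runtime's watermark or late-event handling, and the count/sum/min/max fields are one plausible shape for the MetricAggregator output:

```python
from collections import defaultdict

def tumbling_aggregate(events, window_ms=60_000):
    """Group (metric_name, timestamp_ms, value) tuples into tumbling windows
    and reduce each (window, metric) group to count/sum/min/max."""
    windows = defaultdict(lambda: {"count": 0, "sum": 0.0,
                                   "min": float("inf"), "max": float("-inf")})
    for name, ts_ms, value in events:
        window_start = ts_ms - ts_ms % window_ms  # align to window boundary
        agg = windows[(window_start, name)]
        agg["count"] += 1
        agg["sum"] += value
        agg["min"] = min(agg["min"], value)
        agg["max"] = max(agg["max"], value)
    return dict(windows)
```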

4. Configure ClickHouse storage with rollup tables

Create a raw events table on a MergeTree-family engine (ReplacingMergeTree below, which collapses duplicate event_ids during background merges) plus materialized views for pre-aggregated rollups. Use ClickHouse's built-in Kafka engine for direct ingestion if Flink is not needed, or write from Flink via JDBC. [src1] [src5]

CREATE TABLE metrics_raw (
    event_id       String,
    metric_name    LowCardinality(String),
    value          Float64,
    timestamp_ms   DateTime64(3),
    tags           Map(LowCardinality(String), String),
    source         LowCardinality(String)
) ENGINE = ReplacingMergeTree(timestamp_ms)
PARTITION BY toYYYYMM(timestamp_ms)
ORDER BY (metric_name, source, timestamp_ms, event_id)
TTL timestamp_ms + INTERVAL 90 DAY;

Verify: SELECT count() FROM metrics_raw → should increase as events flow
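The metrics_1m rollup queried in later steps is never defined above. A sketch consistent with the quantileMerge/sumMerge/countMerge/avgMerge calls used in those queries; the exact column names and window granularity are assumptions inferred from them:

```sql
CREATE TABLE metrics_1m (
    ts          DateTime,
    metric_name LowCardinality(String),
    source      LowCardinality(String),
    val_avg     AggregateFunction(avg, Float64),
    val_sum     AggregateFunction(sum, Float64),
    val_p99     AggregateFunction(quantile(0.99), Float64),
    cnt         AggregateFunction(count)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(ts)
ORDER BY (metric_name, source, ts);

-- Populated automatically on every insert into metrics_raw
CREATE MATERIALIZED VIEW metrics_1m_mv TO metrics_1m AS
SELECT
    toStartOfMinute(timestamp_ms) AS ts,
    metric_name,
    source,
    avgState(value)            AS val_avg,
    sumState(value)            AS val_sum,
    quantileState(0.99)(value) AS val_p99,
    countState()               AS cnt
FROM metrics_raw
GROUP BY ts, metric_name, source;
```

Queries then finalize the partial aggregates with the matching -Merge combinators, as shown in the Code Examples section.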

5. Set up serving layer with Grafana

Connect Grafana to ClickHouse using the official ClickHouse data source plugin. Create dashboards querying rollup tables (not raw) for sub-second response. [src5]

Verify: Grafana → Explore → run SELECT count() FROM metrics_1m WHERE ts > now() - INTERVAL 1 HOUR → returns data

6. Deploy monitoring for the pipeline itself

Instrument every pipeline component. Alert on consumer lag, checkpoint failures, and ClickHouse merge queue depth. [src2] [src6]

# Check Kafka consumer group lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group metrics-processor

# ClickHouse merge health
clickhouse-client -q "SELECT table, count() as merges FROM system.merges GROUP BY table"

Verify: Kafka LAG < 1000; Flink checkpoints completing; ClickHouse merges progressing
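Lag alerting reduces to comparing log-end offsets with committed offsets per partition, the same numbers kafka-consumer-groups.sh --describe prints. A minimal evaluation sketch; the dict keys and function names are assumptions, the 1000 threshold comes from the verify line above:

```python
def total_lag(partitions):
    """Sum per-partition lag (log_end_offset - current_offset)."""
    return sum(p["log_end_offset"] - p["current_offset"] for p in partitions)

def lag_alert(partitions, threshold=1000):
    """Return the aggregate lag and whether it breaches the alert threshold."""
    lag = total_lag(partitions)
    return {"lag": lag, "alert": lag >= threshold}
```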

Code Examples

Python: Kafka Producer with Bounded-Cardinality Tags

# Input:  Application metrics (HTTP request durations, counters)
# Output: Structured events published to Kafka topic

from confluent_kafka import Producer  # confluent-kafka==2.6.1
import json, uuid, time

producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "acks": "all",
    "linger.ms": 5,
    "compression.type": "zstd",
    "enable.idempotence": True,
})

def emit_metric(name: str, value: float, tags: dict):
    event = {
        "event_id": str(uuid.uuid4()),
        "metric_name": name,
        "value": value,
        "timestamp_ms": int(time.time() * 1000),
        "tags": tags,
        "source": "api-gateway",
    }
    producer.produce(
        topic="metrics-events",
        key=name.encode(),
        value=json.dumps(event).encode(),
        callback=lambda err, msg: err and print(f"DLQ: {err}"),
    )
    producer.poll(0)  # serve queued delivery callbacks without blocking

# On shutdown, call producer.flush() to drain in-flight messages

SQL: ClickHouse Analytical Queries on Rollup Tables

-- Input:  Pre-aggregated 1-minute rollup table
-- Output: Dashboard-ready time series with percentiles

SELECT ts, metric_name, source,
    quantileMerge(0.99)(val_p99) AS p99_latency,
    sumMerge(val_sum) / countMerge(cnt) AS avg_latency,
    countMerge(cnt) AS request_count
FROM metrics_1m
WHERE metric_name = 'http_request_duration_ms'
  AND ts > now() - INTERVAL 24 HOUR
GROUP BY ts, metric_name, source
ORDER BY ts;

Anti-Patterns

Wrong: High-cardinality fields as metric labels

# BAD — user_id as a tag creates millions of unique time series
emit_metric("page_load_time_ms", 320.0, {
    "user_id": "u-839201",        # unbounded cardinality!
    "session_id": "s-28d7f3a",    # unbounded cardinality!
    "page": "/dashboard",
})

Correct: Bounded-cardinality tags only

# GOOD — only bounded-cardinality tags in metrics
emit_metric("page_load_time_ms", 320.0, {
    "page": "/dashboard",          # bounded: ~100 pages
    "region": "us-east-1",         # bounded: ~20 regions
    "device_type": "mobile",       # bounded: 3 options
})
# High-cardinality data goes to a separate events/logs table
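One way to enforce the bounded-cardinality rule at the producer is an allowlist of tag keys, so unbounded identifiers can never become time-series labels. A sketch; the allowlist contents are assumptions for illustration:

```python
ALLOWED_TAG_KEYS = {"page", "region", "device_type", "status_code", "method"}

def sanitize_tags(tags: dict) -> dict:
    """Drop tag keys outside the allowlist so unbounded identifiers
    (user_id, session_id, request_id) are filtered before emission."""
    return {k: v for k, v in tags.items() if k in ALLOWED_TAG_KEYS}

# Usage with the emit_metric helper from the Code Examples section:
# emit_metric("page_load_time_ms", 320.0,
#             sanitize_tags({"page": "/dashboard", "user_id": "u-839201"}))
```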

Wrong: No backpressure handling

# BAD — producer fires and forgets without flow control
for event in infinite_event_stream():
    producer.produce("metrics-events", value=serialize(event))
    # No flush, no error callback — BufferError → data loss

Correct: Backpressure with bounded buffers

# GOOD — respect producer buffer limits and handle errors
for event in infinite_event_stream():
    while True:
        try:
            producer.produce("metrics-events", value=serialize(event),
                             callback=delivery_report)
            producer.poll(0)  # serve delivery callbacks
            break
        except BufferError:
            producer.poll(1.0)  # backpressure: wait for buffer space, then retry

Wrong: Querying raw table for dashboards

-- BAD — scanning months of raw data for a dashboard panel
SELECT toStartOfMinute(timestamp_ms) AS ts, avg(value)
FROM metrics_raw
WHERE metric_name = 'cpu_usage'
  AND timestamp_ms > now() - INTERVAL 30 DAY  -- billions of rows
GROUP BY ts ORDER BY ts;
-- Result: 30+ second query

Correct: Query pre-aggregated rollup tables

-- GOOD — query the 1-minute materialized view rollup
SELECT ts, avgMerge(val_avg) AS avg_cpu
FROM metrics_1m
WHERE metric_name = 'cpu_usage'
  AND ts > now() - INTERVAL 30 DAY  -- only rollup rows
GROUP BY ts ORDER BY ts;
-- Result: sub-second query, 100-1000x fewer rows scanned

Common Pitfalls

Diagnostic Commands

# Monitor consumer group lag (key health metric)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group metrics-processor

# Flink job status and checkpoint health
curl -s http://flink-jobmanager:8081/jobs | jq '.jobs[] | {id, status}'

# ClickHouse insert rate and merge health
clickhouse-client -q "SELECT database, table, elapsed, progress FROM system.merges"

# ClickHouse table sizes and part counts
clickhouse-client -q "SELECT table, formatReadableSize(sum(bytes_on_disk)) AS size, count() AS parts FROM system.parts WHERE active GROUP BY table ORDER BY sum(bytes_on_disk) DESC"

Version History & Compatibility

| Technology | Current Version | Key Changes | Migration Notes |
|---|---|---|---|
| Apache Kafka | 4.0 (2025) | ZooKeeper removed — KRaft-only | Migrate ZK to KRaft before upgrading past 3.x |
| Apache Flink | 2.0 (2024) | Unified DataStream + Table API | 1.x DataStream still works; new Table API preferred |
| ClickHouse | 24.x (2024) | Improved Kafka engine, new JSON type | Backward-compatible; new features opt-in |
| Kafka Streams | 3.8 (2024) | Improved state store backends | No breaking changes from 3.x |
| Redpanda | 24.x (2024) | Kafka API-compatible, no JVM | Drop-in Kafka replacement |
| Grafana | 11.x (2024) | ClickHouse plugin v4, unified alerting | Dashboard JSON forward-compatible |

When to Use / When Not to Use

| Use When | Don't Use When | Use Instead |
|---|---|---|
| >10K events/sec needing sub-second dashboards | <1K events/sec with daily reporting | Managed APM (Datadog, New Relic) or PostgreSQL |
| Custom aggregation logic (percentiles, funnels, cohorts) | Standard infra metrics (CPU, memory, disk) | Prometheus + Grafana |
| Multi-source data unified in one store | Single-source application logs only | ELK stack or Grafana Loki |
| Need to replay/reprocess historical data | One-time data migration or ETL | Apache Spark batch job or dbt |
| Cost-sensitive at scale (>100TB/month) | Small team without data engineering | SaaS observability platform |

Important Caveats

Related Units