ClickHouse and Apache Kafka: Architecting Real-Time Streaming Analytics at Scale

Streaming pipelines that must ingest hundreds of thousands of events per second while simultaneously serving sub-second analytical queries represent one of the hardest infrastructure problems in production data engineering. Apache Kafka is the dominant message bus for high-throughput event streams, yet the query layer downstream is frequently the bottleneck. At ChistaDATA, we architect and operate ClickHouse deployments where ClickHouse Kafka real-time analytics is central to the production workload.

This article covers the canonical three-layer ingestion architecture — Kafka topics to Kafka engine table to materialized view to ReplicatedMergeTree storage — along with schema design, ingestion tuning, pipeline observability, and the failure modes we encounter most in production. By the end, you will have a deployable blueprint for a streaming analytics pipeline that sustains real-time throughput without sacrificing query performance.


Architecture: The Canonical Kafka-to-ClickHouse Pattern

The integration relies on three cooperating layers. First, a Kafka engine table acts as a stateless consumer: it connects to Kafka brokers, subscribes to topics, and exposes messages as a readable ClickHouse table without persisting data. Second, a materialized view triggers on every batch the engine reads, applying column transforms and routing rows downstream. Third, a MergeTree family target table holds all persistent data and is the surface exposed to query clients.

This separation is deliberate. The Kafka engine manages consumer group offsets inside Kafka itself and commits them only after a successful insert completes in the target table, giving at-least-once delivery semantics by default. The materialized view handles type casts, JSON extraction, and conditional routing — no ETL process outside ClickHouse is required.

An alternative is the Kafka Connect ClickHouse Sink Connector, which suits organizations already running Kafka Connect that need exactly-once semantics or schema-registry-backed Avro deserialization. For pure throughput and simplicity, the native Kafka engine remains the preferred starting point.

Kafka Engine Setup and Materialized View Wiring

The SQL below creates the complete three-layer pipeline for an e-commerce clickstream use case on ClickHouse 24.8 LTS or 25.3. The Kafka engine table declares all consumer parameters; the materialized view performs JSON extraction; the target table is a ReplicatedMergeTree for a multi-node cluster. Note the creation order: the target table must exist before the materialized view that writes to it via TO, so Layer 3 is created ahead of Layer 2.

-- Layer 1: Kafka engine table (consumer, no persistent storage)
CREATE TABLE default.clickstream_kafka
(
    event_time   DateTime,
    session_id   String,
    user_id      UInt64,
    page         LowCardinality(String),
    event_type   LowCardinality(String),
    properties   String   -- raw JSON payload
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list          = 'kafka-01:9092,kafka-02:9092,kafka-03:9092',
    kafka_topic_list           = 'clickstream.events',
    kafka_group_name           = 'ch_clickstream_consumer',
    kafka_format               = 'JSONEachRow',
    kafka_num_consumers        = 4,
    kafka_thread_per_consumer  = 1,
    kafka_max_block_size       = 65536,
    kafka_skip_broken_messages = 10;

-- Layer 3: ReplicatedMergeTree target table (persistent, queryable)
-- Created before the materialized view, which references it via TO
CREATE TABLE default.clickstream_local
(
    event_time   DateTime            CODEC(DoubleDelta, ZSTD(3)),
    session_id   String              CODEC(ZSTD(1)),
    user_id      UInt64              CODEC(Delta, LZ4),
    page         LowCardinality(String),
    event_type   LowCardinality(String),
    referrer     String              CODEC(ZSTD(1)),
    duration_ms  UInt32              CODEC(Delta, LZ4),
    _ingest_time DateTime DEFAULT now()
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/clickstream_local',
    '{replica}'
)
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_type, user_id, event_time)
TTL event_time + INTERVAL 90 DAY DELETE
SETTINGS
    index_granularity      = 8192,
    merge_with_ttl_timeout = 86400;

-- Layer 2: Materialized view -- transform and route to persistent storage
-- Created last; consumption begins as soon as the view attaches
CREATE MATERIALIZED VIEW default.clickstream_mv
TO default.clickstream_local
AS
SELECT
    event_time,
    session_id,
    user_id,
    page,
    event_type,
    JSONExtractString(properties, 'referrer')    AS referrer,
    JSONExtractUInt(properties,   'duration_ms') AS duration_ms
FROM default.clickstream_kafka;

A representative Kafka topic configuration to deploy alongside this pipeline:

# kafka-topic-config.yaml
topic:
  name: clickstream.events
  partitions: 12           # >= kafka_num_consumers across all CH nodes
  replication_factor: 3
  config:
    retention.ms: 604800000        # 7 days
    cleanup.policy: delete
    compression.type: lz4
    min.insync.replicas: 2
    message.max.bytes: 5242880     # 5 MB
    segment.ms: 3600000            # 1-hour segments

ORDER BY key design. The ORDER BY tuple is ClickHouse’s primary sparse index. Place a low-cardinality discriminator first (such as event_type), a high-cardinality identifier second (user_id), and the timestamp last for optimal compression and fast point lookups.
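
As an illustration of how this key order pays off, the dashboard query below (filter values are hypothetical) follows the ORDER BY prefix, so the sparse primary index can discard most granules before the timestamp range is even evaluated:

-- Point lookup following the (event_type, user_id, event_time) prefix
SELECT
    page,
    count()          AS views,
    avg(duration_ms) AS avg_duration_ms
FROM default.clickstream_local
WHERE event_type = 'page_view'              -- low-cardinality discriminator
  AND user_id    = 4211938                  -- high-cardinality identifier
  AND event_time >= now() - INTERVAL 1 DAY  -- timestamp range last
GROUP BY page
ORDER BY views DESC;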

PARTITION BY granularity. Monthly partitions (toYYYYMM(event_time)) suit streams retaining 30–90 days of data. Daily partitions inflate the active part count and degrade merge performance under sustained insert load unless they are paired with an explicit drop-partition retention policy, as sketched below.
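
Whichever granularity is chosen, expired partitions can be dropped directly, which is far cheaper than row-level deletes; a minimal sketch (the partition value 202409 is illustrative):

-- Drop an expired monthly partition outright
ALTER TABLE default.clickstream_local DROP PARTITION 202409;

-- Check the remaining partitions and their active part counts
SELECT partition, count() AS active_parts, sum(rows) AS rows
FROM system.parts
WHERE database = 'default'
  AND table = 'clickstream_local'
  AND active
GROUP BY partition
ORDER BY partition;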

Codec selection. DateTime columns compress well with DoubleDelta + ZSTD(3) because consecutive timestamps differ by small, predictable deltas. Integer metric columns benefit from Delta + LZ4 for fast decompression. LowCardinality(String) stores an internal dictionary and delivers 5–10x storage reduction on fields like event_type or page.
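
The effect of these codec choices can be verified once data has landed; a sketch of the per-column compression check we typically run against system.columns:

-- Per-column compression ratio for the target table
SELECT
    name,
    formatReadableSize(data_compressed_bytes)   AS compressed,
    formatReadableSize(data_uncompressed_bytes) AS uncompressed,
    round(data_uncompressed_bytes / data_compressed_bytes, 1) AS ratio
FROM system.columns
WHERE database = 'default'
  AND table = 'clickstream_local'
  AND data_compressed_bytes > 0
ORDER BY data_compressed_bytes DESC;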

TTL clauses. Inline TTL expressions expire aged rows during background merges, with no external cron job required. Set merge_with_ttl_timeout = 86400 to control how frequently the scheduler considers TTL-eligible parts.
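
The retention window can also be changed in place later; a sketch, assuming the 90-day window is tightened to 30 days:

-- Shorten retention; existing parts pick up the new rule on subsequent merges
ALTER TABLE default.clickstream_local
    MODIFY TTL event_time + INTERVAL 30 DAY DELETE;

-- Optionally force TTL re-evaluation of existing parts instead of waiting
ALTER TABLE default.clickstream_local MATERIALIZE TTL;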

Ingestion Tuning and Throughput Optimization

A well-configured Kafka consumer pipeline sustains 100,000 to 1,000,000 rows per second per consumer thread depending on row width, codec overhead, and target table complexity. At ChistaDATA, we have observed sustained rates of 800K rows/sec on 32-core nodes with four consumer threads processing 200-byte JSON events into a three-replica ReplicatedMergeTree.

The critical tuning levers are kafka_max_block_size and max_insert_block_size. The former controls how many rows the Kafka engine accumulates before passing a block to the materialized view. Setting both to 65536 balances insert frequency against per-insert overhead. Increasing to 262144 reduces parts-per-second at the cost of higher latency from message production to query visibility. The flush interval is governed by stream_flush_interval_ms (default 7,500 ms).
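
Because the Kafka engine table holds no data and offsets live in Kafka under the consumer group, re-tuning the batch size is a drop-and-recreate operation; a sketch of the sequence (the 262144 value is illustrative):

-- Pause the flow into storage, then replace the consumer table
DETACH TABLE default.clickstream_mv;
DROP TABLE default.clickstream_kafka SYNC;

CREATE TABLE default.clickstream_kafka
(
    event_time   DateTime,
    session_id   String,
    user_id      UInt64,
    page         LowCardinality(String),
    event_type   LowCardinality(String),
    properties   String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list    = 'kafka-01:9092,kafka-02:9092,kafka-03:9092',
    kafka_topic_list     = 'clickstream.events',
    kafka_group_name     = 'ch_clickstream_consumer',
    kafka_format         = 'JSONEachRow',
    kafka_num_consumers  = 4,
    kafka_max_block_size = 262144;  -- larger batches, fewer parts per second

-- Resume; consumption restarts from the offsets committed in Kafka
ATTACH TABLE default.clickstream_mv;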

Async inserts (async_insert = 1) do not apply to the Kafka engine path; they are relevant when external clients POST data over HTTP or the native protocol. Inside the engine pipeline, batching is controlled entirely by kafka_max_block_size. To absorb the duplicate rows that at-least-once delivery can produce, use a ReplacingMergeTree target table, which collapses rows sharing the same ORDER BY key during background merges, or query with the FINAL modifier for read-time deduplication. Consult the ClickHouse Kafka engine documentation for the complete parameter reference.
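
A minimal sketch of that deduplicating variant; the table name and the event_id column are hypothetical additions that make each event uniquely addressable within the ORDER BY key:

-- Replayed messages that share the full ORDER BY key collapse to one row
-- during background merges.
CREATE TABLE default.clickstream_dedup
(
    event_time DateTime,
    event_id   UUID,
    user_id    UInt64,
    event_type LowCardinality(String),
    page       LowCardinality(String)
)
ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/tables/{shard}/clickstream_dedup',
    '{replica}'
)
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_type, user_id, event_time, event_id);

-- Read-time deduplication while merges are still pending
SELECT count()
FROM default.clickstream_dedup FINAL
WHERE event_time >= today();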

Pipeline Observability and Monitoring

ClickHouse surfaces Kafka consumer state, part health, and error counts through system tables. Querying these on a schedule or exposing them to Prometheus via clickhouse_exporter provides full pipeline visibility without external tooling.

# Inspect live Kafka consumer state across all consumer threads
clickhouse-client --query "
SELECT
    database, table, consumer_id,
    assignments.topic, assignments.partition_id, assignments.current_offset,
    num_messages_read, last_poll_time, exceptions.text
FROM system.kafka_consumers
FORMAT Vertical
"

-- Aggregate consumer health and part activity
SELECT
    table,
    sum(num_messages_read)             AS total_messages_read,
    max(last_poll_time)                AS latest_poll,
    countIf(notEmpty(exceptions.text)) AS consumers_with_errors
FROM system.kafka_consumers
WHERE database = 'default'
GROUP BY table;

-- Active part count and disk usage
SELECT
    table,
    count()            AS active_parts,
    sum(rows)          AS total_rows,
    sum(bytes_on_disk) AS disk_bytes
FROM system.parts
WHERE database = 'default'
  AND table  = 'clickstream_local'
  AND active = 1
GROUP BY table;

-- Recent errors (last hour)
SELECT name, code, value, last_error_time, last_error_message
FROM system.errors
WHERE last_error_time > now() - INTERVAL 1 HOUR
ORDER BY last_error_time DESC
LIMIT 50;

Failure Modes and Operational Runbook

The following failure modes are the ones our ClickHouse consulting services team encounters most frequently, with the remediation steps applied in production.

Consumer lag accumulation. When ClickHouse inserts slow — due to heavy query load, disk saturation, or replication bottlenecks — consumers fall behind the Kafka log end offset. The immediate mitigation is reducing max_concurrent_queries to release CPU for inserts. Long-term, route analytical queries to read-only replicas and dedicate write replicas exclusively to the ingestion path.
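
Independent of Kafka-side offset counters, a quick freshness probe against the _ingest_time column in the target table shows whether inserts are keeping up; a sketch:

-- End-to-end freshness: how stale is the newest persisted row?
SELECT
    max(event_time)                              AS newest_event,
    max(_ingest_time)                            AS newest_insert,
    dateDiff('second', max(_ingest_time), now()) AS seconds_since_last_insert
FROM default.clickstream_local;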

Poison messages. A single malformed JSON record stalls a consumer thread unless kafka_skip_broken_messages is configured. Route a dead-letter stream to a separate Kafka error topic consumed by a ClickHouse table using kafka_format = 'RawBLOB' for forensic storage and alerting on schema violations.
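
A sketch of that dead-letter path, assuming a separate clickstream.errors topic already exists; the table and consumer-group names here are illustrative:

-- Dead-letter consumer: store rejected messages as raw bytes, untouched
CREATE TABLE default.clickstream_dlq_kafka
(
    raw String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka-01:9092,kafka-02:9092,kafka-03:9092',
    kafka_topic_list  = 'clickstream.errors',
    kafka_group_name  = 'ch_clickstream_dlq_consumer',
    kafka_format      = 'RawBLOB';

-- Forensic storage for rejected payloads
CREATE TABLE default.clickstream_dlq
(
    raw          String,
    _ingest_time DateTime DEFAULT now()
)
ENGINE = MergeTree
ORDER BY _ingest_time;

CREATE MATERIALIZED VIEW default.clickstream_dlq_mv
TO default.clickstream_dlq
AS SELECT raw FROM default.clickstream_dlq_kafka;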

Schema evolution. Adding nullable columns or columns with DEFAULT expressions to the ClickHouse target table is backward-compatible. Removing a payload field while the ClickHouse schema still references it silently fills that column with defaults. Renaming a payload field without a coordinated schema migration causes data loss. Use a schema registry enforcing backward-compatible evolution — Confluent Schema Registry or AWS Glue — to prevent breaking changes from reaching ClickHouse.
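
When a new payload field does appear, the backward-compatible rollout is to extend the target table first and then swap the materialized view; a sketch, assuming a hypothetical device_type field:

-- 1. Extend the target table; existing rows read back the DEFAULT
ALTER TABLE default.clickstream_local
    ADD COLUMN device_type LowCardinality(String) DEFAULT 'unknown';

-- 2. Replace the view so new batches populate the column.
--    While no materialized view is attached, the Kafka engine does not
--    consume, so the swap pauses rather than drops ingestion.
DROP VIEW default.clickstream_mv;
CREATE MATERIALIZED VIEW default.clickstream_mv
TO default.clickstream_local
AS
SELECT
    event_time,
    session_id,
    user_id,
    page,
    event_type,
    JSONExtractString(properties, 'referrer')     AS referrer,
    JSONExtractUInt(properties,   'duration_ms')  AS duration_ms,
    JSONExtractString(properties, 'device_type')  AS device_type
FROM default.clickstream_kafka;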

Rebalance storms. A consumer group rebalance pauses all consumers while Kafka reassigns partitions. Repeated ClickHouse restarts against high-partition topics trigger cascading rebalances that stall ingestion for minutes. Raise librdkafka's session.timeout.ms (for example to 45000) and max.poll.interval.ms (for example to 300000) through the <kafka> section of the ClickHouse server configuration so that slow insert batches are not mistaken for dead consumers. The Apache Kafka documentation covers consumer group protocol tuning in detail.

Key Takeaways

  • The three-layer pattern — Kafka engine table, materialized view, ReplicatedMergeTree — is the canonical ingestion architecture for ClickHouse Kafka real-time analytics pipelines.
  • Set kafka_num_consumers to match or be fewer than the Kafka topic partition count; exceeding the partition count provides no throughput benefit.
  • Place low-cardinality discriminators first in the ORDER BY key, followed by high-cardinality identifiers and then the timestamp to maximize compression and primary index efficiency.
  • Use DoubleDelta + ZSTD for DateTime columns, Delta + LZ4 for integer metrics, and LowCardinality(String) for low-cardinality string fields for 5–10x storage reduction.
  • At-least-once delivery is the default; achieve idempotent ingestion using a ReplacingMergeTree target table or the FINAL modifier at query time.
  • Monitor system.kafka_consumers, system.parts, and system.errors continuously; a consumer exception surfacing in exceptions.text or a rising active part count is the earliest signal of pipeline degradation.

How ChistaDATA Can Help

Building a Kafka-to-ClickHouse pipeline that performs reliably at scale requires deep knowledge of Kafka consumer tuning, ClickHouse merge mechanics, replication topology, and codec selection. At ChistaDATA, we specialize in ClickHouse architecture, deployment, and operations for organizations running real-time analytics at scale. Our engineering team has designed and hardened streaming pipelines processing billions of events per day across fintech, e-commerce, adtech, and IoT verticals.

We offer ClickHouse consulting services covering greenfield pipeline architecture, performance tuning, schema redesign for high-cardinality streaming data, and 24×7 production support. Schedule a consultation to discuss how we can help you build a reliable, high-throughput streaming analytics platform on ClickHouse.

Frequently Asked Questions

What delivery semantics does the ClickHouse Kafka engine provide?

The ClickHouse Kafka engine provides at-least-once delivery by default. Offsets are committed only after a successful insert completes in the target table. If a node restarts between the insert and the offset commit, messages will be replayed and may produce duplicate rows. Use a ReplacingMergeTree target table or the FINAL query modifier to handle deduplication downstream.

How many Kafka partitions should a ClickHouse Kafka consumer topic have?

Provision at least as many partitions as the total kafka_num_consumers summed across all ClickHouse nodes consuming the topic. With four consumers per node on three nodes, the topic needs at least twelve partitions. Under-partitioned topics cap throughput at the per-partition rate regardless of how many consumer threads are configured in ClickHouse.

Is the Kafka Connect ClickHouse Sink Connector better than the native Kafka engine?

Each option suits different contexts. The native engine requires no additional infrastructure and typically delivers higher raw throughput. The Kafka Connect ClickHouse Sink Connector is preferable when exactly-once semantics backed by Kafka transactions or Confluent Schema Registry integration is required. The right choice depends on existing infrastructure and the delivery guarantees the pipeline must satisfy.

What is a realistic ingestion throughput for a ClickHouse Kafka pipeline?

A well-tuned pipeline sustains 100,000 to 1,000,000 rows per second per consumer thread. At ChistaDATA, we have measured 800,000 rows/sec on 32-core nodes ingesting 200-byte JSON events into a three-replica ReplicatedMergeTree with four active consumer threads. Actual throughput varies with row width, codec selection, replication factor, and concurrent query load.

How should schema evolution be handled for Kafka-sourced data in ClickHouse?

Adding nullable columns or columns with DEFAULT expressions is backward-compatible and safe. Removing a payload field while the ClickHouse schema still references it silently fills that column with defaults. Renaming a field without a coordinated migration causes data loss. Use a schema registry enforcing backward-compatible evolution to prevent breaking changes from reaching ClickHouse undetected.

How do you monitor consumer lag in a ClickHouse Kafka pipeline?

Query system.kafka_consumers and inspect num_messages_read, last_poll_time, and exceptions.text per consumer thread. For exact offset-delta lag, run kafka-consumer-groups.sh --describe against the group configured in kafka_group_name. Alert when lag grows monotonically for five minutes or more, which confirms that inserts are running slower than the production rate.
