Streaming pipelines that must ingest hundreds of thousands of events per second while simultaneously serving sub-second analytical queries represent one of the hardest infrastructure problems in production data engineering. Apache Kafka is the dominant message bus for high-throughput event streams, yet the query layer downstream is frequently the bottleneck. At ChistaDATA, we architect and operate ClickHouse deployments where Kafka-driven real-time analytics is central to the production workload.
This article covers the canonical three-layer ingestion architecture — Kafka topics to Kafka engine table to materialized view to ReplicatedMergeTree storage — along with schema design, ingestion tuning, pipeline observability, and the failure modes we encounter most in production. By the end, you will have a deployable blueprint for a streaming analytics pipeline that sustains real-time throughput without sacrificing query performance.

Architecture: The Canonical Kafka-to-ClickHouse Pattern
The integration relies on three cooperating layers. First, a Kafka engine table acts as a stateless consumer: it connects to Kafka brokers, subscribes to topics, and exposes messages as a readable ClickHouse table without persisting data. Second, a materialized view triggers on every batch the engine reads, applying column transforms and routing rows downstream. Third, a MergeTree family target table holds all persistent data and is the surface exposed to query clients.
This separation is deliberate. The Kafka engine manages consumer group offsets inside Kafka itself and commits them only after a successful insert completes in the target table, giving at-least-once delivery semantics by default. The materialized view handles type casts, JSON extraction, and conditional routing — no ETL process outside ClickHouse is required.
An alternative is the Kafka Connect ClickHouse Sink Connector, which suits organizations already running Kafka Connect that need exactly-once semantics or schema-registry-backed Avro deserialization. For pure throughput and simplicity, the native Kafka engine remains the preferred starting point.
Kafka Engine Setup and Materialized View Wiring
The SQL below creates the complete three-layer pipeline for an e-commerce clickstream use case on ClickHouse 24.8 LTS or 25.3. The Kafka engine table declares all consumer parameters; the materialized view performs JSON extraction; the target table is a ReplicatedMergeTree for a multi-node cluster.
-- Layer 1: Kafka engine table (consumer, no persistent storage)
CREATE TABLE default.clickstream_kafka
(
    event_time DateTime,
    session_id String,
    user_id    UInt64,
    page       LowCardinality(String),
    event_type LowCardinality(String),
    properties String  -- raw JSON payload
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka-01:9092,kafka-02:9092,kafka-03:9092',
    kafka_topic_list = 'clickstream.events',
    kafka_group_name = 'ch_clickstream_consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4,
    kafka_thread_per_consumer = 1,
    kafka_max_block_size = 65536,
    kafka_skip_broken_messages = 10;
-- Layer 2: Materialized view -- transform and route to persistent storage
-- (create the Layer 3 target table first: the TO clause requires
-- default.clickstream_local to exist at view-creation time)
CREATE MATERIALIZED VIEW default.clickstream_mv
TO default.clickstream_local
AS
SELECT
    event_time,
    session_id,
    user_id,
    page,
    event_type,
    JSONExtractString(properties, 'referrer') AS referrer,
    JSONExtractUInt(properties, 'duration_ms') AS duration_ms
FROM default.clickstream_kafka;
-- Layer 3: ReplicatedMergeTree target table (persistent, queryable)
CREATE TABLE default.clickstream_local
(
    event_time   DateTime CODEC(DoubleDelta, ZSTD(3)),
    session_id   String CODEC(ZSTD(1)),
    user_id      UInt64 CODEC(Delta, LZ4),
    page         LowCardinality(String),
    event_type   LowCardinality(String),
    referrer     String CODEC(ZSTD(1)),
    duration_ms  UInt32 CODEC(Delta, LZ4),
    _ingest_time DateTime DEFAULT now()
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/clickstream_local',
    '{replica}'
)
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_type, user_id, event_time)
TTL event_time + INTERVAL 90 DAY DELETE
SETTINGS
    index_granularity = 8192,
    merge_with_ttl_timeout = 86400;
A representative Kafka topic configuration to deploy alongside this pipeline:
# kafka-topic-config.yaml
topic:
  name: clickstream.events
  partitions: 12                  # >= kafka_num_consumers across all CH nodes
  replication_factor: 3
  config:
    retention.ms: 604800000       # 7 days
    cleanup.policy: delete
    compression.type: lz4
    min.insync.replicas: 2
    message.max.bytes: 5242880    # 5 MB
    segment.ms: 3600000           # 1-hour segments
ORDER BY key design. The ORDER BY tuple is ClickHouse’s primary sparse index. Place a low-cardinality discriminator first (such as event_type), a high-cardinality identifier second (user_id), and the timestamp last for optimal compression and fast point lookups.
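As an illustration, a query shaped like the one below is served almost entirely from the sparse primary index built over this tuple; the filter values are hypothetical, and the columns follow the schema above:

```sql
-- Point lookup benefiting from ORDER BY (event_type, user_id, event_time):
-- granules are pruned by discriminator, then identifier, then time range.
SELECT
    count()          AS events,
    avg(duration_ms) AS avg_duration_ms
FROM default.clickstream_local
WHERE event_type = 'page_view'   -- illustrative value
  AND user_id = 42               -- illustrative value
  AND event_time >= now() - INTERVAL 1 DAY;
```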
PARTITION BY granularity. Monthly partitions (toYYYYMM(event_time)) suit streams retaining 30–90 days of data. Daily partitions are justified only when retention is enforced by dropping whole partitions each day; otherwise over-partitioning inflates the active part count and degrades merge performance under sustained insert load.
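One quick sanity check, assuming the target table above, is the active part count per partition; sustained counts in the hundreds per partition suggest the partitioning is too fine for the insert rate:

```sql
-- Active parts, rows, and on-disk size per partition
SELECT
    partition,
    count()                                AS active_parts,
    sum(rows)                              AS rows,
    formatReadableSize(sum(bytes_on_disk)) AS size
FROM system.parts
WHERE database = 'default'
  AND table = 'clickstream_local'
  AND active
GROUP BY partition
ORDER BY partition DESC;
```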
Codec selection. DateTime columns compress well with DoubleDelta + ZSTD(3) because consecutive timestamps differ by small, predictable deltas. Integer metric columns benefit from Delta + LZ4 for fast decompression. LowCardinality(String) stores an internal dictionary and delivers 5–10x storage reduction on fields like event_type or page.
TTL clauses. Inline TTL expressions expire partitions during background merges without cron jobs. Set merge_with_ttl_timeout = 86400 to control how frequently the scheduler considers TTL-eligible parts.
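TTL expressions can also tier data instead of deleting it. The sketch below assumes the cluster has a storage policy with a volume named 'cold' configured; the names are illustrative, not part of the pipeline above:

```sql
-- Sketch: tiered retention. Assumes a storage policy exposing a
-- volume named 'cold' (e.g. object storage or slower disks).
ALTER TABLE default.clickstream_local
    MODIFY TTL
        event_time + INTERVAL 30 DAY TO VOLUME 'cold',
        event_time + INTERVAL 90 DAY DELETE;
```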
Ingestion Tuning and Throughput Optimization
A well-configured Kafka consumer pipeline sustains 100,000 to 1,000,000 rows per second per consumer thread depending on row width, codec overhead, and target table complexity. At ChistaDATA, we have observed sustained rates of 800K rows/sec on 32-core nodes with four consumer threads processing 200-byte JSON events into a three-replica ReplicatedMergeTree.
The critical tuning levers are kafka_max_block_size and max_insert_block_size. The former controls how many rows the Kafka engine accumulates before passing a block to the materialized view. Setting both to 65536 balances insert frequency against per-insert overhead. Increasing to 262144 reduces parts-per-second at the cost of higher latency from message production to query visibility. The flush interval is governed by stream_flush_interval_ms (default 7,500 ms).
Async inserts (async_insert = 1) do not apply to the Kafka engine path — they are relevant when external clients POST data over HTTP or the native protocol. Inside the engine pipeline, batching is controlled entirely by kafka_max_block_size. Because delivery is at-least-once, replays after a crash can produce duplicate rows; absorb them with a ReplacingMergeTree target table — which deduplicates rows sharing the same ORDER BY key during background merges — or query with the FINAL modifier for read-time deduplication. Consult the ClickHouse Kafka engine documentation for the complete parameter reference.
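A minimal sketch of a deduplicating target, assuming the payload carries a unique event identifier (event_id here is illustrative, not part of the clickstream schema above):

```sql
-- ReplacingMergeTree collapses rows sharing the full ORDER BY key
-- during background merges, so replayed duplicates disappear over time.
CREATE TABLE default.clickstream_dedup
(
    event_time  DateTime,
    event_id    UUID,    -- hypothetical unique id carried in the payload
    user_id     UInt64,
    duration_ms UInt32
)
ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/tables/{shard}/clickstream_dedup',
    '{replica}'
)
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time, event_id);

-- Read-time deduplication while merges catch up:
SELECT count() FROM default.clickstream_dedup FINAL;
```

Deduplication during merges is eventual, so FINAL (or per-query windowed dedup) is still needed when exact counts matter before merges complete.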
Pipeline Observability and Monitoring
ClickHouse surfaces Kafka consumer state, part health, and error counts through system tables. Querying these on a schedule or exposing them to Prometheus via clickhouse_exporter provides full pipeline visibility without external tooling.
# Inspect live Kafka consumer state across all consumer threads
clickhouse-client --query "
SELECT
    database, table, consumer_id,
    assignments.topic, assignments.partition_id, assignments.current_offset,
    num_messages_read, last_poll_time, exceptions.text
FROM system.kafka_consumers
FORMAT Vertical
"
-- Aggregate consumer health and part activity
SELECT
    table,
    sum(num_messages_read) AS total_read,
    max(last_poll_time) AS latest_poll,
    countIf(notEmpty(exceptions.text)) AS consumers_with_errors
FROM system.kafka_consumers
WHERE database = 'default'
GROUP BY table;
-- Active part count and disk usage
SELECT
table,
count() AS active_parts,
sum(rows) AS total_rows,
sum(bytes_on_disk) AS disk_bytes
FROM system.parts
WHERE database = 'default'
AND table = 'clickstream_local'
AND active = 1
GROUP BY table;
-- Recent errors (last hour)
SELECT name, code, value, last_error_time, last_error_message
FROM system.errors
WHERE last_error_time > now() - INTERVAL 1 HOUR
ORDER BY last_error_time DESC
LIMIT 50;
Failure Modes and Operational Runbook
The following failure modes are the ones our ClickHouse consulting services team encounters most frequently, with the remediation steps applied in production.
Consumer lag accumulation. When ClickHouse inserts slow — due to heavy query load, disk saturation, or replication bottlenecks — consumers fall behind the Kafka log end offset. The immediate mitigation is reducing max_concurrent_queries to release CPU for inserts. Long-term, route analytical queries to read-only replicas and dedicate write replicas exclusively to the ingestion path.
Poison messages. A single malformed JSON record stalls a consumer thread unless kafka_skip_broken_messages is configured. Route a dead-letter stream to a separate Kafka error topic consumed by a ClickHouse table using kafka_format = 'RawBLOB' for forensic storage and alerting on schema violations.
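An alternative worth evaluating is the engine's error-stream mode. With kafka_handle_error_mode = 'stream' set on the Kafka engine table, undecodable messages surface through the _raw_message and _error virtual columns instead of being skipped, so a second materialized view can capture them. The dead-letter table and view names below are illustrative:

```sql
-- Assumes the Layer 1 table was created with
-- kafka_handle_error_mode = 'stream'.
CREATE TABLE default.clickstream_dlq
(
    seen_at     DateTime,
    topic       String,
    raw_message String,
    error       String
)
ENGINE = MergeTree
ORDER BY seen_at;

-- Rows with a non-empty _error are routed here; clean rows carry
-- an empty _error and continue through the main materialized view.
CREATE MATERIALIZED VIEW default.clickstream_dlq_mv
TO default.clickstream_dlq
AS
SELECT
    now()        AS seen_at,
    _topic       AS topic,
    _raw_message AS raw_message,
    _error       AS error
FROM default.clickstream_kafka
WHERE length(_error) > 0;
```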
Schema evolution. Adding nullable columns or columns with DEFAULT expressions to the ClickHouse target table is backward-compatible. Removing a payload field while the ClickHouse schema still references it silently fills that column with defaults. Renaming a payload field without a coordinated schema migration causes data loss. Use a schema registry enforcing backward-compatible evolution — Confluent Schema Registry or AWS Glue — to prevent breaking changes from reaching ClickHouse.
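A backward-compatible field addition, end to end, might look like the sketch below. The country field is illustrative; note that Kafka engine tables cannot be ALTERed, and dropping the materialized view pauses consumption while the change is applied:

```sql
-- 1. Extend the target table first (safe, backward-compatible):
ALTER TABLE default.clickstream_local
    ADD COLUMN country LowCardinality(String) DEFAULT '';

-- 2. Recreate the view to extract the new payload field. While the
--    view is absent, offsets are not advanced, so no data is lost.
DROP VIEW default.clickstream_mv;
CREATE MATERIALIZED VIEW default.clickstream_mv
TO default.clickstream_local
AS
SELECT
    event_time,
    session_id,
    user_id,
    page,
    event_type,
    JSONExtractString(properties, 'referrer') AS referrer,
    JSONExtractUInt(properties, 'duration_ms') AS duration_ms,
    JSONExtractString(properties, 'country') AS country
FROM default.clickstream_kafka;
```

Events produced before the field existed simply yield the column DEFAULT, which is why additive changes are the safe direction of evolution.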
Rebalance storms. A consumer group rebalance pauses all consumers while Kafka reassigns partitions. Repeated ClickHouse restarts against high-partition topics trigger cascading rebalances that stall ingestion for minutes. Raise the librdkafka consumer properties session.timeout.ms (for example, to 45000) and max.poll.interval.ms (for example, to 300000) in the Kafka section of the ClickHouse server configuration. The Apache Kafka documentation covers consumer group protocol tuning in detail.
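A sketch of how those librdkafka overrides are typically supplied, via the server configuration (ClickHouse forwards children of the kafka element to librdkafka, with underscores in place of dots; values shown are the examples from above, not universal recommendations):

```xml
<!-- config.xml fragment: global librdkafka overrides (sketch) -->
<clickhouse>
    <kafka>
        <!-- maps to librdkafka session.timeout.ms -->
        <session_timeout_ms>45000</session_timeout_ms>
        <!-- maps to librdkafka max.poll.interval.ms -->
        <max_poll_interval_ms>300000</max_poll_interval_ms>
    </kafka>
</clickhouse>
```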
Key Takeaways
- The three-layer pattern — Kafka engine table, materialized view, ReplicatedMergeTree — is the canonical ingestion architecture for ClickHouse Kafka real-time analytics pipelines.
- Set kafka_num_consumers to match or be fewer than the Kafka topic partition count; exceeding the partition count provides no throughput benefit.
- Place low-cardinality discriminators first in the ORDER BY key, followed by high-cardinality identifiers and then the timestamp, to maximize compression and primary index efficiency.
- Use DoubleDelta + ZSTD for DateTime columns, Delta + LZ4 for integer metrics, and LowCardinality(String) for low-cardinality string fields for 5–10x storage reduction.
- At-least-once delivery is the default; achieve idempotent ingestion using a ReplacingMergeTree target table or the FINAL modifier at query time.
- Monitor system.kafka_consumers, system.parts, and system.errors continuously — a reported consumer exception or a rising active part count is the earliest signal of pipeline degradation.
How ChistaDATA Can Help
Building a Kafka-to-ClickHouse pipeline that performs reliably at scale requires deep knowledge of Kafka consumer tuning, ClickHouse merge mechanics, replication topology, and codec selection. At ChistaDATA, we specialize in ClickHouse architecture, deployment, and operations for organizations running real-time analytics at scale. Our engineering team has designed and hardened streaming pipelines processing billions of events per day across fintech, e-commerce, adtech, and IoT verticals.
We offer ClickHouse consulting services covering greenfield pipeline architecture, performance tuning, schema redesign for high-cardinality streaming data, and 24×7 production support. Schedule a consultation to discuss how we can help you build a reliable, high-throughput streaming analytics platform on ClickHouse.
Frequently Asked Questions
What delivery semantics does the ClickHouse Kafka engine provide?
The ClickHouse Kafka engine provides at-least-once delivery by default. Offsets are committed only after a successful insert completes in the target table. If a node restarts between the insert and the offset commit, messages will be replayed and may produce duplicate rows. Use a ReplacingMergeTree target table or the FINAL query modifier to handle deduplication downstream.
How many Kafka partitions should a ClickHouse Kafka consumer topic have?
Provision at least as many partitions as the total kafka_num_consumers summed across all ClickHouse nodes consuming the topic. With four consumers per node on three nodes, the topic needs at least twelve partitions. Under-partitioned topics cap throughput at the per-partition rate regardless of how many consumer threads are configured in ClickHouse.
Is the Kafka Connect ClickHouse Sink Connector better than the native Kafka engine?
Each option suits different contexts. The native engine requires no additional infrastructure and typically delivers higher raw throughput. The Kafka Connect ClickHouse Sink Connector is preferable when exactly-once semantics backed by Kafka transactions or Confluent Schema Registry integration is required. The right choice depends on existing infrastructure and the delivery guarantees the pipeline must satisfy.
What is a realistic ingestion throughput for a ClickHouse Kafka pipeline?
A well-tuned pipeline sustains 100,000 to 1,000,000 rows per second per consumer thread. At ChistaDATA, we have measured 800,000 rows/sec on 32-core nodes ingesting 200-byte JSON events into a three-replica ReplicatedMergeTree with four active consumer threads. Actual throughput varies with row width, codec selection, replication factor, and concurrent query load.
How should schema evolution be handled for Kafka-sourced data in ClickHouse?
Adding nullable columns or columns with DEFAULT expressions is backward-compatible and safe. Removing a payload field while the ClickHouse schema still references it silently fills that column with defaults. Renaming a field without a coordinated migration causes data loss. Use a schema registry enforcing backward-compatible evolution to prevent breaking changes from reaching ClickHouse undetected.
How do you monitor consumer lag in a ClickHouse Kafka pipeline?
Query system.kafka_consumers and inspect num_messages_read, last_poll_time, and the exceptions.text array per consumer thread. For exact offset-delta lag, run kafka-consumer-groups.sh --describe against the group configured in kafka_group_name. Alert when lag grows monotonically over five minutes — that confirms inserts are slower than the production rate.