Introduction
Apache Kafka is a distributed event streaming platform developed by the Apache Software Foundation. For a detailed introduction to the basics of Kafka, visit the official page before proceeding. The installation instructions are available here.
ClickHouse has a built-in Kafka table engine, which lets users subscribe to a Kafka topic and store the messages efficiently in ClickHouse. The table engine reads the messages from the Kafka topic, and a materialized view collects the data in the background. The materialized view can optionally transform the data and store it in a final table that holds it permanently. Messages are read from the Kafka topic in batches; each time the materialized view consumes a batch from the Kafka engine table, the next batch of data is pulled into the Kafka table from the topic.
We should know the format of the raw messages in Kafka beforehand. A list of supported input and output formats is available here. Based on the message schema, create the Kafka engine tables. As an example, let's say that we have messages in a Kafka topic as below.
{"ID":1 , "Name": "a"} {"ID":2 , "Name": "b"} {"ID":3 , "Name": "c"}
Let us create a simple pipeline to ingest the data and store it in ClickHouse.
Kafka Engine Table
As the first step, let us create a table in ClickHouse based on the Kafka table engine.
CREATE TABLE kafka_example
(
    ID UInt64,
    Name String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'kafka_topic',
    kafka_group_name = 'consumer_group_name',
    kafka_format = 'JSONEachRow';
This table, which is based on the Kafka engine, has the columns contained in the message. Kafka is assumed to be running on the same host as ClickHouse, so the broker list uses localhost as the hostname. This table reads messages from a topic called 'kafka_topic' as part of a consumer group called 'consumer_group_name'.
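As a quick sanity check, recent ClickHouse versions allow reading from a Kafka engine table directly once the stream_like_engine_allow_direct_select setting is enabled. Keep in mind that rows read this way are consumed and will not reach the materialized view, so this is best reserved for debugging. A minimal sketch, assuming the table created above:

-- Debugging only: allow direct SELECTs on stream-like engines such as Kafka
SET stream_like_engine_allow_direct_select = 1;

-- Peek at a few messages from the topic (these rows are consumed from the stream)
SELECT ID, Name FROM kafka_example LIMIT 3;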
There are a few more useful options for the Kafka table engine; some of them are illustrated in the sketch after the list below.
- kafka_row_delimiter – Delimiter character that marks the end of a message
- kafka_num_consumers – Number of consumers per table; this should not exceed the number of partitions in the topic or the number of cores on the ClickHouse server
- kafka_schema – Message schema, which may be required for certain formats
- kafka_max_block_size – Batch size of the messages that are polled
- kafka_skip_broken_messages – Maximum number of messages with a broken schema that can be skipped per block
- kafka_thread_per_consumer – Provide an independent thread per consumer
- kafka_commit_every_batch – Commit after every consumed batch instead of after a whole block is written
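As a sketch, the table above could be extended with some of these settings. The values here (two consumers, a block size of 1048576, tolerating up to ten broken messages) are only illustrative and should be tuned for the actual topic and hardware:

CREATE TABLE kafka_example
(
    ID UInt64,
    Name String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'kafka_topic',
    kafka_group_name = 'consumer_group_name',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 2,           -- <= partitions in the topic and <= cores on the server
    kafka_max_block_size = 1048576,    -- messages polled per batch
    kafka_skip_broken_messages = 10,   -- tolerate up to 10 malformed messages per block
    kafka_thread_per_consumer = 1;     -- give each consumer its own thread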
Storage Table
Next, we will create a table based on MergeTree, which will act as the permanent storage table for the Kafka messages. This table has the same set of columns as the Kafka engine table and will store the data as it was received.
CREATE TABLE kafka_example_storage
(
    ID UInt64,
    Name String
)
ENGINE = MergeTree()
ORDER BY ID;
Materialized View
Finally, we need a materialized view to ingest the data into the storage table, optionally transforming the data from the Kafka engine table along the way.
CREATE MATERIALIZED VIEW kafka_example_materialized TO kafka_example_storage AS
SELECT ID, Name
FROM kafka_example;
In this example, we are simply storing the messages as they are. Once the materialized view, the storage table, and the Kafka engine table are in place, we can query the storage table and verify that the data is being ingested as expected.
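A quick way to verify, assuming a few messages like the ones above have already been produced to the topic, is to count and inspect the rows in the storage table:

-- Check how many rows have been ingested so far
SELECT count() FROM kafka_example_storage;

-- Inspect the ingested rows
SELECT ID, Name
FROM kafka_example_storage
ORDER BY ID;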
Conclusion
So we have seen how to ingest data into ClickHouse from a Kafka topic. This is a minimal example to get started with Kafka, and depending on the actual use case and the data velocity, we may have to tweak a few system parameters to avoid lag and ingest the data optimally.
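One way to keep an eye on ingestion, assuming a recent ClickHouse version that exposes the system.kafka_consumers table, is to check when each consumer last polled and how many messages it has read; consumer-group lag itself is best monitored on the Kafka side with the standard consumer-group tooling.

-- Inspect Kafka engine consumers (available in recent ClickHouse versions)
SELECT database, `table`, num_messages_read, last_poll_time
FROM system.kafka_consumers
WHERE `table` = 'kafka_example';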
References
To read more about streaming in ClickHouse, consider reading the articles below:
- Ingesting Data from a Kafka Topic in ClickHouse
- Integrating Kafka with ClickHouse for Real-time Streaming
- Streaming Data from PostgreSQL to ClickHouse using Kafka and Debezium: Part 1
- Streaming Data from PostgreSQL to ClickHouse using Kafka and Debezium: Part 2
- MySQL to ClickHouse Replication with Sink Connector
- https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/