How to Ingest Data from a Kafka Topic in ClickHouse

Introduction

Apache Kafka is a distributed event streaming platform developed by the Apache Software Foundation. Before proceeding, visit the official page for a detailed introduction to the basics of Kafka. The installation instructions are available here.

ClickHouse has a built-in Kafka table engine that lets users subscribe to a Kafka topic and store its messages efficiently in ClickHouse. The table engine reads the messages from the Kafka topic, and a materialized view collects the data in the background. The materialized view converts the data and stores it in a final table that holds it permanently. Multiple materialized views can read from a single Kafka engine table, each applying a different transformation to the messages.

We should know the format of the raw messages in Kafka beforehand. A list of supported input and output formats is available here. Based on the message schema, create the Kafka engine table. As an example, let's say we have messages in a Kafka topic as below.

{"ID":1 , "Name": "a"}
{"ID":2 , "Name": "b"}
{"ID":3 , "Name": "c"}

We will have to create a Kafka engine table, a storage table, and a materialized view, each with two columns: ID and Name.

Kafka Engine Table

CREATE TABLE kafka_example
(ID UInt64,
Name String) 
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'kafka_topic',
         kafka_group_name = 'consumer_group_name',
         kafka_format = 'JSONEachRow';

The Kafka engine table has the same columns as the message. In this example, Kafka is running on the same host as ClickHouse. The table reads messages from the topic 'kafka_topic' using the consumer group 'consumer_group_name'.
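Selecting directly from a Kafka engine table consumes the messages from the consumer group's offset, so it is mainly useful for quick debugging rather than regular queries. As a sketch, in recent ClickHouse versions a direct SELECT must also be explicitly enabled first (exact availability depends on your version):

```sql
-- Allow direct SELECTs from stream-like engines (newer ClickHouse versions)
SET stream_like_engine_allow_direct_select = 1;

-- Peek at incoming messages; note that this consumes them
SELECT ID, Name FROM kafka_example LIMIT 10;
```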

Storage Table

CREATE TABLE kafka_example_storage
(
ID UInt64,
Name String
)
ENGINE = MergeTree()
ORDER BY ID;

Materialized View

CREATE MATERIALIZED VIEW kafka_example_materialized TO kafka_example_storage
AS SELECT ID, Name
FROM kafka_example;
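As noted earlier, several materialized views can read from the same Kafka engine table, each with its own transformation. As a sketch, a second view (the table and view names here are hypothetical) could store only upper-cased names:

```sql
-- Hypothetical second storage table for transformed data
CREATE TABLE kafka_example_names_storage
(
    NameUpper String
)
ENGINE = MergeTree()
ORDER BY NameUpper;

-- A second materialized view applying a transformation to the same messages
CREATE MATERIALIZED VIEW kafka_example_names_materialized TO kafka_example_names_storage
AS SELECT upper(Name) AS NameUpper
FROM kafka_example;
```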

In this example we are simply storing the messages as they are. There are a few more useful options for the Kafka table engine.

  • kafka_row_delimiter – Delimiter character that marks the end of a message
  • kafka_num_consumers – Number of consumers per table; should not exceed the number of partitions in the topic or the number of cores on the ClickHouse server
  • kafka_schema – Message schema, required for certain formats
  • kafka_max_block_size – Batch size of the messages that are polled
  • kafka_skip_broken_messages – Maximum number of messages with a bad schema that can be skipped per block
  • kafka_thread_per_consumer – Provide an independent thread per consumer
  • kafka_commit_every_batch – Commit after every consumed batch instead of after writing a whole block
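The options above can be combined with the earlier table definition. A sketch with a few of them applied (the values shown are illustrative, not tuned recommendations):

```sql
CREATE TABLE kafka_example_tuned
(
    ID UInt64,
    Name String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'kafka_topic',
         kafka_group_name = 'consumer_group_name',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 2,          -- should not exceed partitions or CPU cores
         kafka_max_block_size = 65536,     -- messages polled per batch
         kafka_skip_broken_messages = 10,  -- tolerate up to 10 bad messages per block
         kafka_thread_per_consumer = 1;    -- independent thread per consumer
```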

Conclusion

To read more about streaming in ClickHouse, see the reference below.

References

https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/