Streaming ClickHouse data to Kafka

Introduction

ClickHouse has a built-in Kafka table engine that is commonly used to read streaming messages from Apache Kafka and store them in ClickHouse. It is one of the most widely used features of ClickHouse for handling streaming data. You can refer to this article for a working example. We can also publish data from ClickHouse to a Kafka topic using the Kafka table engine and materialized views.

Publishing data from ClickHouse to Kafka

We will use the following Docker-based setup to demonstrate pushing data from ClickHouse to Kafka.

version: '3'

services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    networks:
      - ch_kafka
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - "2182:2181"

  kafka:
    image: 'bitnami/kafka:latest'
    networks:
      - ch_kafka
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://192.168.1.4:9093 # replace 192.168.1.4 with your host's IP for external clients
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9093:9093"
    depends_on:
      - zookeeper
      
  clickhouse_kafka:
    image: 'clickhouse/clickhouse-server:latest'
    ports:
      - '9123:8123'
      - '10000:9000'
      - '10009:9009'
    networks:
      - ch_kafka

networks:
  ch_kafka:
    driver: bridge

We will follow these steps to test pushing data to a Kafka topic with ClickHouse's Kafka engine:

  1. Create a table with the Kafka engine (the Kafka push table), which will publish data to a topic on the Kafka broker
  2. Create a MergeTree-based table (the storage table) into which the data to be pushed to Kafka is inserted and stored
  3. Create a materialized view that forwards data from the storage table to the Kafka push table
  4. Create a table with the Kafka engine (the Kafka pull table), which will read the data back from the Kafka topic
  5. Create a materialized view that reads data from the Kafka pull table and stores it in ClickHouse
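The flow above can be sketched in miniature with plain Python, where simple lists stand in for the Kafka topic and the ClickHouse tables. This is an illustration of the data flow only, not how ClickHouse works internally; all names here are hypothetical.

```python
import json

# In-memory stand-ins for the pipeline pieces (illustration only; the
# real work is done by ClickHouse tables and a Kafka broker).
kafka_topic = []      # the Kafka topic "kafka_push_example"
kafka_storage = []    # the MergeTree storage table

def insert_into_storage(rows):
    """INSERT into the storage table; the push materialized view then
    forwards each row to the Kafka push table as a JSONEachRow message."""
    kafka_storage.extend(rows)
    for row in rows:
        kafka_topic.append(json.dumps(row))  # kafka_push publishes to the topic

def pull_materialized():
    """The pull side: kafka_pull consumes the topic, and the pull
    materialized view stores the parsed rows."""
    return [json.loads(msg) for msg in kafka_topic]

insert_into_storage([{"ID": 1, "Name": "1"}])
print(pull_materialized())  # → [{'ID': 1, 'Name': '1'}]
```

A single insert into the storage table thus reappears on the pull side, which is exactly what we will verify with real ClickHouse tables below.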

Create the Kafka push table

CREATE TABLE kafka_push
(ID UInt64,
Name String) 
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'kafka_push_example',
         kafka_group_name = 'consumer_group_push',
         kafka_format = 'JSONEachRow';

The kafka_push table is attached to a Kafka topic called kafka_push_example and writes messages in the JSONEachRow format. Each row has two fields, ID and Name.
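With kafka_format = 'JSONEachRow', each row becomes one JSON object per line on the topic, with keys matching the column names. A minimal sketch of what the published messages look like (the row values here are made up):

```python
import json

# One JSONEachRow message per inserted row; field names match the
# table columns (ID, Name).
rows = [{"ID": 1, "Name": "Alice"}, {"ID": 2, "Name": "Bob"}]
messages = [json.dumps(row) for row in rows]
for msg in messages:
    print(msg)
# {"ID": 1, "Name": "Alice"}
# {"ID": 2, "Name": "Bob"}
```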

Create the storage table

CREATE TABLE kafka_storage
(ID UInt64,
Name String) 
ENGINE = MergeTree()
ORDER BY (ID, Name);

This is the table into which we will insert the data to be pushed to Kafka. It is based on the MergeTree table engine and has the same two columns as the Kafka push table created previously.

Create a materialized view to push data to the Kafka table

CREATE MATERIALIZED VIEW kafka_push_materialized TO kafka_push AS SELECT ID, Name
FROM kafka_storage;

This materialized view reads from the kafka_storage table: whenever rows are inserted there, they are pushed to the kafka_push table, which in turn publishes them as messages to the Kafka topic.

Once the tables and views are created, the data inserted in the kafka_storage table will be available in the Kafka topic configured in the table.

Create the Kafka pull table

CREATE TABLE kafka_pull
(ID UInt64,
Name String) 
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'kafka_push_example',
         kafka_group_name = 'consumer_group_pull',
         kafka_format = 'JSONEachRow';

This table subscribes to the same topic as the push table but uses a different consumer group. It will read the messages back from the Kafka topic.

Create a materialized view to pull/read data from the Kafka table

CREATE MATERIALIZED VIEW kafka_pull_materialized 
ENGINE = MergeTree()
ORDER BY (ID, Name) AS SELECT ID, Name
FROM kafka_pull;

This materialized view reads the messages from the kafka_pull table and stores them in its own MergeTree storage.
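Conceptually, the pull side parses each JSONEachRow message against the table schema (ID UInt64, Name String) and materializes the resulting rows. A minimal sketch of that parsing step, with a hypothetical raw message:

```python
import json

# Raw messages as they would arrive from the Kafka topic (JSONEachRow format).
raw_messages = [b'{"ID": 1, "Name": "1"}']

# kafka_pull parses each message into typed fields; the materialized
# view then stores the resulting rows.
stored_rows = []
for msg in raw_messages:
    obj = json.loads(msg)
    stored_rows.append((int(obj["ID"]), str(obj["Name"])))

print(stored_rows)  # [(1, '1')]
```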

Insert the data in the storage table and verify

01ce980a0892 :) INSERT INTO kafka_storage VALUES (1,1);

INSERT INTO kafka_storage FORMAT Values

Query id: 4dcd6d2e-2deb-43ed-9877-2c600cee5da6

Ok.

1 row in set. Elapsed: 1.012 sec. 
01ce980a0892 :) SELECT * FROM kafka_pull_materialized;

SELECT *
FROM kafka_pull_materialized

Query id: d61f3ff5-221d-409f-a3a8-573f65b7d17a

┌─ID─┬─Name─┐
│  1 │ 1    │
└────┴──────┘

1 row in set. Elapsed: 0.002 sec. 

Conclusion

We can see that the data inserted into the MergeTree-based table is available in the materialized view. The data inserted into the kafka_storage table is read by the kafka_push_materialized view, which in turn pushes it to the kafka_push table. The kafka_push table then streams the data as JSON-formatted messages to the Kafka topic.

The kafka_pull table reads the messages from the Kafka topic, and the kafka_pull_materialized view reads the data from the kafka_pull table and stores it.

To read more about using Kafka with ClickHouse, see the reference below.

References

https://clickhouse.com/docs/en/integrations/kafka/kafka-table-engine/#clickhouse-to-kafka