Streaming Data from MySQL to ClickHouse using Redpanda and KsqlDB

Introduction

Traditionally, data analysis is performed infrequently and on data that may already be stale. Modern businesses often need real-time analysis performed quickly and frequently, and a real-time streaming data architecture makes that possible. This blog post explains how to stream data from MySQL to ClickHouse using the following components.

  • MySQL
  • Debezium connector (CDC)
  • Redpanda
  • KsqlDB
  • ClickHouse

MySQL to ClickHouse Data Streaming Implementation

Docker Compose

To implement this setup, we provide a ready-made Docker Compose file. The files can be downloaded from our Git repository.

sakthivel@Sris-MacBook-Pro ~ %  git clone https://github.com/ChistaDATA/mysql-clickhouse-redpanda-ksqldb-stream.git
Cloning into 'mysql-clickhouse-redpanda-ksqldb-stream'...
remote: Enumerating objects: 35, done.
remote: Counting objects: 100% (35/35), done.
remote: Compressing objects: 100% (34/34), done.
remote: Total 35 (delta 0), reused 0 (delta 0), pack-reused 0
Receiving objects: 100% (35/35), 22.13 MiB | 821.00 KiB/s, done.

sakthivel@Sris-MacBook-Pro ~ %  ls
mysql-clickhouse-redpanda-ksqldb-stream

sakthivel@Sris-MacBook-Pro ~ %  cd mysql-clickhouse-redpanda-ksqldb-stream/
sakthivel@Sris-MacBook-Pro ~ %  ls -lrth
total 8.0K
drwxrwxr-x 2 sakthivel sakthivel 4.0K Dec 26 14:47 debezium-connector-clickhouse
-rw-rw-r-- 1 sakthivel sakthivel 2.5K Dec 26 14:47 docker-compose.yaml

Once the files are cloned, we need to bring the components up using the docker-compose utility.

sakthivel@Sris-MacBook-Pro ~ % docker-compose -f docker-compose.yaml up
[+] Running 7/7
 ⠿ Network sakthivel_default         Created                                                                            0.0s
 ⠿ Container chista-redpanda         Created                                                                            0.1s
 ⠿ Container sakthivel-clickhouse-1  Created                                                                            0.1s
 ⠿ Container chista-mysql            Created                                                                            0.1s
 ⠿ Container chista-connector        Created                                                                            0.1s
 ⠿ Container chista-registry         Created                                                                            0.1s
 ⠿ Container chista-ksqldb           Created                                                                            0.1s
Attaching to chista-connector, chista-ksqldb, chista-mysql, chista-redpanda, chista-registry, sakthivel-clickhouse-1

As we can see above, the containers are created. Let's check their status using the following command.

sakthivel@Sris-MacBook-Pro ~ % docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}"
NAMES                    STATUS       PORTS
chista-connector         Up 4 hours   0.0.0.0:8083->8083/tcp, 9092/tcp
chista-registry          Up 4 hours   0.0.0.0:8081->8081/tcp
chista-ksqldb            Up 4 hours   0.0.0.0:8088->8088/tcp
sakthivel-clickhouse-1   Up 4 hours   9009/tcp, 0.0.0.0:9123->8123/tcp, 0.0.0.0:8002->9000/tcp
chista-mysql             Up 4 hours   0.0.0.0:3306->3306/tcp, 33060/tcp
chista-redpanda          Up 4 hours   8081-8082/tcp, 0.0.0.0:9092->9092/tcp, 9644/tcp, 0.0.0.0:29092->29092/tcp

Now we have all the required components running. The next step is to configure them for data streaming. The configuration is covered in three parts.

  • Configuration – Debezium connector
  • Configuration – ksqlDB
  • Configuration – ClickHouse

Configuring Debezium connector

The following sample data is created automatically when the MySQL container is built. Our aim is to configure data streaming for the “inventory.orders” table.

sakthivel@Sris-MacBook-Pro ~ % docker exec -it chista-mysql /bin/bash                    
root@dffe0397925d:/# 
root@dffe0397925d:/# mysql -uroot -p
Enter password: 

mysql> use inventory
Database changed

mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)

mysql> select * from orders;
+--------------+------------+-----------+----------+------------+
| order_number | order_date | purchaser | quantity | product_id |
+--------------+------------+-----------+----------+------------+
|        10001 | 2016-01-16 |      1001 |        1 |        102 |
|        10002 | 2016-01-17 |      1002 |        2 |        105 |
|        10003 | 2016-02-19 |      1002 |        2 |        106 |
|        10004 | 2016-02-21 |      1003 |        1 |        107 |
+--------------+------------+-----------+----------+------------+
4 rows in set (0.01 sec)

As a first step, we need to log in to the Kafka Connect (Debezium) container and copy all the connector-related files into the Kafka Connect directory.

sakthivel@Sris-MacBook-Pro ~ % docker exec -it chista-connector /bin/bash
[kafka@7fa37157caf8 ~]$ cd /kafka/manual/
[kafka@7fa37157caf8 manual]$ 
[kafka@7fa37157caf8 manual]$ cp -r * /kafka/connect/
[kafka@7fa37157caf8 manual]$ cd /kafka/connect/
[kafka@7fa37157caf8 connect]$ ls -lrth
total 24M
drwxr-xr-x 2 kafka kafka  4.0K Sep  6 02:19 debezium-connector-postgres
drwxr-xr-x 2 kafka kafka  4.0K Sep  6 02:19 debezium-connector-mysql
drwxr-xr-x 2 kafka kafka  4.0K Sep  6 02:19 debezium-connector-mongodb
drwxr-xr-x 2 kafka kafka  4.0K Sep  6 02:19 debezium-connector-sqlserver
drwxr-xr-x 2 kafka kafka  4.0K Sep  6 02:19 debezium-connector-vitess
drwxr-xr-x 2 kafka kafka  4.0K Sep  6 02:19 debezium-connector-oracle
drwxr-xr-x 2 kafka kafka  4.0K Sep  6 02:19 debezium-connector-db2
-rw-r--r-- 1 kafka kafka  210K Dec 26 04:56 checker-qual-3.5.0.jar
-rw-r--r-- 1 kafka kafka  406K Dec 26 04:56 clickhouse-client-0.3.2.jar
-rw-r--r-- 1 kafka kafka   37K Dec 26 04:56 clickhouse-http-client-0.3.2.jar
-rw-r--r-- 1 kafka kafka  467K Dec 26 04:56 clickhouse-jdbc-0.3.2.jar
-rw-r--r-- 1 kafka kafka   17K Dec 26 04:56 common-utils-6.0.0.jar

....

Next, we need to configure the Debezium connector to capture changes from MySQL.

sakthivel@Sris-MacBook-Pro ~ % curl --request POST --url http://localhost:8083/connectors --header 'Content-Type: application/json' --data '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "chista-mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "223344", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "redpanda:9092", "database.history.kafka.topic": "schema-changes.inventory" }}'

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"chista-mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"223344","database.server.name":"dbserver1","database.include.list":"inventory","database.history.kafka.bootstrap.servers":"redpanda:9092","database.history.kafka.topic":"schema-changes.inventory","name":"inventory-connector"},"tasks":[],"type":"source"}% sakthivel@Sris-MacBook-Pro ~ %

Once the connector is configured, we can see the topics being created in Redpanda, and they are also visible from ksqlDB.

Redpanda:

sakthivel@Sris-MacBook-Pro ~ % docker exec -it chista-redpanda /bin/bash
redpanda@redpanda:/$ 
redpanda@redpanda:/$ rpk topic list
NAME                                                                                                      PARTITIONS  REPLICAS
dbserver1.inventory.addresses                                                                             1           1
dbserver1.inventory.customers                                                                             1           1
dbserver1.inventory.geom                                                                                  1           1
dbserver1.inventory.orders                                                                                1           1
dbserver1.inventory.products                                                                              1           1

ksqlDB:

sakthivel@Sris-MacBook-Pro ~ % docker exec -it chista-ksqldb /bin/bash
[appuser@ksqldb-server ~]$ ksql
ksql> show topics;

 Kafka Topic                          | Partitions | Partition Replicas 
------------------------------------------------------------------------
 dbserver1                            | 1          | 1                  
 dbserver1.inventory.addresses        | 1          | 1                  
 dbserver1.inventory.customers        | 1          | 1                  
 dbserver1.inventory.geom             | 1          | 1                  
 dbserver1.inventory.orders           | 1          | 1                  
 dbserver1.inventory.products         | 1          | 1                  
 dbserver1.inventory.products_on_hand | 1          | 1                  
 default_ksql_processing_log          | 1          | 1                  
 inventory.configs                    | 1          | 1                  
 inventory.offset                     | 25         | 1                  
 inventory.status                     | 5          | 1                  
 schema-changes.inventory             | 1          | 1                  
------------------------------------------------------------------------

Now we can process the “dbserver1.inventory.orders” topic in ksqlDB to extract the required data.

Configuring ksqlDB

In ksqlDB, we can use the PRINT command to view the orders topic. However, the raw output is quite verbose and difficult to read, as shown below.

ksql> print 'dbserver1.inventory.orders' from beginning;
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/12/26 05:05:34.074 Z, key: {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"order_number"}],"optional":false,"name":"dbserver1.inventory.orders.Key"},"payload":{"order_number":10001}}, value: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"order_number"},{"type":"int32","optional":false,"name":"io.debezium.time.Date","version":1,"field":"order_date"},{"type":"int32","optional":false,"field":"purchaser"},{"type":"int32","optional":false,"field":"quantity"},{"type":"int32","optional":false,"field":"product_id"}],"optional":true,"name":"dbserver1.inventory.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"order_number"},{"type":"int32","optional":false,"name":"io.debezium.time.Date","version":1,"field":"order_date"},{"type":"int32","optional":false,"field":"purchaser"},{"type":"int32","optional":false,"field":"quantity"},{"type":"int32","optional":false,"field":"product_id"}],"optional":true,"name":"dbserver1.inventory.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.orders.Envelope"},"payload":{"before":null,"after":{"order_number":10001,"order_date":16816,"purchaser":1001,"quantity":1,"product_id":102},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1672031133198,"snapshot":"true","db":"inventory","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000004","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1672031133203,"transaction":null}}, partition: 0

So, we need to extract only the relevant information from this topic. First, we will create a STREAM in ksqlDB to extract the payload values.

ksql> CREATE STREAM Payload_Store_Before_After_Orders ( payload STRUCT<
>                   "before" STRUCT<
>                   "order_number" INT,
>                   "order_date" DATE,
>                   "purchaser" INT,
>                   "quantity" INT,
>                   "product_id" INT>,
>                   after STRUCT<
>                   "order_number" INT,
>                   "order_date" DATE,
>                   "purchaser" INT,
>                   "quantity" INT,
>                   "product_id" INT
>                   >>)
>     WITH (KAFKA_TOPIC='dbserver1.inventory.orders',
>     VALUE_FORMAT='JSON',PARTITIONS=1, REPLICAS=1);

 Message        
----------------
 Stream created 
----------------

Let’s query the payload stream.

ksql> select * from PAYLOAD_STORE_BEFORE_AFTER_ORDERS;
+---------------------------------------------------------------------------------------------------------------------------+
|PAYLOAD                                                                                                                    |
+---------------------------------------------------------------------------------------------------------------------------+
|{before=null, AFTER={order_number=10001, order_date=16816, purchaser=1001, quantity=1, product_id=102}}                    |
|{before=null, AFTER={order_number=10002, order_date=16817, purchaser=1002, quantity=2, product_id=105}}                    |
|{before=null, AFTER={order_number=10003, order_date=16850, purchaser=1002, quantity=2, product_id=106}}                    |
|{before=null, AFTER={order_number=10004, order_date=16852, purchaser=1003, quantity=1, product_id=107}}                    |
Query Completed

As you can see in the above output, it contains both the before and after values from the payload. Now we are going to extract only the after value. For that, we need to create another stream.

ksql> create STREAM After_Orders as select payload->"AFTER" from Payload_Store_Before_After_Orders emit changes;

 Message                                   
-------------------------------------------
 Created query with ID CSAS_AFTER_ORDERS_6 

Let’s query the stream.

ksql> select * from After_Orders;
+---------------------------------------------------------------------------------------------------------------------------+
|AFTER                                                                                                                      |
+---------------------------------------------------------------------------------------------------------------------------+
|{order_number=10001, order_date=16816, purchaser=1001, quantity=1, product_id=102}                                         |
|{order_number=10002, order_date=16817, purchaser=1002, quantity=2, product_id=105}                                         |
|{order_number=10003, order_date=16850, purchaser=1002, quantity=2, product_id=106}                                         |
|{order_number=10004, order_date=16852, purchaser=1003, quantity=1, product_id=107}                                         |
Query Completed

Now we have the after values, but we still need individual columns. To split them into individual columns, we can create one more stream, as shown below.

ksql> create STREAM Columns_With_Changes as select AFTER->"order_number" as order_number, AFTER->"order_date" as order_date, AFTER->"purchaser" as purchaser, AFTER->"quantity" as quantity, AFTER->"product_id" as product_id from After_Orders emit changes;

 Message                                            
----------------------------------------------------
 Created query with ID CSAS_COLUMNS_WITH_CHANGES_26 
----------------------------------------------------
ksql> 
ksql> select * from Columns_With_Changes;
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDER_NUMBER           |ORDER_DATE             |PURCHASER              |QUANTITY               |PRODUCT_ID             |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|10001                  |2016-01-16             |1001                   |1                      |102                    |
|10002                  |2016-01-17             |1002                   |2                      |105                    |
|10003                  |2016-02-19             |1002                   |2                      |106                    |
|10004                  |2016-02-21             |1003                   |1                      |107                    |
Query Completed

Now we can see the MySQL data (table: orders) in ksqlDB in a nicely formatted, columnar layout.
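
At this point the pipeline from MySQL into ksqlDB is live. To verify that changes propagate end to end, we can, for example, insert a test row into MySQL and watch it arrive through a ksqlDB push query. The sketch below uses illustrative values; any valid purchaser and product id from the sample data will do.

-- In the MySQL client (mysql> prompt): insert a test row (values are illustrative)
INSERT INTO inventory.orders VALUES (10005, '2016-03-01', 1001, 3, 104);

-- In the ksqlDB CLI (ksql> prompt): a push query keeps running and prints the
-- new row as soon as Debezium captures the change from the binlog
SELECT * FROM Columns_With_Changes EMIT CHANGES;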

Configuring ClickHouse

As the final part, we have to sync the data from ksqlDB to ClickHouse. This can be achieved in two ways.

  • Using Sink connector
  • Using the ClickHouse Kafka engine

With the sink connector approach, we create the connector from ksqlDB and create the corresponding table in ClickHouse; a minimal sketch follows below.
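
For reference, here is a minimal sketch of the sink connector approach, created from the ksqlDB CLI. It assumes the Confluent JDBC sink connector and the ClickHouse JDBC driver are present in the Kafka Connect plugin path; the connector name, connection URL, and hostname are illustrative. In this post, we use the Kafka engine approach instead.

-- Minimal sketch (ksql> prompt), assuming the JDBC sink connector and the
-- ClickHouse JDBC driver are installed; names and URLs are illustrative
CREATE SINK CONNECTOR clickhouse_orders_sink WITH (
    'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'  = 'jdbc:clickhouse://clickhouse:8123/default',
    'topics'          = 'COLUMNS_WITH_CHANGES',
    'tasks.max'       = '1',
    'auto.create'     = 'false',
    'insert.mode'     = 'insert'
);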

With the ClickHouse Kafka engine approach, we create a table in ClickHouse using the Kafka engine, as shown below.

727b5523b75a :) CREATE TABLE default.data_streamFix
                (
                    `value` String
                )
                ENGINE = Kafka
                SETTINGS kafka_broker_list = 'redpanda:9092', kafka_topic_list = 'COLUMNS_WITH_CHANGES', kafka_group_name = 'groupFix', kafka_format = 'JSONAsString', kafka_row_delimiter = '\n', input_format_import_nested_json = 1


Ok.

0 rows in set. Elapsed: 0.026 sec.

Once the table is created, the messages produced by the ksqlDB stream are consumed into the Kafka engine table. To query this table directly with SELECT, we need to enable the following setting.

727b5523b75a :) set stream_like_engine_allow_direct_select=1;

SET stream_like_engine_allow_direct_select = 1

Query id: 2be27ac3-d0f5-4e29-bc48-ca04f16dd6fc

Ok.

0 rows in set. Elapsed: 0.001 sec. 

Here, I used the JSONAsString format to store each entire message value as a single string. Let's query the Kafka engine table.

727b5523b75a :) select * from default.data_streamFix;

SELECT *
FROM default.data_streamFix

Query id: 56ecef4e-a4d9-466c-a2e8-b6e7e9ddcaba

┌─value───────────────────────────────────────────────────────────────┐
│ {"ORDER_DATE":16816,"PURCHASER":1001,"QUANTITY":1,"PRODUCT_ID":102} │
│ {"ORDER_DATE":16817,"PURCHASER":1002,"QUANTITY":2,"PRODUCT_ID":105} │
│ {"ORDER_DATE":16850,"PURCHASER":1002,"QUANTITY":2,"PRODUCT_ID":106} │
│ {"ORDER_DATE":16852,"PURCHASER":1003,"QUANTITY":1,"PRODUCT_ID":107} │
└─────────────────────────────────────────────────────────────────────┘

4 rows in set. Elapsed: 3.512 sec.

We can use the JSONExtract functions to split the values into separate columns.

For example,

727b5523b75a :) SELECT
                    JSONExtract(value, 'ORDER_DATE', 'String') AS ORDER_DATE,
                    JSONExtract(value, 'PURCHASER', 'String') AS PURCHASER,
                    JSONExtract(value, 'QUANTITY', 'String') AS QUANTITY,
                    JSONExtract(value, 'PRODUCT_ID', 'String') AS PRODUCT_ID
                FROM default.data_streamFix

SELECT
    JSONExtract(value, 'ORDER_DATE', 'String') AS ORDER_DATE,
    JSONExtract(value, 'PURCHASER', 'String') AS PURCHASER,
    JSONExtract(value, 'QUANTITY', 'String') AS QUANTITY,
    JSONExtract(value, 'PRODUCT_ID', 'String') AS PRODUCT_ID
FROM default.data_streamFix

Query id: b273ae75-6cca-41e4-bdef-a1741fc43a85

┌─ORDER_DATE─┬─PURCHASER─┬─QUANTITY─┬─PRODUCT_ID─┐
│ 16816      │ 1001      │ 1        │ 102        │
│ 16817      │ 1002      │ 2        │ 105        │
│ 16850      │ 1002      │ 2        │ 106        │
│ 16852      │ 1003      │ 1        │ 107        │
└────────────┴───────────┴──────────┴────────────┘

4 rows in set. Elapsed: 3.516 sec. 
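
Note that ORDER_DATE arrives as a day number (days since the Unix epoch), which is how Debezium encodes MySQL DATE columns (io.debezium.time.Date, as seen in the topic schema earlier). In ClickHouse, toDate() interprets integers below 65536 as days since 1970-01-01, so the original calendar date can be recovered. A small sketch:

-- toDate(16816) returns 2016-01-16
SELECT
    toDate(JSONExtract(value, 'ORDER_DATE', 'UInt32')) AS ORDER_DATE,
    JSONExtract(value, 'PURCHASER', 'UInt32') AS PURCHASER
FROM default.data_streamFix;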

We can also create a materialized view so that every message arriving in the Kafka engine table is parsed and stored permanently in columns, as sketched below.
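
The sketch below creates a regular MergeTree table and a materialized view that parses each JSON message landing in the Kafka engine table and inserts it as proper columns. The table and view names, and the chosen ORDER BY key, are illustrative.

-- Target table for the parsed rows (names are illustrative)
CREATE TABLE default.orders_final
(
    `order_date` Date,
    `purchaser`  UInt32,
    `quantity`   UInt32,
    `product_id` UInt32
)
ENGINE = MergeTree
ORDER BY (order_date, purchaser);

-- The materialized view consumes from the Kafka engine table and inserts
-- each message into the MergeTree table as it arrives
CREATE MATERIALIZED VIEW default.orders_mv TO default.orders_final AS
SELECT
    toDate(JSONExtract(value, 'ORDER_DATE', 'UInt32')) AS order_date,
    JSONExtract(value, 'PURCHASER', 'UInt32')          AS purchaser,
    JSONExtract(value, 'QUANTITY', 'UInt32')           AS quantity,
    JSONExtract(value, 'PRODUCT_ID', 'UInt32')         AS product_id
FROM default.data_streamFix;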

Conclusion

Hopefully, this blog post helps you understand the configuration required to stream data from MySQL to ClickHouse using KsqlDB and Redpanda.
