Introduction
Traditionally, data analysis is a slow process that is performed infrequently. Modern businesses, however, need real-time analysis that is both quick and frequent, and a real-time streaming architecture makes that possible. This blog post explains how to stream data from MySQL to ClickHouse using the following components.
- MySQL
- Debezium connector (CDC)
- Redpanda
- KsqlDB
- ClickHouse
MySQL to ClickHouse Data Streaming Implementation
Docker Compose
To implement the setup, we provide a ready-made Docker Compose file. The files can be downloaded from our Git repository.
sakthivel@Sris-MacBook-Pro ~ % git clone https://github.com/ChistaDATA/mysql-clickhouse-redpanda-ksqldb-stream.git
Cloning into 'mysql-clickhouse-redpanda-ksqldb-stream'...
remote: Enumerating objects: 35, done.
remote: Counting objects: 100% (35/35), done.
remote: Compressing objects: 100% (34/34), done.
remote: Total 35 (delta 0), reused 0 (delta 0), pack-reused 0
Receiving objects: 100% (35/35), 22.13 MiB | 821.00 KiB/s, done.

sakthivel@Sris-MacBook-Pro ~ % ls
mysql-clickhouse-redpanda-ksqldb-stream

sakthivel@Sris-MacBook-Pro ~ % cd mysql-clickhouse-redpanda-ksqldb-stream/

sakthivel@Sris-MacBook-Pro ~ % ls -lrth
total 8.0K
drwxrwxr-x 2 sakthivel sakthivel 4.0K Dec 26 14:47 debezium-connector-clickhouse
-rw-rw-r-- 1 sakthivel sakthivel 2.5K Dec 26 14:47 docker-compose.yaml
Once the files are cloned, we need to bring the components up using the docker-compose utility.
sakthivel@Sris-MacBook-Pro ~ % docker-compose -f docker-compose.yaml up
[+] Running 7/7
 ⠿ Network sakthivel_default          Created  0.0s
 ⠿ Container chista-redpanda          Created  0.1s
 ⠿ Container sakthivel-clickhouse-1   Created  0.1s
 ⠿ Container chista-mysql             Created  0.1s
 ⠿ Container chista-connector         Created  0.1s
 ⠿ Container chista-registry          Created  0.1s
 ⠿ Container chista-ksqldb            Created  0.1s
Attaching to chista-connector, chista-ksqldb, chista-mysql, chista-redpanda, chista-registry, sakthivel-clickhouse-1
As we can see above, the components are being created. Let's check the container status using the following command.
sakthivel@Sris-MacBook-Pro ~ % docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Names}}\t{{.Ports}}"
NAMES                    STATUS       NAMES                    PORTS
chista-connector         Up 4 hours   chista-connector         0.0.0.0:8083->8083/tcp, 9092/tcp
chista-registry          Up 4 hours   chista-registry          0.0.0.0:8081->8081/tcp
chista-ksqldb            Up 4 hours   chista-ksqldb            0.0.0.0:8088->8088/tcp
sakthivel-clickhouse-1   Up 4 hours   sakthivel-clickhouse-1   9009/tcp, 0.0.0.0:9123->8123/tcp, 0.0.0.0:8002->9000/tcp
chista-mysql             Up 4 hours   chista-mysql             0.0.0.0:3306->3306/tcp, 33060/tcp
chista-redpanda          Up 4 hours   chista-redpanda          8081-8082/tcp, 0.0.0.0:9092->9092/tcp, 9644/tcp, 0.0.0.0:29092->29092/tcp
Now we have all the required components running. The next step is to configure them for data streaming. The configuration is covered in three parts:
- Configuration – Debezium connector
- Configuration – ksqlDB
- Configuration – ClickHouse
Configuring Debezium connector
The following data is created automatically when the MySQL container is built. Our aim is to configure data streaming for the “inventory.orders” table.
sakthivel@Sris-MacBook-Pro ~ % docker exec -it chista-mysql /bin/bash
root@dffe0397925d:/# mysql -uroot -p
Enter password:

mysql> use inventory
Database changed

mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.00 sec)

mysql> select * from orders;
+--------------+------------+-----------+----------+------------+
| order_number | order_date | purchaser | quantity | product_id |
+--------------+------------+-----------+----------+------------+
|        10001 | 2016-01-16 |      1001 |        1 |        102 |
|        10002 | 2016-01-17 |      1002 |        2 |        105 |
|        10003 | 2016-02-19 |      1002 |        2 |        106 |
|        10004 | 2016-02-21 |      1003 |        1 |        107 |
+--------------+------------+-----------+----------+------------+
4 rows in set (0.01 sec)
As a first step, we need to log in to the Debezium container and copy all the connector-related files into the Kafka Connect directory.
sakthivel@Sris-MacBook-Pro ~ % docker exec -it chista-connector /bin/bash
[kafka@7fa37157caf8 ~]$ cd /kafka/manual/
[kafka@7fa37157caf8 manual]$ cp -r * /kafka/connect/
[kafka@7fa37157caf8 manual]$ cd /kafka/connect/
[kafka@7fa37157caf8 connect]$ ls -lrth
total 24M
drwxr-xr-x 2 kafka kafka 4.0K Sep  6 02:19 debezium-connector-postgres
drwxr-xr-x 2 kafka kafka 4.0K Sep  6 02:19 debezium-connector-mysql
drwxr-xr-x 2 kafka kafka 4.0K Sep  6 02:19 debezium-connector-mongodb
drwxr-xr-x 2 kafka kafka 4.0K Sep  6 02:19 debezium-connector-sqlserver
drwxr-xr-x 2 kafka kafka 4.0K Sep  6 02:19 debezium-connector-vitess
drwxr-xr-x 2 kafka kafka 4.0K Sep  6 02:19 debezium-connector-oracle
drwxr-xr-x 2 kafka kafka 4.0K Sep  6 02:19 debezium-connector-db2
-rw-r--r-- 1 kafka kafka 210K Dec 26 04:56 checker-qual-3.5.0.jar
-rw-r--r-- 1 kafka kafka 406K Dec 26 04:56 clickhouse-client-0.3.2.jar
-rw-r--r-- 1 kafka kafka  37K Dec 26 04:56 clickhouse-http-client-0.3.2.jar
-rw-r--r-- 1 kafka kafka 467K Dec 26 04:56 clickhouse-jdbc-0.3.2.jar
-rw-r--r-- 1 kafka kafka  17K Dec 26 04:56 common-utils-6.0.0.jar
....
Next, we need to configure the Debezium connector to capture changes from MySQL.
sakthivel@Sris-MacBook-Pro ~ % curl --request POST --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "chista-mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "223344",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "redpanda:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}'

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"chista-mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"223344","database.server.name":"dbserver1","database.include.list":"inventory","database.history.kafka.bootstrap.servers":"redpanda:9092","database.history.kafka.topic":"schema-changes.inventory","name":"inventory-connector"},"tasks":[],"type":"source"}
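Optionally, we can verify that the connector and its task were registered correctly by calling the standard Kafka Connect REST API on the same endpoint. The commands below are only a quick sanity check; the exact JSON returned depends on your environment.

# List the registered connectors
curl -s http://localhost:8083/connectors

# Check that the connector and its task report the RUNNING state
curl -s http://localhost:8083/connectors/inventory-connector/status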
Once the connector is configured, we can see the topics being created in Redpanda and visible from ksqlDB.
Redpanda:
sakthivel@Sris-MacBook-Pro ~ % docker exec -it chista-redpanda /bin/bash
redpanda@redpanda:/$ rpk topic list
NAME                           PARTITIONS  REPLICAS
dbserver1.inventory.addresses  1           1
dbserver1.inventory.customers  1           1
dbserver1.inventory.geom       1           1
dbserver1.inventory.orders     1           1
dbserver1.inventory.products   1           1
ksqlDB:
sakthivel@Sris-MacBook-Pro ~ % docker exec -it chista-ksqldb /bin/bash
[appuser@ksqldb-server ~]$ ksql

ksql> show topics;

 Kafka Topic                          | Partitions | Partition Replicas
------------------------------------------------------------------------
 dbserver1                            | 1          | 1
 dbserver1.inventory.addresses        | 1          | 1
 dbserver1.inventory.customers        | 1          | 1
 dbserver1.inventory.geom             | 1          | 1
 dbserver1.inventory.orders           | 1          | 1
 dbserver1.inventory.products         | 1          | 1
 dbserver1.inventory.products_on_hand | 1          | 1
 default_ksql_processing_log          | 1          | 1
 inventory.configs                    | 1          | 1
 inventory.offset                     | 25         | 1
 inventory.status                     | 5          | 1
 schema-changes.inventory             | 1          | 1
------------------------------------------------------------------------
Now we can process the topic “dbserver1.inventory.orders” in ksqlDB to extract the required data.
Configuring ksqlDB
In ksqlDB, we can use the PRINT command to view the orders topic. But the output is quite verbose and difficult to read, as shown below.
ksql> print 'dbserver1.inventory.orders' from beginning; Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING Value format: JSON or KAFKA_STRING rowtime: 2022/12/26 05:05:34.074 Z, key: {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"order_number"}],"optional":false,"name":"dbserver1.inventory.orders.Key"},"payload":{"order_number":10001}}, value: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"order_number"},{"type":"int32","optional":false,"name":"io.debezium.time.Date","version":1,"field":"order_date"},{"type":"int32","optional":false,"field":"purchaser"},{"type":"int32","optional":false,"field":"quantity"},{"type":"int32","optional":false,"field":"product_id"}],"optional":true,"name":"dbserver1.inventory.orders.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"order_number"},{"type":"int32","optional":false,"name":"io.debezium.time.Date","version":1,"field":"order_date"},{"type":"int32","optional":false,"field":"purchaser"},{"type":"int32","optional":false,"field":"quantity"},{"type":"int32","optional":false,"field":"product_id"}],"optional":true,"name":"dbserver1.inventory.orders.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.orders.Envelope"},"payload":{"before":null,"after":{"order_number":10001,"order_date":16816,"purchaser":1001,"quantity":1,"product_id":102},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1672031133198,"snapshot":"true","db":"inventory","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000004","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1672031133203,"transaction":null}}, partition: 0
So we need to extract only the useful information from this topic. First, we will create a STREAM in ksqlDB to extract the payload values.
ksql> CREATE STREAM Payload_Store_Before_After_Orders (
>        payload STRUCT<
>            "before" STRUCT<
>                "order_number" INT,
>                "order_date" DATE,
>                "purchaser" INT,
>                "quantity" INT,
>                "product_id" INT>,
>            after STRUCT<
>                "order_number" INT,
>                "order_date" DATE,
>                "purchaser" INT,
>                "quantity" INT,
>                "product_id" INT
>            >>)
>     WITH (KAFKA_TOPIC='dbserver1.inventory.orders',
>           VALUE_FORMAT='JSON', PARTITIONS=1, REPLICAS=1);

 Message
----------------
 Stream created
----------------
Let’s query the payload stream.
ksql> select * from PAYLOAD_STORE_BEFORE_AFTER_ORDERS;
+-------------------------------------------------------------------------------------------------------------------------+
|PAYLOAD                                                                                                                    |
+-------------------------------------------------------------------------------------------------------------------------+
|{before=null, AFTER={order_number=10001, order_date=16816, purchaser=1001, quantity=1, product_id=102}}                   |
|{before=null, AFTER={order_number=10002, order_date=16817, purchaser=1002, quantity=2, product_id=105}}                   |
|{before=null, AFTER={order_number=10003, order_date=16850, purchaser=1002, quantity=2, product_id=106}}                   |
|{before=null, AFTER={order_number=10004, order_date=16852, purchaser=1003, quantity=1, product_id=107}}                   |
Query Completed
As you can see in the output above, the payload contains both the before and after values. Now we are going to extract only the after value. For that, we need to create another stream.
ksql> create STREAM After_Orders as
>       select payload->"AFTER" from Payload_Store_Before_After_Orders emit changes;

 Message
-------------------------------------------
 Created query with ID CSAS_AFTER_ORDERS_6
-------------------------------------------
Let’s query the stream.
ksql> select * from After_Orders;
+-------------------------------------------------------------------------------------------------------------------------+
|AFTER                                                                                                                      |
+-------------------------------------------------------------------------------------------------------------------------+
|{order_number=10001, order_date=16816, purchaser=1001, quantity=1, product_id=102}                                        |
|{order_number=10002, order_date=16817, purchaser=1002, quantity=2, product_id=105}                                        |
|{order_number=10003, order_date=16850, purchaser=1002, quantity=2, product_id=106}                                        |
|{order_number=10004, order_date=16852, purchaser=1003, quantity=1, product_id=107}                                        |
Query Completed
Now we have the after value, but we still need individual columns. To split it into individual columns, we can create one more stream, as shown below.
ksql> create STREAM Columns_With_Changes as
>       select AFTER->"order_number" as order_number,
>              AFTER->"order_date"   as order_date,
>              AFTER->"purchaser"    as purchaser,
>              AFTER->"quantity"     as quantity,
>              AFTER->"product_id"   as product_id
>       from After_Orders emit changes;

 Message
----------------------------------------------------
 Created query with ID CSAS_COLUMNS_WITH_CHANGES_26
----------------------------------------------------

ksql> select * from Columns_With_Changes;
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDER_NUMBER           |ORDER_DATE             |PURCHASER              |QUANTITY               |PRODUCT_ID             |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|10001                  |2016-01-16             |1001                   |1                      |102                    |
|10002                  |2016-01-17             |1002                   |2                      |105                    |
|10003                  |2016-02-19             |1002                   |2                      |106                    |
|10004                  |2016-02-21             |1003                   |1                      |107                    |
Query Completed
Now we can see the MySQL data (table: orders) in ksqlDB in a clean, column-wise format.
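Because Columns_With_Changes is a stream, we can also issue a push query to watch new change events arrive from MySQL in real time. This is a quick way to verify the end-to-end CDC path; the query below is a small sketch using standard ksqlDB push-query syntax.

-- Push query: emits a new row each time a change event arrives
-- from the dbserver1.inventory.orders topic. Press Ctrl+C to stop.
SELECT * FROM Columns_With_Changes EMIT CHANGES;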
Configuring ClickHouse
As the final part, we have to sync the data from ksqlDB to ClickHouse. This can be achieved in two ways.
- Using Sink connector
- Using the ClickHouse Kafka engine
With the sink connector approach, we create the connector from ksqlDB and create the corresponding target table in ClickHouse; a sketch of this is shown below.
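For reference, a sink connector can be registered directly from the ksqlDB CLI with CREATE SINK CONNECTOR, provided ksqlDB is pointed at the Kafka Connect cluster (via ksql.connect.url). The sketch below assumes a JDBC-based sink using the clickhouse-jdbc driver we copied into /kafka/connect earlier, and a ClickHouse host reachable as "clickhouse" inside the Docker network; the connector class, hostname, and converter settings are assumptions and would need to be adapted to your setup. In this post, however, we will use the Kafka engine approach.

-- Sketch only: a JDBC sink from the COLUMNS_WITH_CHANGES topic into ClickHouse.
-- The connector class, JDBC URL, and converter settings are illustrative assumptions.
CREATE SINK CONNECTOR clickhouse_orders_sink WITH (
    'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'  = 'jdbc:clickhouse://clickhouse:8123/default',
    'topics'          = 'COLUMNS_WITH_CHANGES',
    'tasks.max'       = '1',
    'auto.create'     = 'false',
    'insert.mode'     = 'insert'
);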
With the Kafka engine approach, which we use here, we create a table in ClickHouse that uses the Kafka table engine, as shown below.
727b5523b75a :) CREATE TABLE default.data_streamFix
                (
                    `value` String
                )
                ENGINE = Kafka
                SETTINGS kafka_broker_list = 'redpanda:9092',
                         kafka_topic_list = 'COLUMNS_WITH_CHANGES',
                         kafka_group_name = 'groupFix',
                         kafka_format = 'JSONAsString',
                         kafka_row_delimiter = '\n',
                         input_format_import_nested_json = 1

Ok.

0 rows in set. Elapsed: 0.026 sec.
Once the table is created, the data produced by the ksqlDB stream's topic is consumed by the Kafka engine table. To run SELECT queries directly against a Kafka engine table, we need to enable the following setting.
727b5523b75a :) set stream_like_engine_allow_direct_select=1;

SET stream_like_engine_allow_direct_select = 1

Query id: 2be27ac3-d0f5-4e29-bc48-ca04f16dd6fc

Ok.

0 rows in set. Elapsed: 0.001 sec.
Here, I used the JSONAsString format to copy each entire message value into the table as a single string. Let's query the Kafka engine table.
727b5523b75a :) select * from default.data_streamFix;

SELECT *
FROM default.data_streamFix

Query id: 56ecef4e-a4d9-466c-a2e8-b6e7e9ddcaba

┌─value───────────────────────────────────────────────────────────────┐
│ {"ORDER_DATE":16816,"PURCHASER":1001,"QUANTITY":1,"PRODUCT_ID":102} │
│ {"ORDER_DATE":16817,"PURCHASER":1002,"QUANTITY":2,"PRODUCT_ID":105} │
│ {"ORDER_DATE":16850,"PURCHASER":1002,"QUANTITY":2,"PRODUCT_ID":106} │
│ {"ORDER_DATE":16852,"PURCHASER":1003,"QUANTITY":1,"PRODUCT_ID":107} │
└─────────────────────────────────────────────────────────────────────┘

4 rows in set. Elapsed: 3.512 sec.
We can use the JSONExtract functions to split the values into separate columns.
For example,
727b5523b75a :) SELECT JSONExtract(value, 'ORDER_DATE', 'String') AS ORDER_DATE,
                       JSONExtract(value, 'PURCHASER', 'String') AS PURCHASER,
                       JSONExtract(value, 'QUANTITY', 'String') AS QUANTITY,
                       JSONExtract(value, 'PRODUCT_ID', 'String') AS PRODUCT_ID
                FROM default.data_streamFix

SELECT
    JSONExtract(value, 'ORDER_DATE', 'String') AS ORDER_DATE,
    JSONExtract(value, 'PURCHASER', 'String') AS PURCHASER,
    JSONExtract(value, 'QUANTITY', 'String') AS QUANTITY,
    JSONExtract(value, 'PRODUCT_ID', 'String') AS PRODUCT_ID
FROM default.data_streamFix

Query id: b273ae75-6cca-41e4-bdef-a1741fc43a85

┌─ORDER_DATE─┬─PURCHASER─┬─QUANTITY─┬─PRODUCT_ID─┐
│ 16816      │ 1001      │ 1        │ 102        │
│ 16817      │ 1002      │ 2        │ 105        │
│ 16850      │ 1002      │ 2        │ 106        │
│ 16852      │ 1003      │ 1        │ 107        │
└────────────┴───────────┴──────────┴────────────┘

4 rows in set. Elapsed: 3.516 sec.
We can also create a materialized view to store the data permanently in a columnar table, as sketched below.
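As a minimal sketch, assuming the Kafka engine table created above, we can add a MergeTree target table and a materialized view that extracts the JSON fields and converts the Debezium epoch-day integer (io.debezium.time.Date, days since 1970-01-01) back into a Date. The table and view names here are illustrative, not part of the original setup.

-- Illustrative target table for the extracted columns.
CREATE TABLE default.orders_final
(
    ORDER_DATE Date,
    PURCHASER  UInt32,
    QUANTITY   UInt32,
    PRODUCT_ID UInt32
)
ENGINE = MergeTree
ORDER BY (ORDER_DATE, PRODUCT_ID);

-- Materialized view: consumes rows from the Kafka engine table and
-- converts ORDER_DATE (days since 1970-01-01) into a proper Date.
CREATE MATERIALIZED VIEW default.orders_final_mv TO default.orders_final AS
SELECT
    toDate('1970-01-01') + JSONExtractInt(value, 'ORDER_DATE') AS ORDER_DATE,
    toUInt32(JSONExtractUInt(value, 'PURCHASER'))  AS PURCHASER,
    toUInt32(JSONExtractUInt(value, 'QUANTITY'))   AS QUANTITY,
    toUInt32(JSONExtractUInt(value, 'PRODUCT_ID')) AS PRODUCT_ID
FROM default.data_streamFix;

With a materialized view in place, the Kafka engine table is consumed in the background and the stream_like_engine_allow_direct_select setting is no longer needed for everyday queries; we simply query the MergeTree table.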
Conclusion
Hopefully, this blog post helps you understand the configuration needed to stream data from MySQL to ClickHouse using ksqlDB and Redpanda. Read the articles below to learn more about streaming in ClickHouse:
- MySQL to ClickHouse Replication with Sink Connector
- Integrating Kafka with ClickHouse for Real-time Streaming
- How to Ingest Data from Kafka Topic in ClickHouse
- Streaming Data from PostgreSQL to ClickHouse using Kafka and Debezium: Part 1
- Streaming Data from PostgreSQL to ClickHouse using Kafka and Debezium: Part 2