In the previous article, we discussed migrating from Oracle to the ClickHouse database. This migration tooling will soon be available in ChistaDATA DBaaS, where you will be able to enjoy the advantages of the ClickHouse database with a single click. Today we’ll show you how to migrate from a Microsoft SQL Server database to Kafka, and then we’ll talk about migrating on to the ClickHouse database. We will also enhance the demonstration with the Kafdrop UI tool.
Why choose ChistaDATA DBaaS for migration?
- Serverless connector: simply authenticate your OLTP database and ClickHouse, and your data integration automatically adapts to schema and API changes in the ChistaDATA DBaaS environment.
- No more security compliance issues: to test your data pipeline without relying on third-party services, use the ChistaDATA DBaaS open-source edition. Your security team will love it.
- Open source and extensible: with ChistaDATA DBaaS you can easily customise the open-source OLTP database connectors to your specific needs. All connectors are freely available.
- Monitoring and alerts on your terms: sometimes delays happen. ChistaDATA logs everything and lets you know when problems occur. Get alerts the way you want.
- Normalized schemas: users can choose raw data, while analysts can choose normalized schemas. ChistaDATA DBaaS offers several options to suit both.
Let’s demonstrate the migration from SQL Server 2019 to ClickHouse
We need an MSSQL database system up and running to get started. One of the easiest ways to do this is to use Docker to deploy a container that runs the SQL Server database. We also add the Kafdrop Docker image so we can manage and see what happens on the Kafka side.
The prerequisites
- Let’s take a closer look at the setup before registering the connector with Kafka Connect. In this example, we will use the following configuration. Save it as a sqlserver.json file next to the docker-compose.yaml:
{ "name": "sqlserver-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "tasks.max": "1", "database.hostname": "sqlserver-sqlserver-1", "database.port": "1433", "database.user": "sa", "database.password": "Password!", "database.names": "TestDB", "database.encrypt": "false", "database.server.name": "sqlserver-sqlserver-1", "table.whitelist": "Inventory", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "true", "topic.prefix": "dbz", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "include.schema.changes": "true", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.Inventory" } }
Here is our docker-compose.yaml file; with it in place, we can start all the containers with docker-compose up -d
version: "3.9" services: sqlserver: image: mcr.microsoft.com/mssql/server:2019-latest ports: - 1433:1433 environment: - ACCEPT_EULA=Y - MSSQL_PID=Standard - SA_PASSWORD=Password! - MSSQL_AGENT_ENABLED=true zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest user: root environment: - KAFKA_BROKER_ID=1 - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS= PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 ports: - 9092:9092 - 9093:9093 depends_on: [zookeeper] kafka_connect: image: debezium/connect:latest ports: - 8083:8083 environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses volumes: - ./sqlserver.json:/kafka/sqlserver.json depends_on: [zookeeper, kafka] kafdrop: container_name: kafdrop environment: KAFKA_BROKERCONNECT: kafka:9092 image: obsidiandynamics/kafdrop ports: - 9000:9000
When the SQL Server container starts for the first time with no existing database, the database engine is configured and deployed. Once the MSSQL server signals that it is ready, client applications can connect. First, we connect to the MSSQL container and run the following statements to create the TestDB database, the Inventory table, and some sample rows. We then run the commands that enable the Change Data Capture (CDC) feature on the database and the Inventory table.
/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P Password!

CREATE DATABASE TestDB;
SELECT Name from sys.databases;
GO
USE TestDB;
CREATE TABLE Inventory (id INT, name NVARCHAR(50), quantity INT);
INSERT INTO Inventory VALUES (1, 'banana', 150);
INSERT INTO Inventory VALUES (2, 'orange', 154);
GO
USE TestDB;
EXEC sp_changedbowner 'sa'
GO
EXEC sys.sp_cdc_enable_db
GO
USE TestDB;
GO
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = Inventory, @role_name = NULL, @supports_net_changes = 0
GO
After completing these steps, we should see the messages below: two SQL Server Agent jobs have been created, one that captures the changed data and one that cleans up records older than the retention period.
Job 'cdc.TestDB_capture' started successfully.
Job 'cdc.TestDB_cleanup' started successfully.
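If you want to double-check that CDC is actually active before moving on, the system catalog exposes flags for both the database and the table. An optional verification from inside the SQL Server container:

# is_cdc_enabled should be 1 for TestDB...
/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'Password!' \
  -Q "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'TestDB';"

# ...and is_tracked_by_cdc should be 1 for the Inventory table.
/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'Password!' -d TestDB \
  -Q "SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'Inventory';"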
We’re done with the MSSQL side; now we need to register the connector with Kafka Connect. The sqlserver.json configuration shown above is mounted into the Kafka Connect container at /kafka/sqlserver.json, so let’s open a shell in that container (the container name depends on your Compose project name; check it with docker ps):
docker exec -it desktop-kafka_connect-1 /bin/bash
Now let’s register the connector through the Kafka Connect REST API using the JSON file we created above.
curl -H 'Content-Type: application/json' kafka_connect:8083/connectors --data "@sqlserver.json"
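Kafka Connect answers with the connector configuration it created. To make sure the connector and its task actually reached the RUNNING state, and to list everything registered on this instance, you can query the same REST API:

# Connector and task state; both should report RUNNING.
curl -s kafka_connect:8083/connectors/sqlserver-connector/status

# All connectors registered on this Kafka Connect instance.
curl -s kafka_connect:8083/connectors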
Now we can connect to the Kafka container and look at the messages.
docker exec -it desktop-kafka-1 /bin/bash
Using the Kafka console consumer to read the contents of the topic into the local terminal, we can confirm that the data has arrived in Kafka:
/usr/bin/kafka-topics --list --bootstrap-server kafka:9092
/usr/bin/kafka-console-consumer --bootstrap-server kafka:9092 --topic dbz.TestDB.dbo.Inventory --from-beginning
Here is a sample of topic messages:
{ "schema":{ "type":"struct", "fields":[ { "type":"struct", "fields":[ { "type":"int32", "optional":true, "field":"id" }, { "type":"string", "optional":true, "field":"name" }, { "type":"int32", "optional":true, "field":"quantity" } ], "optional":true, "name":"dbz.TestDB.dbo.Inventory.Value", "field":"before" }, { "type":"struct", "fields":[ { "type":"int32", "optional":true, "field":"id" }, { "type":"string", "optional":true, "field":"name" }, { "type":"int32", "optional":true, "field":"quantity" } ], "optional":true, "name":"dbz.TestDB.dbo.Inventory.Value", "field":"after" }, { "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"version" }, { "type":"string", "optional":false, "field":"connector" }, { "type":"string", "optional":false, "field":"name" }, { "type":"int64", "optional":false, "field":"ts_ms" }, { "type":"string", "optional":true, "name":"io.debezium.data.Enum", "version":1, "parameters":{ "allowed":"true,last,false,incremental" }, "default":"false", "field":"snapshot" }, { "type":"string", "optional":false, "field":"db" }, { "type":"string", "optional":true, "field":"sequence" }, { "type":"string", "optional":false, "field":"schema" }, { "type":"string", "optional":false, "field":"table" }, { "type":"string", "optional":true, "field":"change_lsn" }, { "type":"string", "optional":true, "field":"commit_lsn" }, { "type":"int64", "optional":true, "field":"event_serial_no" } ], "optional":false, "name":"io.debezium.connector.sqlserver.Source", "field":"source" }, { "type":"string", "optional":false, "field":"op" }, { "type":"int64", "optional":true, "field":"ts_ms" }, { "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"id" }, { "type":"int64", "optional":false, "field":"total_order" }, { "type":"int64", "optional":false, "field":"data_collection_order" } ], "optional":true, "name":"event.block", "version":1, "field":"transaction" } ], "optional":false, "name":"dbz.TestDB.dbo.Inventory.Envelope", "version":1 }, "payload":{ "before":null, "after":{ "id":2, "name":"orange", "quantity":154 }, "source":{ "version":"2.1.2.Final", "connector":"sqlserver", "name":"dbz", "ts_ms":1686307144147, "snapshot":"last", "db":"TestDB", "sequence":null, "schema":"dbo", "table":"Inventory", "change_lsn":null, "commit_lsn":"00000025:00000438:0003", "event_serial_no":null }, "op":"r", "ts_ms":1686307144161, "transaction":null } }
After migrating our data to Kafka, we can use the ChistaDATA Python Connector to complete the migration. The ChistaDATA Python Connector, which handles migration from Kafka to ClickHouse, is the most robust tool on the market regarding restart handling and supported data types. It keeps track of the most recent data delivered to ClickHouse, so you can be sure that if your machine needs to be restarted, the migration will pick up where it left off.
To use this connector, you can examine the Python connector container in the docker-compose file mentioned earlier and adapt it to the SQL Server 2019 database schema information.
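The exact configuration of the ChistaDATA Python Connector is outside the scope of this post, but whichever Kafka-to-ClickHouse pipeline you use, you will need a target table whose columns match the Inventory payload. A hypothetical sketch of such a table, created with clickhouse-client (note that Kafdrop already occupies host port 9000 in the compose file above, so if you add ClickHouse to the same stack, map its native port to a different host port):

# Hypothetical ClickHouse target table, mirroring the Inventory schema from SQL Server.
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS TestDB;"
clickhouse-client --query "
CREATE TABLE IF NOT EXISTS TestDB.Inventory
(
    id       Int32,
    name     String,
    quantity Int32
)
ENGINE = MergeTree
ORDER BY id;"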
Kafdrop
Kafdrop does an excellent job of filling obvious gaps in Kafka’s observability tooling, resolving issues the community has highlighted for far too long. Kafdrop, like Apache Kafka, is an Apache 2.0 licensed project, so it will not cost you anything. More information is available on the project page.
Let’s navigate to http://localhost:9000 and see what’s happening on Kafka. A connector called sqlserver-connector should be migrating your data to the topic, and we should be able to see the topics created by Debezium, such as “dbz.TestDB.dbo.Inventory”, in the Kafdrop UI.
Go to the “dbz.TestDB.dbo.Inventory” topic. This topic tracks the Inventory table from TestDB, and all of the topic messages shown above can be browsed here.
An important note
The SQL Server 2019 and Kafdrop images used here are built for the linux/amd64 architecture and may not work on other platforms, so make sure that is what you are running on. To check, type uname -m in a terminal.
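A quick check, plus a possible workaround on arm64 hosts such as Apple Silicon (emulation can be slow or unstable, so treat it as a best-effort option):

# x86_64 means you are on linux/amd64.
uname -m

# On arm64 hosts you can try forcing amd64 emulation, either by pulling the
# image explicitly for that platform...
docker pull --platform linux/amd64 mcr.microsoft.com/mssql/server:2019-latest
# ...or by adding "platform: linux/amd64" to the relevant service in docker-compose.yaml.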
Conclusion
Debezium is a fantastic tool for capturing row-level changes in a database and streaming those changes to a broker of our choosing, and as we have seen, Kafdrop makes it easy to observe what lands in Kafka. The ChistaDATA Python Connector is, in turn, a powerful tool for applying those messages to ClickHouse. It will also soon be possible to migrate from MySQL/MariaDB/PostgreSQL/OracleDB/MongoDB/MSSQL/Snowflake to ChistaDATA DBaaS. Stay tuned.
References
https://learn.microsoft.com/en-us/sql/linux/quickstart-install-connect-docker?view=sql-server-ver16&pivots=cs1-bash
To know more about migrating to ClickHouse, do visit the following articles:
- Runbook for Migration from Oracle to ChistaDATA’s ClickHouse
- Runbook for Migration from Hadoop to ChistaDATA’s ClickHouse
- Runbook for Migration from Amazon Redshift to ChistaDATA’s ClickHouse DBaaS: Part 1