Migrating MSSQL Database to ClickHouse

In the previous article, we discussed migrating from Oracle to ClickHouse. This migration tooling will soon be available in ChistaDATA DBaaS, so you can enjoy the advantages of ClickHouse with a single click. Today we’ll show you how to stream data from a Microsoft SQL Server database into Kafka, and then we’ll talk about moving it from Kafka into ClickHouse. We will also use the Kafdrop UI to inspect what happens along the way.

Why choose ChistaDATA DBaaS for migration?

  • Serverless header connector
    Simply authenticate your OLTP database and ClickHouse, and your data integration will automatically adapt to schema and API changes in the ChistaDATA DBaaS environment.
  • No more security compliance issues.
    To test your data pipeline without relying on third-party services, use ChistaDATA DBaaS’ open-source edition. Your security team will love it.
  • Being open source, it is extensible.
    With ChistaDATA DBaaS you can easily customise the open source OLTP database connector to your specific needs. All connectors are freely available.
  • Monitoring & Alerts on your terms
    Sometimes delays happen. ChistaDATA logs everything and lets you know when problems occur. Get alerts the way you want.
  • Normalized schemas
    Users can choose raw data, and analysts can choose normalized schemas. ChistaDATA DBaaS offers several options you can use.

Let’s demonstrate the migration from SQL Server 2019 to ClickHouse

We need an MSSQL database system up and running to get started. One of the easiest ways to do this is to use Docker to deploy a container that runs SQL Server. We also add the Kafdrop Docker image to see and manage what happens on the Kafka side.

The prerequisites

  • Before registering the connector with Kafka Connect, let’s take a closer look at its configuration. In this example, we will use the following settings. Save them as sqlserver.json next to docker-compose.yaml:
{
    "name": "sqlserver-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.hostname": "sqlserver",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "Password!",
        "database.names": "TestDB",
        "database.encrypt": "false",
        "table.include.list": "dbo.Inventory",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "topic.prefix": "dbz",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "include.schema.changes": "true",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.Inventory"
    }
}
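
Before registering a connector, it can help to sanity-check the JSON file. The following is a minimal sketch, not part of Debezium or ChistaDATA tooling: the function name check_connector_config and the list of required keys are assumptions chosen for this walkthrough. The renamed keys reflect the option names used by Debezium 2.x, which replaced table.whitelist, table.blacklist, and database.server.name.

```python
# Keys this walkthrough relies on. This list is an assumption for the demo,
# not an official Debezium schema.
REQUIRED_KEYS = {
    "connector.class",
    "database.hostname",
    "database.port",
    "database.user",
    "database.password",
    "topic.prefix",
    "schema.history.internal.kafka.bootstrap.servers",
    "schema.history.internal.kafka.topic",
}

# Legacy option names and the names that replace them on Debezium 2.x.
RENAMED_KEYS = {
    "table.whitelist": "table.include.list",
    "table.blacklist": "table.exclude.list",
    "database.server.name": "topic.prefix",
}


def check_connector_config(document: dict) -> list:
    """Return a list of human-readable problems; an empty list means no findings."""
    problems = []
    if "name" not in document:
        problems.append("missing top-level 'name'")
    config = document.get("config", {})
    for key in sorted(REQUIRED_KEYS - set(config)):
        problems.append(f"missing required key: {key}")
    for old, new in RENAMED_KEYS.items():
        if old in config:
            problems.append(f"legacy key {old!r}: use {new!r} on Debezium 2.x")
    return problems


# Usage (assumes sqlserver.json is in the current directory):
#   import json
#   with open("sqlserver.json", encoding="utf-8") as handle:
#       for problem in check_connector_config(json.load(handle)):
#           print(problem)
```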

Here is our docker-compose.yaml file. With it in place, we can start all the containers with docker-compose up -d.

version: "3.9"
services:
  sqlserver:
    image: mcr.microsoft.com/mssql/server:2019-latest
    ports:
     - 1433:1433
    environment:
     - ACCEPT_EULA=Y
     - MSSQL_PID=Standard
     - SA_PASSWORD=Password!
     - MSSQL_AGENT_ENABLED=true
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - 2181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    user: root
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    ports:
      - 9092:9092
      - 9093:9093
    depends_on: [zookeeper]
  kafka_connect:
    image: debezium/connect:latest
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    volumes:
      - ./sqlserver.json:/kafka/sqlserver.json
    depends_on: [zookeeper, kafka]
  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafdrop
    environment:
      KAFKA_BROKERCONNECT: kafka:9092
    ports:
      - 9000:9000
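
After docker-compose up -d returns, the services may still need some time before they accept connections. The sketch below polls the published ports until they open. The helper name wait_for_port and the timeout values are assumptions for illustration. An open port only shows that the process is listening, not that SQL Server or Kafka Connect has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection succeeds, or False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Usage, with the host ports published by the compose file above:
#   for name, port in [("sqlserver", 1433), ("kafka", 9093),
#                      ("kafka_connect", 8083), ("kafdrop", 9000)]:
#       print(name, "up" if wait_for_port("localhost", port) else "not reachable")
```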

When a new container starts with no existing setup, SQL Server is configured and deployed from scratch. Once the MSSQL database server signals that it has started, client applications can connect to it. First, we open a shell in the MSSQL container (for example with docker exec -it desktop-sqlserver-1 /bin/bash, assuming the same desktop project prefix as the container names used later in this article) and run the following statements one by one to create a table and some data. Then we run the remaining commands to enable the Change Data Capture (CDC) feature on the TestDB database and the Inventory table.

/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'Password!'
-- Create the database and confirm that it exists
CREATE DATABASE TestDB;
SELECT name FROM sys.databases;
GO
-- Create a sample table with two rows
USE TestDB;
CREATE TABLE Inventory (id INT, name NVARCHAR(50), quantity INT);
INSERT INTO Inventory VALUES (1, 'banana', 150);
INSERT INTO Inventory VALUES (2, 'orange', 154);
GO

-- Enable CDC at the database level
USE TestDB;
EXEC sp_changedbowner 'sa';
GO
EXEC sys.sp_cdc_enable_db;
GO

-- Enable CDC on the Inventory table
USE TestDB;
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'Inventory',
    @role_name = NULL,
    @supports_net_changes = 0;
GO
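
If more than one table needs CDC, the same batch can be generated instead of typed by hand. This is only a sketch under stated assumptions: the helper names below are hypothetical, and the generated script simply repeats the sp_cdc_enable_table call shown above for each table, with the same @role_name and @supports_net_changes values.

```python
def quote_identifier(name: str) -> str:
    """Wrap a T-SQL identifier in brackets, doubling any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"


def quote_literal(value: str) -> str:
    """Render a T-SQL Unicode string literal, doubling single quotes."""
    return "N'" + value.replace("'", "''") + "'"


def cdc_enable_script(database: str, schema: str, tables: list) -> str:
    """Build a sqlcmd batch that enables CDC on the database and on each table."""
    lines = [
        f"USE {quote_identifier(database)};",
        "GO",
        "EXEC sys.sp_cdc_enable_db;",
        "GO",
    ]
    for table in tables:
        lines += [
            "EXEC sys.sp_cdc_enable_table",
            f"    @source_schema = {quote_literal(schema)},",
            f"    @source_name = {quote_literal(table)},",
            "    @role_name = NULL,",
            "    @supports_net_changes = 0;",
            "GO",
        ]
    return "\n".join(lines)


# Usage: print the batch for the table used in this article.
#   print(cdc_enable_script("TestDB", "dbo", ["Inventory"]))
```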

After completing these steps, sqlcmd should print the following messages. They show that two SQL Server Agent jobs have been created: one captures the changes, and the other cleans up change records older than the retention period.

Job 'cdc.TestDB_capture' started successfully.
Job 'cdc.TestDB_cleanup' started successfully.

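We’re done with the MSSQL side. The next step is to register the connector with Kafka Connect. The configuration above is mounted into the Kafka Connect container as sqlserver.json in the /kafka directory. Open a shell in that container (the desktop prefix in the container name is the Compose project name and may differ on your machine):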

docker exec -it desktop-kafka_connect-1 /bin/bash

Now let’s call the Kafka Connect REST API with the JSON file we created above.

curl -H 'Content-Type: application/json' kafka_connect:8083/connectors --data "@sqlserver.json"
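
The same registration can be scripted from the host. The sketch below uses only the Python standard library and the Kafka Connect REST endpoints POST /connectors and GET /connectors/{name}/status. It assumes that port 8083 is reachable on localhost, as published in the compose file. The function names are illustrative.

```python
import json
import urllib.request

# Assumption: Kafka Connect is published on the host at port 8083.
CONNECT_URL = "http://localhost:8083"


def build_register_request(config_document: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Prepare the POST /connectors request without sending it."""
    body = json.dumps(config_document).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def connector_state(name: str, base_url: str = CONNECT_URL) -> str:
    """Return the connector state (for example RUNNING or FAILED) reported by Kafka Connect."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status", timeout=10) as response:
        status = json.load(response)
    return status["connector"]["state"]


# Usage (requires the containers to be running):
#   with open("sqlserver.json", encoding="utf-8") as handle:
#       request = build_register_request(json.load(handle))
#   urllib.request.urlopen(request, timeout=10).close()
#   print(connector_state("sqlserver-connector"))
```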

Now we can connect to Kafka and see the messages.

docker exec -it desktop-kafka-1 /bin/bash

We can list the topics and then use the Kafka console consumer tool to read the contents of the topic in the local terminal, which confirms that the data exists in Kafka:

/usr/bin/kafka-topics --list  --bootstrap-server kafka:9092
/usr/bin/kafka-console-consumer  --bootstrap-server kafka:9092  --topic dbz.TestDB.dbo.Inventory --from-beginning

Here is a sample of topic messages:

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"struct",
            "fields":[
               {
                  "type":"int32",
                  "optional":true,
                  "field":"id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"name"
               },
               {
                  "type":"int32",
                  "optional":true,
                  "field":"quantity"
               }
            ],
            "optional":true,
            "name":"dbz.TestDB.dbo.Inventory.Value",
            "field":"before"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"int32",
                  "optional":true,
                  "field":"id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"name"
               },
               {
                  "type":"int32",
                  "optional":true,
                  "field":"quantity"
               }
            ],
            "optional":true,
            "name":"dbz.TestDB.dbo.Inventory.Value",
            "field":"after"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "name":"io.debezium.data.Enum",
                  "version":1,
                  "parameters":{
                     "allowed":"true,last,false,incremental"
                  },
                  "default":"false",
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"sequence"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"schema"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"table"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"change_lsn"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"commit_lsn"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"event_serial_no"
               }
            ],
            "optional":false,
            "name":"io.debezium.connector.sqlserver.Source",
            "field":"source"
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"total_order"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"data_collection_order"
               }
            ],
            "optional":true,
            "name":"event.block",
            "version":1,
            "field":"transaction"
         }
      ],
      "optional":false,
      "name":"dbz.TestDB.dbo.Inventory.Envelope",
      "version":1
   },
   "payload":{
      "before":null,
      "after":{
         "id":2,
         "name":"orange",
         "quantity":154
      },
      "source":{
         "version":"2.1.2.Final",
         "connector":"sqlserver",
         "name":"dbz",
         "ts_ms":1686307144147,
         "snapshot":"last",
         "db":"TestDB",
         "sequence":null,
         "schema":"dbo",
         "table":"Inventory",
         "change_lsn":null,
         "commit_lsn":"00000025:00000438:0003",
         "event_serial_no":null
      },
      "op":"r",
      "ts_ms":1686307144161,
      "transaction":null
   }
}
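
Most of this message is schema. A consumer usually only needs a few fields from the payload. The following sketch extracts them from one message value. The function name and the returned dictionary layout are assumptions for illustration. The op codes c, u, d, and r are the ones Debezium uses for create, update, delete, and snapshot read events.

```python
import json

# Debezium operation codes found in the "op" field.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize_event(raw: str):
    """Reduce one Debezium message value (a JSON string) to the fields a sink needs."""
    event = json.loads(raw)
    if event is None:
        # A null value is a tombstone emitted after a delete; nothing to apply.
        return None
    # With schemas.enable=false the envelope fields sit at the top level.
    payload = event.get("payload", event)
    source = payload["source"]
    op = payload["op"]
    # Deletes carry the old row in "before"; every other operation uses "after".
    row = payload["before"] if op == "d" else payload["after"]
    return {
        "table": f'{source["db"]}.{source["schema"]}.{source["table"]}',
        "operation": OPERATIONS.get(op, op),
        "row": row,
        "commit_lsn": source.get("commit_lsn"),
    }
```

For the sample message above, this yields the table TestDB.dbo.Inventory, the operation "snapshot read", and the row with id 2.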

After landing our data in Kafka, we can use the ChistaDATA Python Connector to complete the migration. The ChistaDATA Python Connector, which moves data from Kafka to ClickHouse, is built to be robust with respect to restarts and supported data types. It keeps track of the most recent data it has sent to ClickHouse, so if your machine needs to be restarted, the migration picks up where it left off.
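
To make the restart behaviour concrete, here is a generic sketch of offset checkpointing. It is not the ChistaDATA Python Connector's actual implementation, which is not shown in this article. The file layout, the function names, and the (partition, offset, value) message shape are all assumptions for illustration.

```python
import json
import os
import tempfile


def load_checkpoint(path: str) -> dict:
    """Return the last applied offset per partition, or {} on first start."""
    try:
        with open(path, encoding="utf-8") as handle:
            return json.load(handle)
    except FileNotFoundError:
        return {}


def save_checkpoint(path: str, offsets: dict) -> None:
    """Write the checkpoint atomically so a crash cannot leave a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(offsets, handle)
    os.replace(tmp_path, path)


def pending(messages, offsets: dict):
    """Yield only messages whose offset is beyond the checkpoint of their partition."""
    for partition, offset, value in messages:
        if offset > offsets.get(str(partition), -1):
            yield partition, offset, value
```

A sink loop would apply each pending message to ClickHouse, then record its offset with save_checkpoint. After a restart, load_checkpoint and pending together skip everything that was already applied.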

To use this connector, add its Python container to the docker-compose file shown earlier and provide it with the schema information of the SQL Server 2019 database.
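
The schema information is already present in each Debezium message, so a ClickHouse table definition can be derived from it. The sketch below is an assumption about how that could be done, not the connector's own logic: the type mapping covers only primitive types emitted by the JSON converter, the function name is hypothetical, and MergeTree with ORDER BY tuple() is chosen only because the sample table has no key.

```python
# Kafka Connect JSON schema types mapped to ClickHouse types (primitives only).
CLICKHOUSE_TYPES = {
    "boolean": "Bool",
    "int8": "Int8",
    "int16": "Int16",
    "int32": "Int32",
    "int64": "Int64",
    "float": "Float32",
    "double": "Float64",
    "string": "String",
}


def clickhouse_ddl(envelope_schema: dict, table: str) -> str:
    """Build a CREATE TABLE statement from the 'after' struct of a Debezium envelope schema."""
    after = next(f for f in envelope_schema["fields"] if f.get("field") == "after")
    columns = []
    for field in after["fields"]:
        if field["type"] not in CLICKHOUSE_TYPES:
            raise ValueError(f"no mapping for type {field['type']!r}")
        column_type = CLICKHOUSE_TYPES[field["type"]]
        if field.get("optional"):
            # Optional source columns may carry NULL, so wrap them in Nullable.
            column_type = f"Nullable({column_type})"
        columns.append(f"    `{field['field']}` {column_type}")
    body = ",\n".join(columns)
    return f"CREATE TABLE {table}\n(\n{body}\n)\nENGINE = MergeTree\nORDER BY tuple()"


# Usage: pass the "schema" object of a message and a target table name, for example
#   print(clickhouse_ddl(message["schema"], "inventory"))
```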

Kafdrop

Kafdrop does an excellent job of filling obvious gaps in Kafka’s observability tooling, addressing issues the community has highlighted for far too long. Kafdrop, like Apache Kafka, is an Apache 2.0 licensed project, so it will not cost you anything. More information is available on the Kafdrop project page.

Let’s navigate to http://localhost:9000 and see what’s happening in Kafka. The connector named sqlserver-connector should be streaming your data into Kafka. In the Kafdrop UI, we should see the topics created by Debezium, such as “dbz.TestDB.dbo.Inventory”.

Go to the “dbz.TestDB.dbo.Inventory” topic. This topic tracks the Inventory table in TestDB, and all of the messages shown above appear here.

An important note

The SQL Server 2019 and Kafdrop images used here only work on the linux/amd64 platform; on other architectures they may not run. To check your architecture, type uname -m in a terminal. On amd64 hosts it prints x86_64.
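
The same check can be made from a script before starting the stack. This is a small sketch using the standard platform module. The function name and the set of accepted machine names are assumptions for illustration.

```python
import platform
from typing import Optional

# Machine names that commonly denote the amd64 architecture.
AMD64_NAMES = {"x86_64", "amd64"}


def is_amd64(machine: Optional[str] = None) -> bool:
    """Return True if the given (or current) machine name denotes amd64."""
    name = machine if machine is not None else platform.machine()
    return name.lower() in AMD64_NAMES


# Usage:
#   if not is_amd64():
#       print("Host is", platform.machine(), "- these images may not run here")
```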

Conclusion

Debezium is a fantastic tool for capturing row-level changes in a database and streaming those changes to a broker of our choosing. Moreover, the ChistaDATA Python Connector is a powerful tool for applying those messages to ClickHouse, and Kafdrop lets us inspect the data flow along the way. Migration from MySQL/MariaDB/PostgreSQL/OracleDB/MongoDB/MSSQL/Snowflake to ChistaDATA DBaaS will also be possible very soon. Stay tuned.


About Ilkay
Ilkay has been administering databases, including NoSQL and RDBMS, for over 7 years. He is experienced in working with high-scale banking databases and currently works at ChistaDATA Inc. as a Database Administrator.