Introduction
As we mentioned in the previous article in this series, data can be migrated from an OLTP system to an OLAP system. This tutorial shows how to set up a PostgreSQL Docker image for use with Debezium to capture change data (CDC). The captured snapshot and subsequent changes are written to a Kafka topic, and finally the data is loaded into the ClickHouse database using the ChistaDATA Python Connector.
Runbook to stream data from PostgreSQL to ClickHouse using Kafka & Debezium
Configuring PostgreSQL Database
Here are the required settings for your existing PostgreSQL server:
- Verify that the wal_level parameter is set to logical.
- With PostgreSQL 10 and later, the built-in pgoutput plugin is used, so no additional output plugin needs to be installed.
- As described in the Debezium documentation, create a Debezium user with the minimum required privileges (REPLICATION and LOGIN); additional privileges on the captured tables are needed when using pgoutput.
OR
Use Debezium's PostgreSQL image (debezium/postgres), in which all of the above settings are applied by default.
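If you are pointing Debezium at an existing server rather than the Docker image, you can quickly verify these prerequisites. Below is a minimal sketch using psycopg2; the library choice and the superuser credentials are assumptions, and the same checks can be run directly in psql.

# Hypothetical prerequisite check; assumes psycopg2 is installed and that
# a superuser connection is available. Adjust credentials to your environment.
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="salary",
    user="postgres", password="postgres"
)
cur = conn.cursor()

# wal_level must be 'logical' for logical decoding with pgoutput.
cur.execute("SHOW wal_level;")
print("wal_level:", cur.fetchone()[0])

# The Debezium user needs the REPLICATION and LOGIN attributes.
cur.execute(
    "SELECT rolreplication, rolcanlogin FROM pg_roles WHERE rolname = %s;",
    ("debezium_user",),
)
print("debezium_user (replication, login):", cur.fetchone())

cur.close()
conn.close()

The docker-compose.yml below uses the debezium/postgres image, where these settings are already in place, and also brings up ZooKeeper, Kafka, ClickHouse, Debezium Connect, and the Python connector container: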
version: "3.9" services: postgres: image: debezium/postgres:14 ports: - 5432:5432 healthcheck: test: "pg_isready -U debezium_user -d salary" interval: 2s timeout: 20s retries: 10 environment: - POSTGRES_USER=debezium_user - POSTGRES_PASSWORD=debezium_pw - POSTGRES_DB=salary zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: - 2181:2181 kafka: image: confluentinc/cp-kafka:latest user: root environment: - KAFKA_BROKER_ID=1 - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS= PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 ports: - 9092:9092 - 9093:9093 depends_on: [zookeeper] clickhouse: image: clickhouse/clickhouse-server:latest ports: - 9000:9000 - 8123:8123 ulimits: nproc: 65535 nofile: soft: 262144 hard: 262144 depends_on: [zookeeper, kafka] debezium: image: debezium/connect:latest ports: - 8083:8083 environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses depends_on: [zookeeper, kafka] python: image: ihsnlky/python:latest depends_on: [clickhouse]
Let’s connect to the source database.
su - postgres
psql -U debezium_user -d salary
Let’s create a table and insert some data on the PostgreSQL side. You can follow up with additional insert, update, or delete statements whenever you like; a small sketch of such follow-up operations is shown after the table definition below.
\c salary

CREATE TABLE IF NOT EXISTS employee (
    id bigint NOT NULL,
    department character varying(255) NOT NULL,
    employee_number character varying(255),
    date_of_recruitment date,
    CONSTRAINT employee_pkey PRIMARY KEY (id)
);

INSERT INTO employee VALUES (1, 'full time', '776E', now());
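To generate update and delete events in addition to the insert above, you can run more DML against the table. Here is a minimal sketch using psycopg2 (an assumption; running the same statements in psql works just as well):

# Hypothetical follow-up DML to produce 'u' and 'd' change events later;
# uses the credentials defined in the docker-compose file.
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="salary",
    user="debezium_user", password="debezium_pw"
)
cur = conn.cursor()

cur.execute("INSERT INTO employee VALUES (2, 'part time', '777E', now());")
cur.execute("UPDATE employee SET department = 'contractor' WHERE id = 2;")
cur.execute("DELETE FROM employee WHERE id = 2;")

conn.commit()
cur.close()
conn.close()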
Configuring Debezium Container
Now we have the Debezium container up and running. Debezium will stream changes from the salary database; we accomplish this by registering a connector.
Enter this command in a terminal after logging into the Debezium container.
echo '{ "name": "shipments-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "postgres", "database.server.name": "postgres", "database.port": "5432", "database.user": "debezium_user", "database.password": "debezium_pw", "database.dbname" : "salary", "table.include.list": "public.employee", "topic.prefix": "salary", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "snapshot.mode": "initial", "tombstones.on.delete": "false", "decimal.handling.mode": "double" } }' > debezium.json
Notice that plugin.name is set to pgoutput. The remaining settings are self-explanatory.
curl -H 'Content-Type: application/json' debezium:8083/connectors --data "@debezium.json"
When the connector is initialized for the first time, it takes a consistent snapshot of the salary database. After that snapshot completes, the connector continues to record row-level inserts, updates, and deletes, including those produced by bulk operations. You can verify that the connector registered correctly through the Kafka Connect REST API, as sketched below.
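Here is a small status check with Python's requests library (an assumption; curl against the same standard Kafka Connect REST endpoints works equally well):

# Check connector registration and task state via the Kafka Connect REST API
# exposed by the Debezium container on port 8083. Assumes the requests package.
import requests

base = "http://debezium:8083"   # use "http://localhost:8083" from the host

# List all registered connectors.
print(requests.get(f"{base}/connectors").json())

# Inspect the state of our connector and its tasks;
# both should report RUNNING once the snapshot is underway or complete.
status = requests.get(f"{base}/connectors/shipments-connector/status").json()
print(status["connector"]["state"], [task["state"] for task in status["tasks"]])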
If everything went according to plan up to this point, you should see a topic named salary.public.employee created in Kafka.
Checking Kafka Container
To check Kafka topics and messages:
/usr/bin/kafka-topics --list --bootstrap-server kafka:9092
/usr/bin/kafka-console-consumer --bootstrap-server kafka:9092 --topic salary.public.employee --from-beginning
What do these Kafka messages tell us? As expected, the PostgreSQL operation semantics (insert, update, delete, and the bulk/snapshot reads) map to the Debezium operation types c, u, d, and r. Insert, snapshot, and update events carry an “after” record with the row values as they were committed; delete events carry a “before” record and a null “after”; update events carry both “before” and “after” records. Each event also includes metadata such as the transaction id, ts_ms, a database-specific sequence number, and the LSN. Events produced within the same transaction share the same transaction id.
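To make the envelope structure concrete, here is a small sketch that reads these messages with kafka-python and interprets the op, before, and after fields. The library choice is an assumption, and this is not the ChistaDATA connector's code; it only illustrates the message format produced by the converter settings above.

# Minimal sketch: decode Debezium change events from the topic and print
# what each one means. Assumes the kafka-python package is installed.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "salary.public.employee",
    bootstrap_servers="kafka:9092",      # localhost:9093 when run from the host
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

for message in consumer:
    event = message.value
    if event is None:          # tombstones are disabled here, but stay safe
        continue
    op = event["op"]           # c=insert, u=update, d=delete, r=snapshot read
    if op in ("c", "r"):
        print("new row:", event["after"])
    elif op == "u":
        print("updated:", event["before"], "->", event["after"])
    elif op == "d":
        print("deleted:", event["before"])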
Checking ClickHouse Database
Create the target table on the ClickHouse side:

CREATE TABLE IF NOT EXISTS default.kafka_table
(
    `id` UInt64,
    `department` String,
    `employee_number` String,
    `date_of_recruitment` Date
)
ENGINE = ReplacingMergeTree
ORDER BY id;
Once the table is created and the Python connector container is running, the data set will appear in the ClickHouse database. Make sure the Python container is started as soon as you create the table in ClickHouse. To illustrate what that connector does conceptually, a simplified consumer sketch follows.
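The ChistaDATA Python Connector handles this step for you. Purely as an illustration of the idea, here is a simplified sketch of a Kafka-to-ClickHouse loop using kafka-python and clickhouse-driver; both library choices and every detail below are assumptions, not the connector's actual implementation.

# Illustrative only: consume Debezium events and write inserts into ClickHouse.
# Assumes kafka-python and clickhouse-driver; not the ChistaDATA connector code.
import json
from datetime import date, timedelta

from clickhouse_driver import Client
from kafka import KafkaConsumer

clickhouse = Client(host="clickhouse", port=9000)
consumer = KafkaConsumer(
    "salary.public.employee",
    bootstrap_servers="kafka:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

for message in consumer:
    event = message.value
    if event is None or event["op"] not in ("c", "r", "u"):
        continue                      # deletes would need separate handling
    row = event["after"]
    # Debezium's default representation of a DATE column is days since epoch.
    recruited = date(1970, 1, 1) + timedelta(days=row["date_of_recruitment"])
    clickhouse.execute(
        "INSERT INTO default.kafka_table "
        "(id, department, employee_number, date_of_recruitment) VALUES",
        [(row["id"], row["department"], row["employee_number"], recruited)],
    )

Because the target table uses the ReplacingMergeTree engine with ORDER BY id, re-inserted versions of the same row are collapsed during background merges (or when querying with SELECT ... FINAL), which is how updates end up reflected in ClickHouse.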
Conclusion
Using the Debezium and ChistaDATA Python Connector services, we built, configured, and ran a test CDC pipeline from PostgreSQL to a ClickHouse database in this blog.
To learn more about streaming data into ClickHouse, read the following articles: