Introduction
As we mentioned in the previous article in this series, migrating data from OLTP to OLAP systems is possible. This tutorial shows you how to set up a PostgreSQL Docker image for use with Debezium to capture change data (CDC). The captured bulk data and subsequent modifications are then written to a Kafka topic. Finally, we migrate all of the data to the ClickHouse database using the ChistaDATA Python Connector.
Runbook to stream data from PostgreSQL to ClickHouse using Kafka & Debezium
Configuring PostgreSQL Database
Here are the required settings for your existing PostgreSQL server:
- Verify that the wal_level parameter is set to logical. With PostgreSQL 10 and later, the built-in pgoutput plugin is used, so no additional logical decoding plugins are needed.
- Create a Debezium user with the minimum necessary rights (the REPLICATION and LOGIN privileges); additional privileges on the captured tables are also needed in order to use pgoutput (see the sketch after this list).
OR
The Debezium-maintained PostgreSQL image (debezium/postgres) is also available; in that image all of the above settings are applied by default, and it is the image the docker-compose.yml below uses for the source database.
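If you are instead preparing an existing, self-managed PostgreSQL server, a minimal sketch of those checks could look like the following; the role, password, and database names simply mirror the compose file below and are assumptions, not requirements.

psql -U postgres -c "SHOW wal_level;"                                          # must print "logical"
psql -U postgres -c "ALTER SYSTEM SET wal_level = logical;"                    # takes effect after a server restart
psql -U postgres -c "CREATE ROLE debezium_user WITH REPLICATION LOGIN PASSWORD 'debezium_pw';"
psql -U postgres -d salary -c "GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;"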
version: "3.9"
services:
  # Source database (Debezium-maintained image with logical decoding preconfigured)
  postgres:
    image: debezium/postgres:14
    ports:
      - 5432:5432
    healthcheck:
      test: "pg_isready -U debezium_user -d salary"
      interval: 2s
      timeout: 20s
      retries: 10
    environment:
      - POSTGRES_USER=debezium_user
      - POSTGRES_PASSWORD=debezium_pw
      - POSTGRES_DB=salary
  # Coordination service required by Kafka
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - 2181:2181
  # Message broker that receives the change events
  kafka:
    image: confluentinc/cp-kafka:latest
    user: root
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    ports:
      - 9092:9092
      - 9093:9093
    depends_on: [zookeeper]
  # Analytics target database
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - 9000:9000
      - 8123:8123
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    depends_on: [zookeeper, kafka]
  # Kafka Connect with the Debezium PostgreSQL connector
  debezium:
    image: debezium/connect:latest
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on: [zookeeper, kafka]
  # ChistaDATA Python Connector that moves events from Kafka into ClickHouse
  python:
    image: ihsnlky/python:latest
    depends_on: [clickhouse]
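Save the stack definition as docker-compose.yml and bring everything up (this assumes Docker Compose v2; with the older standalone binary, use docker-compose instead):

docker compose up -d
docker compose ps        # wait until the postgres healthcheck reports "healthy"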
Let’s connect to the source database.
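If PostgreSQL is running from the compose file above, first open a shell inside the container (the service name postgres comes from that file):

docker compose exec postgres bash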
su - postgres
psql -U debezium_user -d salary
Let’s create a table and insert some data on the PostgreSQL side. You can also run additional insert, update, or delete statements to generate more change events.
\c salary
CREATE TABLE IF NOT EXISTS employee
(
id bigint NOT NULL,
department character varying(255) NOT NULL,
employee_number character varying(255),
date_of_recruitment date,
CONSTRAINT employee_pkey PRIMARY KEY (id)
);
INSERT INTO employee VALUES (1, 'full time', '776E', now());
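If you want to see update and delete events in the Kafka topic later on, a few extra statements along these lines (the values are purely illustrative) can be run at the same psql prompt:

-- generate additional c, u, and d change events
INSERT INTO employee VALUES (2, 'part time', '777E', now());
UPDATE employee SET department = 'contractor' WHERE id = 2;
DELETE FROM employee WHERE id = 2;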
Configuring Debezium Container
Now the Debezium container is up and running, and Debezium can stream changes from the salary database. We accomplish this by registering a connector.
After logging into the Debezium container, enter the following command in a terminal to create the connector configuration file.
echo '{
"name": "shipments-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.server.name": "postgres",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "debezium_pw",
"database.dbname" : "salary",
"table.include.list": "public.employee",
"topic.prefix": "salary",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"snapshot.mode": "initial",
"tombstones.on.delete": "false",
"decimal.handling.mode": "double"
}
}' > debezium.json
Notice that plugin.name is set to pgoutput. The remaining configuration options are self-explanatory.
curl -H 'Content-Type: application/json' debezium:8083/connectors --data "@debezium.json"
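Once the connector has been registered, the Kafka Connect REST API can confirm that it exists and is running; the connector name below matches the one defined in debezium.json.

curl -s debezium:8083/connectors
curl -s debezium:8083/connectors/shipments-connector/status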
When the connector is initialized for the first time, it takes a consistent snapshot of the salary database. Once that snapshot completes, the connector continuously records row-level inserts, updates, and deletes, including those produced by bulk operations.
If everything has gone according to plan up to this point, you should see a topic named salary.public.employee created in Kafka.
Checking Kafka Container
To check Kafka topics and messages, run the following inside the kafka container:
/usr/bin/kafka-topics --list --bootstrap-server kafka:9092
/usr/bin/kafka-console-consumer --bootstrap-server kafka:9092 --topic salary.public.employee --from-beginning
What do these Kafka messages tell us? As anticipated, the operation type (c, u, d, and r) corresponds to the PostgreSQL operation semantics (insert, update, delete, and the reads produced by the initial bulk snapshot). Insert, snapshot-read, and update events carry an “after” record showing the row values as they were committed; update events additionally carry a “before” record with the previous values, while delete events carry only a “before” record and a null “after”. Each event also includes metadata such as the transaction id, ts_ms, a database-specific sequence number, and the LSN. Events that took place within the same transaction share a single transaction id.
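For reference, here is an abridged sketch of what an insert (op "c") event payload looks like with schemas disabled. The metadata values (lsn, txId, ts_ms) and the integer encoding of the date column are placeholders and will differ on your system.

{
  "before": null,
  "after": {
    "id": 1,
    "department": "full time",
    "employee_number": "776E",
    "date_of_recruitment": 19500
  },
  "source": {"db": "salary", "schema": "public", "table": "employee", "lsn": 23305600, "txId": 734},
  "op": "c",
  "ts_ms": 1700000000000
}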
Checking ClickHouse Database
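To run the DDL below, open a ClickHouse client inside the clickhouse container (the service name comes from the compose file above), then create a target table that matches the employee schema.

docker compose exec clickhouse clickhouse-client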
CREATE TABLE IF NOT EXISTS default.kafka_table
(
`id` UInt64,
`department` String,
`employee_number` String,
`date_of_recruitment` Date
)
ENGINE = ReplacingMergeTree
ORDER BY id;
As soon as the table is created and the Python connector container is running, the data set will appear in the ClickHouse database.
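From the same clickhouse-client session, you can verify that the rows have arrived; FINAL collapses any duplicate versions kept by the ReplacingMergeTree engine.

SELECT * FROM default.kafka_table FINAL;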
Conclusion
In this blog, using the Debezium and ChistaDATA Python Connector services, we successfully built, configured, and ran a test CDC pipeline from PostgreSQL to a ClickHouse database.
To learn more about streaming data into ClickHouse, read the following articles:
- Streaming Data from PostgreSQL to ClickHouse using Kafka and Debezium – Part 1
- Streaming Data from MySQL to ClickHouse using Redpanda and KsqlDB
- Real-time Streaming Bulk Data Loading from Kafka to ClickHouse