Streaming Data from PostgreSQL to ClickHouse using Kafka and Debezium – Part 2

Introduction

As we mentioned in the previous article in this series, data can be migrated from an OLTP database to an OLAP database. This tutorial shows how to set up a PostgreSQL Docker image for use with Debezium to capture change data (CDC). Debezium writes the initial snapshot and every subsequent change to a Kafka topic. Finally, we load all of the data into ClickHouse using the ChistaDATA Python Connector.

Runbook to stream data from PostgreSQL to ClickHouse using Kafka & Debezium

Configuring PostgreSQL Database

  1. Verify that the wal_level parameter is set to logical.
  2. PostgreSQL 10 and later include the pgoutput plugin, so no additional plugins are needed.
  3. Following the Debezium instructions, create a Debezium user with the minimum required privileges (REPLICATION and LOGIN). Using pgoutput requires some additional privileges as well.
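
The manual steps above can be sketched as a small Python helper. This is only an illustration, not part of the original setup: the function names are hypothetical, the role, password, database, and table names are borrowed from later in this tutorial, and the extra grants (CREATE on the database for the publication, SELECT on the captured table for the snapshot) are assumptions about what pgoutput needs in your environment. The helper only builds SQL text; it does not connect to PostgreSQL.

```python
from typing import List


def wal_level_is_logical(show_wal_level_output: str) -> bool:
    """Return True if the value reported by `SHOW wal_level;` is 'logical'."""
    return show_wal_level_output.strip().lower() == "logical"


def debezium_role_statements(user: str, password: str, database: str, table: str) -> List[str]:
    """Build SQL that a superuser could run to prepare a role for Debezium.

    Assumption: identifiers are trusted values, so they are not quoted here.
    """
    escaped = password.replace("'", "''")  # escape single quotes in the literal
    return [
        # Minimum rights named in step 3: REPLICATION and LOGIN.
        f"CREATE ROLE {user} WITH REPLICATION LOGIN PASSWORD '{escaped}';",
        # Assumed extra rights for pgoutput: create a publication, read the table.
        f"GRANT CREATE ON DATABASE {database} TO {user};",
        f"GRANT SELECT ON {table} TO {user};",
    ]


if __name__ == "__main__":
    for statement in debezium_role_statements(
        "debezium_user", "debezium_pw", "salary", "public.employee"
    ):
        print(statement)
```

Run the printed statements with psql as a superuser, and pass the output of `SHOW wal_level;` to wal_level_is_logical to confirm step 1.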

OR

Use the PostgreSQL image that Debezium provides (debezium/postgres). It applies all of the settings above by default, and the Docker Compose file below uses it.

version: "3.9"
services:
  postgres:
    image: debezium/postgres:14
    ports:
      - 5432:5432
    healthcheck:
      test: "pg_isready -U debezium_user -d salary"
      interval: 2s
      timeout: 20s
      retries: 10
    environment:
      - POSTGRES_USER=debezium_user
      - POSTGRES_PASSWORD=debezium_pw
      - POSTGRES_DB=salary
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - 2181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    user: root
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    ports:
      - 9092:9092
      - 9093:9093
    depends_on: [zookeeper]
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - 9000:9000
      - 8123:8123
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    depends_on: [zookeeper, kafka]
  debezium:
    image: debezium/connect:latest
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on: [zookeeper, kafka]
  python:
    image: ihsnlky/python:latest
    depends_on: [clickhouse]

Let’s connect to the source database from inside the postgres container.

su - postgres
psql -U debezium_user -d salary

Let’s create a table and insert some data on the PostgreSQL side. You can also run additional insert, update, or delete statements of your own.

\c salary

CREATE TABLE IF NOT EXISTS employee
(
    id bigint NOT NULL,
    department character varying(255) NOT NULL,
    employee_number character varying(255),
    date_of_recruitment date,
    CONSTRAINT employee_pkey PRIMARY KEY (id)
);

INSERT INTO employee VALUES (1, 'full time', '776E', now());

Configuring Debezium Container

Log in to the Debezium container and run the following command in a terminal. It writes the connector configuration to debezium.json.

echo '{
  "name": "shipments-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "plugin.name": "pgoutput",
    "database.hostname": "postgres", 
    "database.server.name": "postgres", 
    "database.port": "5432", 
    "database.user": "debezium_user", 
    "database.password": "debezium_pw", 
    "database.dbname" : "salary", 
    "table.include.list": "public.employee",
    "topic.prefix": "salary",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",  
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "snapshot.mode": "initial",
    "tombstones.on.delete": "false",
    "decimal.handling.mode": "double"
    }
}' > debezium.json

Notice that plugin.name is set to pgoutput. The remaining settings are self-explanatory. Next, register the connector through the Kafka Connect REST API:

curl -H 'Content-Type: application/json' debezium:8083/connectors --data "@debezium.json"
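
The same registration can be sketched in Python with only the standard library. This is a hedged illustration rather than part of the original runbook: the helper names are hypothetical, and the base URL assumes you run the script from a container on the same Docker network, where the debezium hostname resolves. It uses two Kafka Connect REST endpoints, POST /connectors and GET /connectors/{name}/status.

```python
import json
import urllib.request

CONNECT_URL = "http://debezium:8083"  # service name and port from the compose file


def build_register_request(config_path: str, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST /connectors request that the curl command sends."""
    with open(config_path, "rb") as f:
        payload = f.read()
    json.loads(payload)  # fail early if the file is not valid JSON
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def connector_status_url(name: str, base_url: str = CONNECT_URL) -> str:
    """URL of the Kafka Connect status endpoint for one connector."""
    return f"{base_url}/connectors/{name}/status"


def fetch_json(request_or_url):
    """Send the request and decode the JSON reply (needs a running Connect worker)."""
    with urllib.request.urlopen(request_or_url, timeout=10) as resp:
        return json.load(resp)


if __name__ == "__main__":
    print(fetch_json(build_register_request("debezium.json")))
    print(fetch_json(connector_status_url("shipments-connector")))
```

If the connector started correctly, the status reply should report a running state for the connector and its task. A failed state usually includes a stack trace that points at the misconfigured setting.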

When the connector starts for the first time, it takes a consistent snapshot of the salary database. Once that snapshot is complete, the connector continuously captures row-level inserts, updates, and deletes.

If everything went according to plan up to this point, you should see a topic named salary.public.employee created in Kafka.

Checking Kafka Container

To check Kafka topics and messages:

/usr/bin/kafka-topics --list  --bootstrap-server kafka:9092

/usr/bin/kafka-console-consumer  --bootstrap-server kafka:9092  --topic salary.public.employee --from-beginning

What do these Kafka messages tell us? As expected, the operation type (c, u, d, or r) corresponds to the PostgreSQL operation (insert, update, delete, or a snapshot read of an existing row). Inserted, snapshotted, and updated rows carry an “after” record that shows the values after the transaction was committed. Deleted rows carry a “before” record, and their “after” value is null. Updated rows carry both a “before” and an “after” record, although how much of the old row appears in “before” depends on the table’s REPLICA IDENTITY setting. Each message also contains metadata such as a transaction id, ts_ms, a database-specific sequence number, and the lsn. Events that took place within the same transaction share a single transaction id.
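
To make the message structure concrete, here is a minimal Python sketch that reduces one change event to its operation, row image, and key metadata. It assumes the converter settings used above (schemas.enable set to false), so the message value is the event envelope itself. The function names and the sample event are hypothetical and hand-written for illustration, not captured from a real run. With Debezium's default settings a DATE column arrives as the number of days since 1970-01-01, which the helper converts back to a date.

```python
import json
from datetime import date, timedelta

OP_NAMES = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def days_to_date(days):
    """Convert Debezium's default DATE encoding (days since epoch) to a date."""
    return None if days is None else date(1970, 1, 1) + timedelta(days=days)


def summarize_event(raw_value: str) -> dict:
    """Reduce a change event to its operation, row image, and key metadata."""
    event = json.loads(raw_value)
    # Deletes carry the row in "before"; every other operation carries it in "after".
    row = event["before"] if event["op"] == "d" else event["after"]
    source = event.get("source", {})
    return {
        "operation": OP_NAMES.get(event["op"], event["op"]),
        "row": row,
        "lsn": source.get("lsn"),
        "tx_id": source.get("txId"),
        "ts_ms": event.get("ts_ms"),
    }


if __name__ == "__main__":
    # Hand-written sample shaped like a snapshot event for the employee table.
    sample = json.dumps({
        "before": None,
        "after": {"id": 1, "department": "full time",
                  "employee_number": "776E", "date_of_recruitment": 19358},
        "source": {"txId": 735, "lsn": 23876544},
        "op": "r",
        "ts_ms": 1690000000000,
    })
    summary = summarize_event(sample)
    print(summary["operation"], summary["row"]["id"],
          days_to_date(summary["row"]["date_of_recruitment"]))
```

A consumer can feed each message value from the salary.public.employee topic through summarize_event and branch on the operation field.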

Checking ClickHouse Database

CREATE TABLE IF NOT EXISTS default.kafka_table
(
    `id` UInt64,
    `department` String,
    `employee_number` String,
    `date_of_recruitment` Date
)
ENGINE = ReplacingMergeTree
ORDER BY id;

Once the table exists and the Python container is running, the connector writes the captured data into ClickHouse. Make sure the Python container is already running when you create the table.
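
For readers curious about what such a load step involves, the sketch below maps the “after” image of an employee event onto the kafka_table columns and builds an insert for the ClickHouse HTTP interface. It is not the ChistaDATA Python Connector, whose internals are not shown in this article. The function names are hypothetical, authentication is omitted, and replacing NULL values with an empty string or 1970-01-01 is an assumption made because the target columns are not Nullable.

```python
import json
import urllib.parse
import urllib.request
from datetime import date, timedelta

CLICKHOUSE_URL = "http://clickhouse:8123/"  # HTTP port from the compose file


def to_clickhouse_row(after: dict) -> dict:
    """Map the "after" image of an employee event onto the kafka_table columns."""
    days = after.get("date_of_recruitment")  # Debezium default: days since epoch
    recruited = date(1970, 1, 1) + timedelta(days=days or 0)
    return {
        "id": after["id"],
        "department": after["department"],
        "employee_number": after.get("employee_number") or "",
        "date_of_recruitment": recruited.isoformat(),
    }


def build_insert_request(rows, table="default.kafka_table", base_url=CLICKHOUSE_URL):
    """Build an HTTP insert with one JSON object per line (JSONEachRow)."""
    query = f"INSERT INTO {table} FORMAT JSONEachRow"
    body = "\n".join(json.dumps(row) for row in rows).encode("utf-8")
    url = base_url + "?" + urllib.parse.urlencode({"query": query})
    return urllib.request.Request(url, data=body, method="POST")


if __name__ == "__main__":
    row = to_clickhouse_row({"id": 1, "department": "full time",
                             "employee_number": "776E", "date_of_recruitment": 19358})
    # Sending the request needs a reachable ClickHouse server.
    with urllib.request.urlopen(build_insert_request([row]), timeout=10) as resp:
        print(resp.status)
```

Because the table uses ReplacingMergeTree ordered by id, several versions of a row can coexist until a background merge runs. Adding FINAL to a SELECT returns the deduplicated result at query time.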

Conclusion

In this blog, we built, configured, and ran a test CDC pipeline from PostgreSQL to ClickHouse using Debezium and the ChistaDATA Python Connector.

About Ilkay
Ilkay has been administering databases, including NoSQL and RDBMS systems, for over 7 years. He is experienced in working with high-scale banking databases. He currently works at ChistaDATA Inc. as a Database Administrator.