Introduction
There are many ways to feed data into ClickHouse. One example is when you need to load log/message data into your database on a regular basis. Before delving into complex messaging systems, consider using Vector, a simple but effective way to transfer log/message data from Kafka and other sources to ClickHouse. Another common use case is feeding Nginx or Kubernetes logs. Today we will demonstrate the migration of Kafka messages into ClickHouse using this tool.
Here are our sample Kafka messages:
{ "counter": { "value": 3.5 }, "agent": { "url": "chistadata.io", "tag": "DBaaS" }, "timestamp": "2024-03-07T00:30:02.816418753Z" }
Creating ClickHouse Table
Now let’s create a ClickHouse table to write the message data into. The timestamp column arrives in ClickHouse as a String, and we won’t transform it in the pipeline. Instead, we use a workaround: the table defines a time column with a DEFAULT expression that parses the string, and we rely on the time column from now on.
CREATE TABLE default.vectors_table
(
    `counter` Float64,
    `url` String,
    `tag` String,
    `timestamp` String,
    `time` DateTime DEFAULT toDateTime(substring(timestamp, 1, 19))
)
ENGINE = MergeTree
ORDER BY time
SETTINGS index_granularity = 8192
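As a quick sanity check of this workaround (with made-up values), we can insert a row without the time column and let the DEFAULT expression fill it in by truncating the ISO timestamp to its first 19 characters:

INSERT INTO default.vectors_table (counter, url, tag, timestamp)
VALUES (3.5, 'chistadata.io', 'DBaaS', '2024-03-07T00:30:02.816418753Z');

SELECT timestamp, time FROM default.vectors_table;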
Setup Vector
Vector is an open-source, high-performance data router made for contemporary data pipelines. It simplifies data ingestion, transformation, and routing between multiple sources and destinations. With Vector’s capabilities, data migration from Kafka to ClickHouse becomes a simple and efficient operation. Custom file formats are easily handled with the Vector Remap Language (VRL), which can parse unstructured input and map it to a specified structure. We can also write our own remap functions using VRL.
On Linux, we can run these commands to install Vector.
bash -c "$(curl -L https://setup.vector.dev)"
sudo apt install vector
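After installation, we can confirm that the binary is available:

vector --version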
Configuring Vector Pipeline
Pipelining with Vector is fairly straightforward. We define ‘rules’ for how Vector collects, processes, and sends data to ClickHouse. Configuration lives in /etc/vector/vector.toml (or a config file of our own), and is built from three basic blocks:
[sources.kafka_messages]
blocks define the sources from which data will be collected.
[transforms.process]
blocks define how the unstructured data is to be given structure.
[sinks.clickhouse]
blocks define the destinations where the structured data should be sent/stored.
These three headers form the skeleton of our config file.
Collecting data with [sources.kafka_messages]
Since this source reads Kafka messages, we provide the Kafka connection details and tell Vector how to decode the data:
[sources.kafka_messages]
type = "kafka"
auto_offset_reset = "beginning"
bootstrap_servers = "kafka:9092"
group_id = "test_1"
topics = [ "test_topic" ]
decoding.codec = "json"
Vector automatically retrieves new messages from topics as Kafka adds them in real time.
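For testing, assuming the standard Kafka console tools are available on the broker (script names vary by installation), our sample message can be published to the topic like this:

echo '{ "counter": { "value": 3.5 }, "agent": { "url": "chistadata.io", "tag": "DBaaS" }, "timestamp": "2024-03-07T00:30:02.816418753Z" }' | kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test_topic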
Structure data with [transforms.process]
To generate structured data, we would normally process each item with a transform block, using named capture groups implemented in VRL. The transformation code goes in the source parameter; it processes the data and stores the results in new fields on the event, which are then available for sending to ClickHouse. However, since the data coming from Kafka is already JSON, we skip this step in our pipeline.
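For illustration only, here is a rough sketch of what such a transform block could look like if the messages arrived as raw text instead of decoded JSON. The .message path and the field mapping below are assumptions for this example, not part of our pipeline:

[transforms.process]
type = "remap"
inputs = ["kafka_messages"]
source = '''
# parse the raw payload as JSON and flatten the fields our table expects
parsed = parse_json!(string!(.message))
.counter = parsed.counter.value
.url = parsed.agent.url
.tag = parsed.agent.tag
.timestamp = parsed.timestamp
'''

If we used this block, the sink’s inputs would point at "process" instead of "kafka_messages".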
Send data with [sinks.clickhouse]
Finally, we can set up the ClickHouse destination by adding a new sink block, as below:
[sinks.clickhouse]
type = "clickhouse"
inputs = ["kafka_messages"]
endpoint = "ws171-ch370.stage.db.chistadata.io:9440"
secure = "True"
database = "default"
table = "vectors_table"
auth.user = "ilkay@chistadata.com"
auth.password = "***"
Here, we ask Vector to take the data from Kafka without any transformation and deliver it directly to ClickHouse’s vectors_table. This is the complete Vector configuration file we ran:
[sources.kafka_messages]
type = "kafka"
auto_offset_reset = "beginning"
bootstrap_servers = "kafka:9092"
group_id = "test_1"
topics = [ "test_topic" ]
decoding.codec = "json"

[sinks.clickhouse]
type = "clickhouse"
inputs = ["kafka_messages"]
endpoint = "ws171-ch370.stage.db.chistadata.io:9440"
secure = "True"
database = "default"
table = "vectors_table"
auth.user = "ilkay@chistadata.com"
auth.password = "***"
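Before starting the pipeline, the configuration can be checked with Vector’s validate subcommand, using the same config path we run below:

/root/.vector/bin/vector validate /root/.vector/config/kafka.toml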
Now we can run Vector with our own config file. The -v flag enables DEBUG-level logging in Vector, so we can inspect the logs more easily.
/root/.vector/bin/vector --config-toml /root/.vector/config/kafka.toml -v
Let’s check whether our data migration happens in real time:
SELECT counter, url, tag, time
FROM default.vectors_table
LIMIT 5

┌─counter─┬─url───────────────────────┬─tag─┬────────────────time─┐
│       1 │ *******.com.              │ kbo │ 2024-03-13 20:37:30 │
│       1 │ *******.com.              │ ods │ 2024-03-13 20:37:30 │
│       3 │ *******.org.              │ kds │ 2024-03-13 20:37:30 │
│       3 │ *******.org.              │ bvs │ 2024-03-13 20:37:30 │
│       2 │ ******************.co.uk. │ dfx │ 2024-03-13 20:37:30 │
└─────────┴───────────────────────────┴─────┴─────────────────────┘
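To confirm that ingestion keeps up while messages continue to arrive, we can also re-run a simple count a few seconds apart and watch it grow:

SELECT count() FROM default.vectors_table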
Summary
Migrating data from Kafka to ClickHouse is an important step for organizations that want to apply ClickHouse’s analytics capabilities to their real-time data streams. By deploying Vector as a high-performance data router, organizations can accelerate this migration with efficient and reliable data transport while maintaining data integrity and consistency. Paired with ClickHouse, Vector can ship Kafka messages, Kubernetes logs, and Nginx logs to ClickHouse in real time.