clickhouse-copier – A reliable workhorse for copying data across ClickHouse servers

Introduction

ClickHouse ships with several useful utilities, and clickhouse-copier is one of them: as the name suggests, it copies data from one ClickHouse server to another. The servers can belong to the same cluster or to different clusters altogether. The tool requires Apache Zookeeper or clickhouse-keeper to synchronise the copying process across the servers; if the ClickHouse cluster already runs a Zookeeper or ClickHouse Keeper ensemble for replicated tables, that existing keeper can be reused. Data can also be copied from one server to another with INSERT INTO … SELECT …, but that is not a good fit for very large tables. clickhouse-copier can transfer huge amounts of data across servers reliably and without such bottlenecks.
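
For a small table, the naive approach looks roughly like the sketch below, run on the destination server against a hypothetical salary table that already exists on both sides; the host name and credentials are placeholders matching the docker setup used later in this article.

-- A naive copy run on the destination server: pull rows from the source
-- with the remote() table function.
-- 'clickhouse1:9000', the 'default' user and the empty password are
-- placeholders for this article's docker setup.
INSERT INTO default.salary
SELECT *
FROM remote('clickhouse1:9000', 'default', 'salary', 'default', '');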

clickhouse-copier is a command-line tool that ships with ClickHouse. It is recommended to run the copier from the server containing the data, although it can be run from any server. In this article, I am going to use a Docker-based setup to illustrate how the tool works; a shell sketch for starting the stack follows the compose file. The docker-compose file defines the following services:

  1. Apache Zookeeper
  2. Two standalone ClickHouse servers

version: '3'

services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    networks:
      - ch_copier
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - "2182:2181"
      - "2888:2888"
      - "3888:3888"
    
  clickhouse1:
    image: clickhouse/clickhouse-server
    ports:
      - "8002:9000"
      - "9123:8123"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    networks:
      - ch_copier
    depends_on:
      - zookeeper
      
  clickhouse2:
    image: clickhouse/clickhouse-server
    ports:
      - "8003:9000"
      - "9124:8123"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    networks:
      - ch_copier
    depends_on:
      - zookeeper

networks:
  ch_copier:
    driver: bridge
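
Assuming this file is saved as docker-compose.yml, the stack can be brought up and a client session opened on the source server roughly as follows (docker-compose exec addresses containers by their service names):

docker-compose up -d

# Open a ClickHouse client session on the source server (service clickhouse1).
docker-compose exec clickhouse1 clickhouse-client

# Or drop into a plain shell inside the container, e.g. to create config files later.
docker-compose exec clickhouse1 bash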

Step 1

Connect to one of the ClickHouse servers running in Docker and create a table with the following SQL statement.

CREATE TABLE salary
(
    `salary` Nullable(UInt32),
    `month` Nullable(String),
    `name` String
)
ENGINE = MergeTree
ORDER BY name
SETTINGS index_granularity = 8192;

Once the table is created, insert some data using the generateRandom table function in ClickHouse.

INSERT INTO salary SELECT * FROM generateRandom() LIMIT 10000000;

This will insert 10 million rows of data.
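
On older ClickHouse releases, generateRandom() may not be able to infer the structure from the target table; in that case the column definitions can be passed explicitly, and a quick count confirms the insert. A sketch:

-- Variant with an explicit structure argument, for versions that cannot
-- infer it from the target table.
INSERT INTO salary
SELECT *
FROM generateRandom('salary Nullable(UInt32), month Nullable(String), name String')
LIMIT 10000000;

-- Sanity check on the source server.
SELECT count() FROM salary;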

Step 2

Once the containers are up and running, open a shell inside the container of the ClickHouse server that holds the data to be copied. Create a zookeeper.xml file in the home directory with the following contents.

<clickhouse>
    <zookeeper>
        <node index="1">
            <host>zookeeper</host>
            <port>2181</port>
        </node>
    </zookeeper>
</clickhouse>

Next, create the task.xml file. This file contains all the details of the copying task: the source and destination clusters, the databases and tables, the destination table engine, and so on. Below is a sample that copies the salary table from one ClickHouse server to another; it is based on the example in the ClickHouse documentation (see References).

<clickhouse>
    <!-- Configuration of clusters as in an ordinary server config -->
    <remote_servers>
        <source_cluster>
            <!--
                source cluster & destination clusters accept exactly the same
                parameters as parameters for the usual Distributed table
                see https://clickhouse.com/docs/en/engines/table-engines/special/distributed/
            -->
            <shard>
                <internal_replication>false</internal_replication>
                    <replica>
                        <host>clickhouse1</host>
                        <port>9000</port>
                        <!--
                        <user>default</user>
                        <password>default</password>
                        <secure>1</secure>
                        -->
                    </replica>
            </shard>

        </source_cluster>

        <destination_cluster>
            <shard>
                <internal_replication>false</internal_replication>
                    <replica>
                        <host>clickhouse2</host>
                        <port>9000</port>
                        <!--
                        <user>default</user>
                        <password>default</password>
                        <secure>1</secure>
                        -->
                    </replica>
            </shard>
        </destination_cluster>
    </remote_servers>

    <!-- How many simultaneously active workers are possible. If you run more workers, the superfluous workers will sleep. -->
    <max_workers>2</max_workers>

    <!-- Setting used to fetch (pull) data from source cluster tables -->
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <!-- Setting used to insert (push) data to destination cluster tables -->
    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
         They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
    <settings>
        <connect_timeout>3</connect_timeout>
        <!-- Sync insert is set forcibly, leave it here just in case. -->
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>

    <!-- Copying tasks description.
         You could specify several table tasks in the same task description (in the same ZooKeeper node); they will be performed
         sequentially.
    -->
    <tables>
        <!-- A table task, copies one table. -->
        <table_salary>
            <!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>salary</table_pull>

            <!-- Destination cluster name and tables in which the data should be inserted -->
            <cluster_push>destination_cluster</cluster_push>
            <database_push>default</database_push>
            <table_push>salary_copied</table_push>

            <!-- Engine of destination tables.
                 If destination tables have not been created, workers create them using the column definitions from the source tables and the engine
                 definition from here.

                 NOTE: If the first worker starts inserting data and detects that the destination partition is not empty then the partition will
                 be dropped and refilled, take it into account if you already have some data in destination tables. You could directly
                 specify partitions that should be copied in <enabled_partitions/>, they should be in quoted format like partition column of
                 system.parts table.
            -->
            <engine>
            ENGINE=MergeTree()
            ORDER BY (name)
            </engine>

            <!-- Sharding key used to insert data to destination cluster -->
            <sharding_key>1</sharding_key>


        </table_salary>

    </tables>
</clickhouse>

Given below are the important config sections:

  • <remote_servers> – Contains the details of the source and destination servers, including the shard and replica information.
  • <max_workers> – Maximum number of workers assigned to the copying task. There is no set formula for this value; it depends roughly on the number of CPU cores and the size of the table.
  • <settings> – Common settings such as connection timeouts.
  • <tables> – Contains the source and destination table information.
    • <cluster_pull> and <cluster_push> – The cluster names (from <remote_servers>) used to pull and push the data.
    • <engine> – Table engine configuration for the destination table. If the table does not exist, it will be created from the source table's columns and the engine definition given here (see the sketch after this list).
    • <sharding_key> – Expression used to generate the sharding key. Here we use a single shard with one replica, so it is hard-coded to 1.
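
If you would rather not rely on automatic table creation (and on the drop-and-refill behaviour mentioned in the task file comments), the destination table can be created up front on clickhouse2 with the same columns as the source and the engine from the <engine> section; a sketch:

-- Run on the destination server (clickhouse2) before starting the copier.
CREATE TABLE default.salary_copied
(
    `salary` Nullable(UInt32),
    `month` Nullable(String),
    `name` String
)
ENGINE = MergeTree
ORDER BY name;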

Step 3

Run the copier tool from the directory containing the zookeeper.xml and task.xml files:

clickhouse-copier --config zookeeper.xml --task-path /copier/salary/1 --task-file task.xml

Available Parameters

  • daemon — Starts clickhouse-copier in daemon mode.
  • config — The path to the zookeeper.xml file with the parameters for the connection to Apache Zookeeper.
  • task-path — The path to the Zookeeper node. This node is used for syncing clickhouse-copier processes and storing tasks.
  • task-file — Path to the file with the task configuration.
  • task-upload-force — Force upload of the task-file even if the Zookeeper node already exists.
  • base-dir — The path to logs and auxiliary files on the server where the copier is run (optional); an example invocation using this option follows the list.
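
For a long-running copy, the tool is typically started in daemon mode with an explicit working directory for its logs; a sketch, reusing the config files from above (the /tmp/clickhouse-copier path is just an example, any writable directory will do):

clickhouse-copier --daemon \
    --config zookeeper.xml \
    --task-path /copier/salary/1 \
    --task-file task.xml \
    --base-dir /tmp/clickhouse-copier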

The copier may take a while to transfer the data. Once the copying has finished, connect to the ClickHouse server where the table was copied and verify the data; a minimal check is sketched below.
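
Assuming the docker setup above, a simple row count on the destination should match the 10 million rows inserted on the source:

# Count the copied rows on the destination server (service clickhouse2).
docker-compose exec clickhouse2 clickhouse-client --query "SELECT count() FROM default.salary_copied"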

Conclusion

In this article we have become familiar with clickhouse-copier, a reliable workhorse for replicating or copying data from one ClickHouse server to another, and we have walked through how to run it together with Apache Zookeeper.

References

https://clickhouse.com/docs/en/operations/utilities/clickhouse-copier

https://altinity.com/blog/2018/8/22/clickhouse-copier-in-practice

https://kb.altinity.com/altinity-kb-setup-and-maintenance/altinity-kb-data-migration/altinity-kb-clickhouse-copier/