Sharding in ClickHouse: Part 1

Introduction

Sharding is splitting a large table horizontally (row-wise) and storing it across multiple servers. ClickHouse uses the Distributed table engine to process sharded tables. Shards can be internally replicated or non-replicated in ClickHouse. Sharding allows storing huge amounts of data that may otherwise not fit on a single server. We shall look at basic sharding examples in this article using Docker.

Pros:

  • High availability
  • Faster query response time
  • Increased write bandwidth
  • Easy to scale out

Cons:

  • Added complexity
  • Possibility of unbalanced data in a shard

Sharding in ClickHouse

ClickHouse uses hash-based sharding: a column from the table is chosen as the sharding key, its values are hashed, and the hash value determines which shard stores each row. For example, if a table has 3 shards, the remainder of the hash value divided by the number of shards (a modulo operation) determines the shard on which a particular row is stored. A hash value of 1234 goes to shard 2 (1234 % 3 = 1), while a hash value of 12345 goes to shard 1 (12345 % 3 = 0).
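As a rough illustration of this routing, the query below derives a shard index from a hash and a modulo. This is a minimal sketch: sipHash64 and the numbers() table function are used here only for demonstration and are not the exact internals of the Distributed engine.

SELECT
    number AS id,
    sipHash64(number) AS hash_value,
    sipHash64(number) % 3 AS shard_index  -- 0, 1 or 2 on a 3-shard cluster
FROM numbers(5);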

ClickHouse Distributed Table Engine

The Distributed table engine enables parallel, distributed query processing across a ClickHouse cluster. It cannot store any data itself and depends on other table engines (the MergeTree family) to store the underlying data. You can either insert data directly into the distributed table (and ClickHouse determines the shard based on the sharding key) or insert it into the underlying storage table on every node manually. Reads can likewise go directly through the distributed table.
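As a minimal sketch of what such a definition looks like (the names my_cluster, my_db and events_local are hypothetical placeholders for this illustration, not objects created later in this article), a distributed table can reuse the structure of its storage table:

CREATE TABLE events_distributed AS events_local
ENGINE = Distributed('my_cluster', 'my_db', events_local, rand());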

Examples of Sharding in ClickHouse

1. Non-replicated and sharded table

We will see how to create a table with two shards on a two-node ClickHouse cluster. The docker-compose for this example is below.

version: '3'

services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    networks:
      - ch_replicated
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - "2182:2181"
      - "2888:2888"
      - "3888:3888"
    
  clickhouse1:
    image: clickhouse/clickhouse-server
    ports:
      - "8002:9000"
      - "9123:8123"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    networks:
      - ch_replicated
    depends_on:
      - zookeeper
      
  clickhouse2:
    image: clickhouse/clickhouse-server
    ports:
      - "8003:9000"
      - "9124:8123"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    networks:
      - ch_replicated
    depends_on:
      - zookeeper
      

networks:
  ch_replicated:
    driver: bridge

We are using a single-node Zookeeper cluster and a two-node ClickHouse cluster. Spin up the containers using the docker-compose up command.

1.1 Configure the cluster

Once the servers are up and running, update the config.xml to make ClickHouse aware of the other nodes of the cluster. The hostname and the port details are based on the configuration in the docker-compose file. The configuration changes have to be made in every node of the cluster.

<remote_servers>
    <replicated_cluster>
        <shard>
            <internal_replication>true</internal_replication>
            <replica>
                <host>clickhouse1</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <replica>
                <host>clickhouse2</host>
                <port>9000</port>
            </replica>
        </shard>
    </replicated_cluster>
</remote_servers>
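Once the configuration has been reloaded, one way to confirm that the cluster definition was picked up is to query the system.clusters table on any node (a verification sketch; the expected output is one row per node of replicated_cluster):

SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster = 'replicated_cluster';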

1.2 Configure Zookeeper

The next step is to configure ZooKeeper in the config.xml file.

<zookeeper>
  <node index="1">
    <host>zookeeper</host>
    <port>2181</port>
  </node>  
</zookeeper>
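To confirm that ClickHouse can reach ZooKeeper, you can query the system.zookeeper table, which requires a path condition; if the connection is not configured correctly, this query returns an error instead of the root znodes:

SELECT name, value
FROM system.zookeeper
WHERE path = '/';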

1.3 Create the tables

Create the MergeTree table, which will act as the storage for the distributed table.

CREATE TABLE distributed_example
(
    ID UInt32,
    Name String
) ENGINE = MergeTree()
ORDER BY (ID);

The next step is to create a table with the Distributed table engine on top of this storage table.

CREATE TABLE distributed_example_engine
(
    ID UInt32,
    Name String
) 
ENGINE = Distributed('replicated_cluster', 'default', distributed_example, rand());

Distributed table engine parameters:

  • cluster – ‘replicated_cluster’ (configured in config.xml)
  • database – ‘default’
  • table – ‘distributed_example’ (the storage table)
  • sharding_key – rand() (rows are distributed across shards based on the value returned by rand())

Important: Repeat this table creation on every node (both the MergeTree table and the Distributed table).
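Alternatively, if distributed DDL is enabled (it is configured by default through the <distributed_ddl> section and relies on ZooKeeper), the ON CLUSTER clause can create the table on every node in one statement. A minimal sketch for the storage table; the Distributed table can be created the same way:

CREATE TABLE distributed_example ON CLUSTER replicated_cluster
(
    ID UInt32,
    Name String
) ENGINE = MergeTree()
ORDER BY (ID);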

1.4 Testing the sharding

We can test by inserting a few rows of data and seeing which shard they land on. Log in to the clickhouse-client on the first node and execute this statement.

INSERT INTO distributed_example_engine
VALUES (7, 'a'), (8, 'b'), (9, 'c');

Log in to the clickhouse-client on the second node and execute this statement.

INSERT INTO distributed_example_engine
VALUES (1, 'a'), (2, 'b'), (3, 'c');

Log back in to the first node and run the following queries.

970c5f5ce5ce :) SELECT * FROM distributed_example_engine;
                

SELECT *
FROM distributed_example_engine

Query id: 5cb8a4c6-f114-4f9a-b6b6-152ba80c64ac

┌─ID─┬─Name─┐
│  2 │ b    │
│  3 │ c    │
│  8 │ b    │
│  9 │ c    │
└────┴──────┘
┌─ID─┬─Name─┐
│  1 │ a    │
│  7 │ a    │
└────┴──────┘

6 rows in set. Elapsed: 0.003 sec. 

970c5f5ce5ce :) 
970c5f5ce5ce :) SELECT * FROM distributed_example;
                

SELECT *
FROM distributed_example

Query id: 06540420-e481-457e-a7ef-3e886016b17b

┌─ID─┬─Name─┐
│  2 │ b    │
│  3 │ c    │
│  8 │ b    │
│  9 │ c    │
└────┴──────┘

4 rows in set. Elapsed: 0.002 sec. 

970c5f5ce5ce :) 

We can see all the data from both shards when we query the distributed table. But when we query the storage table, only the part of the data stored on that node is returned. Repeat the same queries on the second node to verify that the remaining rows are located there.
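A quick way to see how the rows are spread without logging in to each node is to group by hostName(), which is evaluated on the remote shards when querying through the Distributed table (a verification sketch; expect one row per shard with its local row count):

SELECT hostName() AS node, count() AS row_count
FROM distributed_example_engine
GROUP BY node;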

2. Replicated and sharded table

We will work through an example of a table with two shards and two replicas per shard in a four-node cluster. The docker-compose file for this example is below.

version: '3'

services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    networks:
      - ch_replicated
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - "2182:2181"
      - "2888:2888"
      - "3888:3888"
    
  clickhouse1:
    image: clickhouse/clickhouse-server
    ports:
      - "8002:9000"
      - "9123:8123"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    networks:
      - ch_replicated
    depends_on:
      - zookeeper
      
  clickhouse2:
    image: clickhouse/clickhouse-server
    ports:
      - "8003:9000"
      - "9124:8123"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    networks:
      - ch_replicated
    depends_on:
      - zookeeper
      
  clickhouse3:
    image: clickhouse/clickhouse-server
    ports:
      - "8004:9000"
      - "9125:8123"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    networks:
      - ch_replicated
    depends_on:
      - zookeeper
 
  clickhouse4:
    image: clickhouse/clickhouse-server
    ports:
      - "8005:9000"
      - "9126:8123"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    networks:
      - ch_replicated
    depends_on:
      - zookeeper
      

networks:
  ch_replicated:
    driver: bridge

We have a four-node ClickHouse cluster and a single-node Zookeeper cluster. Spin up the containers using the docker-compose up command.

2.1 Configure the cluster

Once all the servers are up and running, update config.xml to make ClickHouse aware of the other nodes of the cluster. The configuration changes have to be made on every node of the cluster.

<remote_servers>
    <replicated_cluster>
        <shard>
            <internal_replication>true</internal_replication>
            <replica>
                <host>clickhouse1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>clickhouse2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <internal_replication>true</internal_replication>
            <replica>
                <host>clickhouse3</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>clickhouse4</host>
                <port>9000</port>
            </replica>
        </shard>
    </replicated_cluster>
</remote_servers>

Note that we have configured the first shard and its replicas on the nodes clickhouse1 and clickhouse2, and the second shard and its replicas on the nodes clickhouse3 and clickhouse4.
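You can verify this layout from any node by querying system.clusters; with the configuration above it should report two shards with two replicas each (a verification sketch):

SELECT shard_num, replica_num, host_name
FROM system.clusters
WHERE cluster = 'replicated_cluster'
ORDER BY shard_num, replica_num;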

2.2 Configure Zookeeper

The next step is to configure ZooKeeper in the config.xml file.

<zookeeper>
  <node index="1">
    <host>zookeeper</host>
    <port>2181</port>
  </node>  
</zookeeper>

2.3 Configuring macros

The configured macros are substituted into the table parameters when creating the replicated tables.

<!--In the node clickhouse1-->

<macros>
<shard>01</shard>
<replica>ch1</replica>
</macros>
<!--In the node clickhouse2-->

<macros>
<shard>01</shard>
<replica>ch2</replica>
</macros>
<!--In the node clickhouse3-->

<macros>
<shard>02</shard>
<replica>ch3</replica>
</macros>
<!--In the node clickhouse4-->

<macros>
<shard>02</shard>
<replica>ch4</replica>
</macros>
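To confirm that each node picked up its own values, you can query system.macros on that node; for example, clickhouse1 should report shard 01 and replica ch1:

SELECT macro, substitution
FROM system.macros;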

2.4 Create the tables

Let us create the tables required in all the nodes.

CREATE TABLE sharded_replicated_example
(
    ID UInt32,
    Name String
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/sharded_replicated/{shard}', '{replica}')
ORDER BY (ID);

Based on the configured macros, the ZooKeeper path parameter resolves to /clickhouse/tables/sharded_replicated/01 with replica ch1 on the clickhouse1 node, and to /clickhouse/tables/sharded_replicated/01 with replica ch2 on the clickhouse2 node. Similarly, it resolves to /clickhouse/tables/sharded_replicated/02 with replicas ch3 and ch4 on clickhouse3 and clickhouse4.
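Once the table exists, you can confirm the substituted values on each node through the system.replicas table (a verification sketch; zookeeper_path and replica_name should match the macros of the node you are connected to):

SELECT zookeeper_path, replica_name
FROM system.replicas
WHERE table = 'sharded_replicated_example';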

The next step is to create the distributed table.

CREATE TABLE sharded_distributed_example
(
    ID UInt32,
    Name String
) 
ENGINE = Distributed('replicated_cluster', 'default', sharded_replicated_example, rand());

Important: Repeat this table creation on every node (both the ReplicatedMergeTree table and the Distributed table).

2.5 Testing

Let us test this by inserting some data into the tables.

Node 1

INSERT INTO sharded_replicated_example
VALUES (1, 'a'), (2, 'b'), (3, 'c');

Node 3

INSERT INTO sharded_replicated_example
VALUES (4, 'a'), (5, 'b'), (6, 'c');

Query the data on node 1.

8e51aa3e007d :) SELECT * FROM sharded_replicated_example;
                

SELECT *
FROM sharded_replicated_example

Query id: 6dc60671-0e94-4a6e-ba44-00d174ed18e5

┌─ID─┬─Name─┐
│  1 │ a    │
│  2 │ b    │
│  3 │ c    │
└────┴──────┘

3 rows in set. Elapsed: 0.037 sec. 

8e51aa3e007d :)
8e51aa3e007d :) SELECT * FROM sharded_distributed_example;

SELECT *
FROM sharded_distributed_example

Query id: dd0f7587-7220-4703-b908-b22508105ca2

┌─ID─┬─Name─┐
│  1 │ a    │
│  2 │ b    │
│  3 │ c    │
└────┴──────┘
┌─ID─┬─Name─┐
│  4 │ a    │
│  5 │ b    │
│  6 │ c    │
└────┴──────┘

6 rows in set. Elapsed: 0.043 sec. 

8e51aa3e007d :) 

Repeat the same queries on the other nodes and verify that each replica holds a full copy of its shard's data.
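To check all four nodes in one query, the clusterAllReplicas table function can be used from any node; assuming replication has caught up, each node should report the three rows of its shard (a verification sketch):

SELECT hostName() AS node, count() AS row_count
FROM clusterAllReplicas('replicated_cluster', default.sharded_replicated_example)
GROUP BY node
ORDER BY node;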

Conclusion

  • It is possible to shard ClickHouse tables across a cluster
  • The Distributed table engine is used for sharding
  • A MergeTree-family table engine is required to store the sharded data
  • Data can be inserted directly into the Distributed table, which in turn decides the shard based on the sharding key
  • The Distributed table can also be used to read the data
