Introduction
ClickHouse is known for its exceptional performance, scalability, and flexibility. With its ability to handle massive amounts of data and process queries in real-time, ClickHouse is becoming increasingly popular among data analysts, developers, and businesses looking to make sense of their data.
In this blog, we will explore one new game changer feature of ClickHouse which is released with 23.3 version of ClickHouse. It is called Parallel Replicas With Dynamic Shards.
Imagine that you want to build a ClickHouse cluster with 10 replicas. Normally, when you run a Select query, your database server send a request your primary server and bring the result to you.
With paralel replicas, when you run a Select query for example, Your request is automatically distributed among all replicas and data is read from all replicas so that your transaction result can reach you faster. It is like your data is reading from sharded cluster but it is actually just replicated cluster.
This was the summary of the feature. Lets look at the details and run real life example on EC2 instances.
Note: This is an experimental feature of ClickHouse. We will explain how you can enable experimental features in ClickHouse in the middle of the article.
First, lets explain Paralel Replicas.
Parallel Replicas
ClickHouse clusters implement sharding to address scenarios where the total data volume exceeds the capacity of a single host or processing speed is inadequate. Sharding involves partitioning data across multiple hosts as shards. By doing so, query performance can be enhanced since different subsets of data can be processed concurrently by various ClickHouse hosts. Typically, a distributed table is utilized to route a query to all shards.
There are situations where having an excessive number of shards is either not feasible or not required.
ClickHouse’s latest feature of parallel replicas allows users to leverage these replicas to parallelize data processing. The following diagram illustrates this concept:
The above diagram depicts that when the allow_experimental_parallel_reading_from_replicas setting is enabled, a query is transmitted to a random initiator replica node, which then distributes the query to all active replicas. Each replica processes a specific subset of data ranges from the entire shard data concurrently and transmits its local results back to the initiator node. The initiator node merges these local results to generate the final result.
Parallel Replicas with Dynamic Shards
Benefits of Parallel Replicas with Dynamic Shards:
Improved Query Performance: Parallel Replicas with Dynamic Shards allows users to parallelize data processing, which improves query performance.
Scalability: This feature enables dynamic shards, which means that users can scale data processing power without physically sharding the data. This makes it easier to handle large amounts of data.
Flexibility: Parallel Replicas with Dynamic Shards allows users to customize the feature by configuring the parallel_replicas_custom_key expression to split work based on their specific requirements.
How to Configure Parallel Replicas with Dynamic Shards:
To configure Parallel Replicas with Dynamic Shards, follow these steps:
Enable the allow_experimental_parallel_reading_from_replicas setting in the ClickHouse configuration file.
set allow_experimental_parallel_reading_from_replicas=1;
Configure the parallel_replicas_custom_key expression to split work based on your specific requirements.
Execute a query and verify that Parallel Replicas with Dynamic Shards is working as expected.
Example:
Let’s say you have a ClickHouse cluster with three replicas, and you want to split work based on the city column in your data. You can configure the parallel_replicas_custom_key from command line as follows:
set parallel_replicas_custom_key = product
Now, when you execute a query, the city column will be used to split work across the replicas. Each replica will process a specific subset of data ranges from the entire shard data concurrently and transmit its local results back to the initiator node. The initiator node will merge these local results to generate the final result.
For example, let’s say you have a table called cars that contains cars data. The table has the following schema:
CREATE TABLE cars ( date Date, name String, product String, car_amount Float32 ) ENGINE = MergeTree(date, (name, date), 8192);
To query the cars data for the name of Porsche, you can execute the following query:
SELECT date, product, sum(car_amount) AS total_sales FROM cars WHERE name = 'porsche' GROUP BY date, product
With Parallel Replicas with Dynamic Shards enabled and configured to split work based on the city column, the query will be split across the replicas based on the city column. Each replica will process a specific subset of data ranges for the city of New York and transmit its local results back to the initiator node. The initiator node will merge these local results to generate the final result.
This allows for faster query processing and improved performance, especially when dealing with large amounts of data.
Now let’s give a real life example on an EC2 instance.
Example
First, we will run a query on a standalone ClickHouse server. Then we scale ClickHouse host up to 3 replicas and run query again. And then we will scale up to 6 replicas. And we will compare the result.
Instances Resources: (We have 6 ClickHouse Server and 1 Zookeeper on EC2.)
Cpu: 8 Core
Memory: 64 GB
Storage: 2.5 TB
Dataset size: 1.8 TB
We run a query on stand-alone ClickHouse Server.
Without parallel replicas:
Query result:
SELECT count() FROM github_events WHERE body ILIKE '%ClickHouse%' ┌─count()─┐ │ 43504 │ └─────────┘ 1 row in set. Elapsed: 369.470 sec.
Elapsed: 369.470 sec.
When there is no parallel replication, it takes approx 6 minutes. (369.470 sec)
Next, we build 3 more ClickHouse instances on EC2 and replicate the table.
To allow parallel replication, you have to set allow_experimental_parallel_reading_from_replicas = 1 parameter. This is still experimental; before beginning, you have to enable it. Then you can scale max_parallel_replicas = 3
SET allow_experimental_parallel_reading_from_replicas = 1;
SET max_parallel_replicas = 3;
We run the same query on 3 parallel replicas ClickHouse server.
With three parallel replicas:
Query result:
SELECT count() FROM github_events WHERE body ILIKE '%ClickHouse%' ┌─count()─┐ │ 43504 │ └─────────┘ 1 row in set. Elapsed: 116.562 sec.
Elapsed: 121.562 sec. It takes 121.562 sec.
Next step, we build 3 more ClickHouse instances on EC2 and now we have 6 replicated ClickHouse server.
With 6 parallel replicas:
Scale up max_parallel_replicas to 6.
SET max_parallel_replicas = 6;
Query result:
SELECT count() FROM github_events WHERE body LIKE '%ClickHouse%' Query id: 73373392-073b-4921-b44b-f2e94d609d58 ┌─count()─┐ │ 43504 │ └─────────┘ 1 row in set. Elapsed: 58.809 sec.
Elapsed: 58.809 sec. It takes 58.809 sec.
Conclusion
In summary, parallel replicas and dynamic sharding in ClickHouse provide the means to achieve high-performance data processing, distributed query execution, and fault tolerance. Leveraging these features effectively can significantly enhance the scalability and efficiency of analytics workflows in ClickHouse’s distributed environment.
To read more about Sharding and Replication in ClickHouse, do consider reading the below articles
- ClickHouse Horizontal Scaling: Sharding and Resharding Strategies
- ClickHouse Horizontal Scaling: Introduction to Sharding in ClickHouse
- Setting up ClickHouse Cluster Replication with Zookeeper
References:
- Example dataset: https://ghe.clickhouse.tech
- ChistaDATA ClickHouse 23.3 release blog: https://chistadata.com/clickhouse-march-2023-release-version-23-3/
- ClickHouse 23.3 release slide: https://presentations.clickhouse.com/release_23.3/#cover
- Setup ClickHouse Cluster Replication with Zookeeper: https://chistadata.com/setup-clickhouse-cluster-replication-with-zookeeper/