In large-scale analytical systems, uneven data distribution is one of the most insidious performance killers. When certain nodes in a ClickHouse cluster receive disproportionately high volumes of reads or writes, those nodes become hot spots — bottlenecks that degrade query latency, increase resource contention, and can ultimately destabilize the entire cluster. For engineering teams managing petabyte-scale workloads, understanding how to detect and remediate these hot spots is not optional; it is foundational to operational excellence.
This guide provides a comprehensive, practitioner-oriented walkthrough of hot spot detection and remediation in ClickHouse clusters. We cover the root causes of hot spots, the metrics and tools used to identify them, and the architectural and configuration strategies used to eliminate them.
Understanding Hot Spots in ClickHouse
A hot spot occurs when data access or storage is unevenly distributed across the nodes of a distributed ClickHouse cluster. ClickHouse is designed as a horizontally scalable columnar database, but scalability guarantees only hold when data sharding and query routing are well-configured. In practice, several factors can cause specific shards or replicas to absorb far more load than their peers.
The most common causes of hot spots include poor sharding key selection, time-series data skew, high-cardinality dimension queries that map to a single shard, and unbalanced INSERT patterns driven by upstream data pipelines. When a single shard hosts the majority of a high-traffic partition — for example, all events for the current day — every query touching recent data funnels through that shard, overwhelming its CPU, memory, and I/O subsystems while other shards sit idle.
Understanding these root causes is critical because the appropriate remediation strategy depends directly on why the hot spot exists, not merely where it is observed.
Metrics That Reveal Hot Spots
Effective hot spot detection begins with systematic monitoring. ClickHouse exposes a rich set of system tables and Prometheus-compatible metrics that surface shard-level imbalances. Engineers should instrument dashboards to track the following key indicators across all nodes simultaneously.
CPU and Thread Pool Utilization
The system.metrics and system.asynchronous_metrics tables expose per-node gauges: currently running queries, thread counts, background pool activity, and, in system.asynchronous_metrics on recent releases, OS-level CPU and load figures. A node consistently running at 90%+ CPU while others average 30–40% is a strong hot spot signal. Pay particular attention to query-thread and background-pool metrics such as QueryThread and BackgroundPoolTask. Exact metric names vary between ClickHouse releases, so check what your version reports. Sustained elevation in these values on one node indicates that it is processing far more concurrent work than the rest of the cluster.
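A minimal sketch for lining these gauges up across nodes is shown below. It assumes a cluster named your_cluster in the remote_servers configuration (a placeholder), and the metric names in the IN list are examples only. A name your release does not define simply returns no rows.

```sql
-- 'your_cluster' is a placeholder for a cluster defined in remote_servers.
-- Metric names differ between releases; names that do not exist return no rows.
SELECT
    hostName() AS host,
    metric,
    value
FROM clusterAllReplicas('your_cluster', system.metrics)
WHERE metric IN ('Query', 'QueryThread', 'BackgroundPoolTask',
                 'BackgroundMergesAndMutationsPoolTask')
ORDER BY metric, value DESC;
```

One host sitting at the top of every metric group by a wide margin is the pattern to look for.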
Query Execution Times per Shard
Each node's system.query_log records the queries that node executed, including the per-shard subqueries that a distributed query fans out to (these are logged with is_initial_query = 0). By aggregating query_duration_ms per host over rolling time windows, you can identify whether specific shards are consistently serving slower queries. A healthy cluster shows roughly uniform median query durations across shards; a shard whose median is more than 2x that of its peers warrants investigation. At ChistaDATA, our managed ClickHouse monitoring infrastructure continuously aggregates these metrics to surface shard-level latency anomalies in real time.
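The per-host comparison can be sketched as a single query, assuming query logging is enabled and using your_cluster as a placeholder cluster name. It computes each host's median subquery duration over the last hour, then divides by the median of those per-host medians, so that a value above 2 maps directly onto the 2x rule of thumb. Hosts that ran no subqueries in the window do not appear.

```sql
-- Median duration of per-shard subqueries on each host over the last hour,
-- and each host's ratio to the median across hosts.
SELECT
    host,
    queries,
    median_ms,
    round(median_ms / median(median_ms) OVER (), 2) AS ratio_to_cluster_median
FROM
(
    SELECT
        hostName() AS host,
        count() AS queries,
        quantile(0.5)(query_duration_ms) AS median_ms
    FROM clusterAllReplicas('your_cluster', system.query_log)
    WHERE type = 'QueryFinish'
      AND is_initial_query = 0        -- subqueries sent to shards
      AND event_time >= now() - INTERVAL 1 HOUR
    GROUP BY host
)
ORDER BY ratio_to_cluster_median DESC;
```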
Data Volume Distribution
Run the following query across your distributed cluster to measure part-level data distribution per shard host:
```sql
SELECT
    hostName() AS host,
    database,
    table,
    formatReadableSize(sum(bytes_on_disk)) AS total_size,
    sum(rows) AS total_rows,
    count() AS part_count
FROM clusterAllReplicas('your_cluster', system.parts)
WHERE active = 1
GROUP BY host, database, table
ORDER BY sum(bytes_on_disk) DESC;
```
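Significant variance in total_size or total_rows across hosts for the same table is direct evidence of storage-level hot spotting. Because clusterAllReplicas returns every replica, replicas of the same shard should report near-identical figures; the meaningful comparison is between shards. Make this query part of your regular cluster health audit and run it at least daily on production systems.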
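To reduce the per-host listing to one number per table, the following sketch computes the coefficient of variation (standard deviation divided by mean) of on-disk bytes across hosts. Tracking this figure from one daily run to the next makes drift easy to spot. What counts as "too high" is a judgment call for your workload, and your_cluster is again a placeholder.

```sql
-- Coefficient of variation of on-disk bytes per table across hosts.
-- Hosts with no parts for a table are absent from the inner query, so a table
-- stored on only some hosts looks less skewed here than it really is.
SELECT
    database,
    table,
    count() AS hosts_with_data,
    round(stddevPop(host_bytes) / avg(host_bytes), 3) AS cv_bytes,
    formatReadableSize(max(host_bytes)) AS largest_host,
    formatReadableSize(min(host_bytes)) AS smallest_host
FROM
(
    SELECT
        hostName() AS host,
        database,
        table,
        sum(bytes_on_disk) AS host_bytes
    FROM clusterAllReplicas('your_cluster', system.parts)
    WHERE active = 1
    GROUP BY host, database, table
)
GROUP BY database, table
ORDER BY cv_bytes DESC;
```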
Network I/O and Merge Pressure
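Hot nodes often exhibit elevated network traffic. During a distributed query, the initiating node sends subqueries to every shard and receives their intermediate results (for example, partial GROUP BY states), and GLOBAL IN or GLOBAL JOIN ships the right-hand data set to all shards. A node that frequently acts as both coordinator and data host therefore carries extra traffic. The system.events table on each node tracks cumulative NetworkSendBytes and NetworkReceiveBytes counters. At the same time, a shard running many more merges than its peers (visible in system.merges, or through the Merge gauge in system.metrics) is likely absorbing a disproportionate share of INSERTs, which produce more parts that need merging.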
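Both signals can be pulled from every node with two short queries, sketched below with your_cluster as a placeholder. The network counters are cumulative since each server started, so sample them twice and compare the difference if you need a rate. The Merge gauge is a point-in-time count of running background merges.

```sql
-- Cumulative network counters since each server started (not per-interval rates).
SELECT
    hostName() AS host,
    formatReadableSize(sumIf(value, event = 'NetworkSendBytes')) AS sent,
    formatReadableSize(sumIf(value, event = 'NetworkReceiveBytes')) AS received
FROM clusterAllReplicas('your_cluster', system.events)
WHERE event IN ('NetworkSendBytes', 'NetworkReceiveBytes')
GROUP BY host
ORDER BY sumIf(value, event = 'NetworkSendBytes') DESC;

-- Background merges currently running on each host.
SELECT
    hostName() AS host,
    value AS merges_in_progress
FROM clusterAllReplicas('your_cluster', system.metrics)
WHERE metric = 'Merge'
ORDER BY merges_in_progress DESC;
```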
Root Cause Analysis: Sharding Key Problems
The most frequent structural cause of hot spots is a poorly chosen sharding key in the Distributed table definition. The Distributed engine routes each row by taking the value of the sharding expression modulo the total shard weight. A low-cardinality key, such as a boolean flag, a status field, or a coarse date bucket, can only ever produce a handful of distinct values, so rows are split in the same proportions as those values. For example, sharding by is_premium_user when 98% of your users are non-premium sends roughly 98% of rows to one shard and 2% to a second, while every other shard receives nothing.
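One way to see this skew before committing to a key is to simulate it with the numbers() table function, which needs no cluster. The sketch assumes 1,000,000 users with every 50th one premium and four equally weighted shards, and applies key % 4 by hand to mimic the Distributed engine's routing (shard indexes here are zero-based). The first query returns exactly two rows: 980000 rows for index 0 and 20000 for index 1. The hashed version spreads the same ids roughly evenly across all four indexes.

```sql
-- Simulated data: 1,000,000 users, every 50th one premium.
-- With 4 equally weighted shards, routing is key % 4, so a boolean
-- key can only ever reach indexes 0 and 1.
SELECT
    toUInt8(number % 50 = 0) % 4 AS shard_index,
    count() AS rows
FROM numbers(1000000)
GROUP BY shard_index
ORDER BY shard_index;

-- The same ids hashed with cityHash64 reach all four indexes.
SELECT
    cityHash64(number) % 4 AS shard_index,
    count() AS rows
FROM numbers(1000000)
GROUP BY shard_index
ORDER BY shard_index;
```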
Similarly, sharding by toDate(event_timestamp) creates temporal hot spots: every row for a given day maps to the same shard, so the shard that owns today's value receives all new INSERTs while the others receive none. This pattern is extremely common in event-driven architectures and is a well-known sharding anti-pattern.
The correct approach is to select a sharding key with high cardinality and uniform value distribution, typically a user ID, a session ID, or a hash of a composite key. cityHash64(user_id) is a common choice, and rand() is a reasonable fallback when no natural high-cardinality key exists, at the cost of no longer keeping all of a given user's rows on one shard.
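As an illustration, a replicated local table plus a Distributed table sharded on cityHash64(user_id) might look like the sketch below. The database, table, and column names, the ZooKeeper path, and your_cluster are hypothetical placeholders. The {shard} and {replica} macros are assumed to be defined in each server's configuration.

```sql
-- Hypothetical schema; names, path, and macros are placeholders.
CREATE TABLE analytics.events_local ON CLUSTER your_cluster
(
    event_time DateTime,
    user_id    UInt64,
    event_type LowCardinality(String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/analytics/events_local', '{replica}')
PARTITION BY toYYYYMM(event_time)
ORDER BY (user_id, event_time);

-- Route each row by a hash of a high-cardinality column,
-- not by a date or a flag.
CREATE TABLE analytics.events ON CLUSTER your_cluster
AS analytics.events_local
ENGINE = Distributed(your_cluster, analytics, events_local, cityHash64(user_id));
```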
Remediating Hot Spots: Strategies and Implementation
Once a hot spot is confirmed and its root cause is identified, remediation falls into one of several categories: sharding key redesign, data rebalancing, query routing optimization, or infrastructure scaling. Most production remediations combine elements from multiple categories.
Resharding with a Better Key
Resharding is the most impactful long-term fix but also the most operationally complex. The sharding key of a Distributed table only decides where newly inserted rows go; existing data stays where it is. Changing the key therefore means re-inserting the data into new tables. The typical approach involves creating new local ReplicatedMergeTree tables and a new Distributed table with the improved sharding expression, migrating data using INSERT INTO new_table SELECT * FROM old_table in batches to avoid I/O storms, validating row counts and checksums across shards, and then atomically swapping table references in application queries via a view or alias.
For very large datasets, pace the migration by running each batch with conservative settings such as max_insert_threads and max_block_size. Throttling throughput this way prevents the migration itself from creating a secondary hot spot on the destination shards.
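A minimal sketch of one batch plus its validation follows, assuming hypothetical tables analytics.events (the old Distributed table) and analytics.events_v2 (a new Distributed table with the same columns over new local tables, sharded by cityHash64(user_id)). The month value and the setting values are illustrative, not recommendations. The fingerprint is an order-independent sum of row hashes, so it matches only if both sides hold the same rows.

```sql
-- Throttle this session before copying; values are illustrative.
SET max_insert_threads = 2;
SET max_threads = 4;
SET max_block_size = 65536;

-- Copy one month per statement.
INSERT INTO analytics.events_v2
SELECT *
FROM analytics.events
WHERE toYYYYMM(event_time) = 202401;

-- Compare row counts and an order-independent fingerprint for that month.
SELECT 'old' AS source, count() AS row_count,
       sum(cityHash64(user_id, event_time, event_type)) AS fingerprint
FROM analytics.events
WHERE toYYYYMM(event_time) = 202401
UNION ALL
SELECT 'new', count(), sum(cityHash64(user_id, event_time, event_type))
FROM analytics.events_v2
WHERE toYYYYMM(event_time) = 202401;

-- After every batch matches, databases using the Atomic engine can
-- swap the two names in one step instead of using a view:
-- EXCHANGE TABLES analytics.events AND analytics.events_v2 ON CLUSTER your_cluster;
```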
Using Weighted Sharding
ClickHouse's Distributed engine supports per-shard weights in the cluster configuration. If certain nodes have more disk, CPU, or memory capacity, assigning them higher weights directs proportionally more newly inserted rows to them; data already on disk is not moved. This is a quick operational lever when you have hardware heterogeneity in your cluster, but it cannot correct skew caused by the sharding key itself. Add <weight> tags to individual shards in the remote_servers section of config.xml; ClickHouse reloads cluster definitions without a restart.
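After editing the configuration, the first query below is a quick way to confirm that the running server picked up the new weights (your_cluster is a placeholder). The second query only illustrates the routing rule for weights 1, 1, and 2 using numbers() as a stand-in for sharding-key values; it does not invoke the Distributed engine. The key modulo the total weight (4) selects a shard by remainder range, so for 1000 consecutive values it returns 250, 250, and 500 rows for shards 1, 2, and 3.

```sql
-- Confirm the running server sees the configured weights.
SELECT shard_num, shard_weight, replica_num, host_name
FROM system.clusters
WHERE cluster = 'your_cluster'
ORDER BY shard_num, replica_num;

-- Illustration for weights 1, 1, 2 (total 4):
-- remainder 0 -> shard 1, remainder 1 -> shard 2, remainders 2-3 -> shard 3.
SELECT
    multiIf(remainder < 1, 1, remainder < 2, 2, 3) AS shard_num,
    count() AS rows
FROM (SELECT number % 4 AS remainder FROM numbers(1000))
GROUP BY shard_num
ORDER BY shard_num;
```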
Partitioning Strategies to Distribute Write Load
Beyond sharding, the PARTITION BY clause in MergeTree table definitions affects how data is organized on disk within each shard. A partition key that produces very large active partitions on one shard (again, often date-based) concentrates merges and I/O in one place and can cause localized I/O hot spots. Consider sub-partitioning strategies that divide data both by time and by a secondary dimension such as region or tenant ID, while keeping the total number of partitions modest, since many small partitions bring overhead of their own. Because parts are only ever merged within a partition, this spreads merge operations and improves parallelism of part processing within the hot node even before resharding is feasible.
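A sketch of such a table follows. Every name is hypothetical, and bucketing tenant_id into 8 groups is an assumed design choice rather than a recommendation. Using a bounded bucket instead of the raw tenant ID caps the partition count at 8 per month, no matter how many tenants exist.

```sql
-- Hypothetical table: month plus a bounded tenant bucket (tenant_id % 8),
-- giving at most 8 partitions per month.
CREATE TABLE analytics.tenant_events_local ON CLUSTER your_cluster
(
    event_time DateTime,
    tenant_id  UInt32,
    user_id    UInt64,
    event_type LowCardinality(String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/analytics/tenant_events_local', '{replica}')
PARTITION BY (toYYYYMM(event_time), tenant_id % 8)
ORDER BY (tenant_id, user_id, event_time);
```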
Query Routing and Load Balancing
On the query side, ClickHouse's load_balancing setting controls how the initiating node chooses a replica of each shard when it sends out the subqueries of a distributed query. The default random policy sends each subquery to the replica with the fewest recorded errors, choosing randomly among ties. Switching to nearest_hostname or in_order can reduce cross-rack network traffic when your hostnames or the configured replica order reflect the physical topology. More important is prefer_localhost_replica: when it is 1 (the default), a node that holds a local replica of a shard reads that shard's data locally rather than over the network, which limits network amplification on hot nodes that happen to be both a coordinator and a data host. Make sure it has not been disabled in your user profiles.
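Both settings can be applied per query or per session, as sketched below. The analytics.events table and its columns are hypothetical. The system.settings lookup shows the effective values and whether they differ from the defaults, which is a quick way to catch a profile that has turned prefer_localhost_replica off.

```sql
-- Per query: override routing for a single distributed SELECT.
SELECT event_type, count() AS events
FROM analytics.events
WHERE event_time >= now() - INTERVAL 1 DAY
GROUP BY event_type
SETTINGS load_balancing = 'nearest_hostname', prefer_localhost_replica = 1;

-- Per session: inspect the effective values, then change one.
SELECT name, value, changed
FROM system.settings
WHERE name IN ('load_balancing', 'prefer_localhost_replica');

SET load_balancing = 'in_order';
```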
For read-heavy hot spots on replicated tables, consider adding additional replicas specifically for the hot shard and configuring external load balancers — such as chproxy — to distribute incoming SELECT queries across all replicas of that shard, including the new ones. This is often the fastest remediation path when resharding is not immediately feasible.
Throttling Upstream Insert Pipelines
Hot spots driven by write imbalance often originate upstream, in the data pipeline itself. When Kafka consumers, Flink jobs, or batch ETL processes write directly into shard-local tables, their output partitioning effectively becomes your sharding key. If they partition by the same low-cardinality key that caused the imbalance, they will perpetuate the problem even after resharding. Audit your upstream pipeline partitioning logic and ensure it uses the same high-cardinality key as your ClickHouse sharding expression. Alternatively, route writes through the Distributed table, for example from ClickHouse's native Kafka table engine via a materialized view, so that the sharding expression decides where each row lands.
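A minimal sketch of the second option is shown below. The broker address, topic, consumer group, and table names are hypothetical, and analytics.events is assumed to be a Distributed table with matching columns. Because the materialized view writes into the Distributed table, the sharding key rather than the Kafka partition decides placement, at the cost of an extra hop from the receiving node to the target shard.

```sql
-- Hypothetical Kafka source; broker, topic, group, and format are placeholders.
CREATE TABLE analytics.events_kafka
(
    event_time DateTime,
    user_id    UInt64,
    event_type String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'events',
         kafka_group_name = 'clickhouse_events',
         kafka_format = 'JSONEachRow';

-- Write through the Distributed table so its sharding key, not the
-- Kafka partition, decides which shard stores each row.
CREATE MATERIALIZED VIEW analytics.events_kafka_mv TO analytics.events AS
SELECT event_time, user_id, event_type
FROM analytics.events_kafka;
```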
Advanced Detection: Automated Hot Spot Alerting
Manual query-based hot spot detection is reactive. Production environments require automated alerting that fires before hot spots cause customer-facing impact. A robust alerting framework for ClickHouse clusters should include the following components.
First, expose ClickHouse metrics to Prometheus, either through the server's built-in Prometheus endpoint (configured in the <prometheus> section of the server configuration) or the community-maintained clickhouse-exporter, and configure Grafana dashboards with per-host metric panels. Alert on the coefficient of variation (CV, the standard deviation divided by the mean) of CPU usage across nodes: a CV exceeding 0.4 over a 10-minute window is a reliable hot spot indicator. Second, schedule a periodic SQL job that runs the data volume distribution query above and writes results to a monitoring table, then alert when any single host accounts for more than 40% of total cluster storage for a given table. Scale this threshold against the expected even share of 1/N, because in a two-shard cluster each shard legitimately holds about 50%. Third, track the number of in-progress merges on each node and alert when the maximum-to-minimum ratio exceeds 3x for more than five minutes.
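The second and third checks can be written as plain SQL for a scheduled job, as sketched below with your_cluster as a placeholder. The 0.4 share and 3x ratio come from the thresholds above. The extra condition that a host must also exceed 1.5 times the even 1/N share is an added assumption to keep small clusters from alerting constantly. The "for more than five minutes" part belongs in the alerting system, evaluated over repeated runs, rather than in the queries.

```sql
-- Second check: hosts holding more than 40% of a table's bytes and more
-- than 1.5x an even 1/N share (N = hosts reporting data for that table).
SELECT host, database, table,
       round(share, 3) AS share, round(even_share, 3) AS even_share
FROM
(
    SELECT
        host, database, table,
        host_bytes / sum(host_bytes) OVER (PARTITION BY database, table) AS share,
        1 / count() OVER (PARTITION BY database, table) AS even_share
    FROM
    (
        SELECT hostName() AS host, database, table, sum(bytes_on_disk) AS host_bytes
        FROM clusterAllReplicas('your_cluster', system.parts)
        WHERE active = 1
        GROUP BY host, database, table
    )
)
WHERE share > 0.4 AND share > 1.5 * even_share
ORDER BY share DESC;

-- Third check: busiest vs. least busy host by running merges.
-- A zero minimum yields NULL instead of a division by zero.
SELECT
    max(value) AS max_merges,
    min(value) AS min_merges,
    if(min(value) = 0, NULL, max(value) / min(value)) AS max_to_min_ratio
FROM clusterAllReplicas('your_cluster', system.metrics)
WHERE metric = 'Merge';
```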
These thresholds are starting points. Calibrate them against your workload’s baseline variability to reduce alert fatigue without masking genuine hot spots. Our team at ChistaDATA’s managed ClickHouse service routinely configures these alerting pipelines as part of cluster onboarding to give clients immediate visibility into distribution health.
Hot Spot Remediation in Practice: A Case Study Pattern
Consider a common scenario: an analytics platform ingesting clickstream data sharded by toYYYYMM(event_date). After six months, the current-month shard is receiving 100% of all new writes and approximately 80% of all reads (since most dashboards query recent data). CPU on that shard runs at 95% during peak hours; other shards average 15%.
The remediation sequence in this scenario would proceed as follows. First, add two additional replicas to the hot shard immediately to spread read load while the structural fix is planned, and configure chproxy to distribute SELECT queries across all three replicas of the current-month shard. Second, design a new sharding key, cityHash64(user_id), and create new distributed and underlying ReplicatedMergeTree tables with this key. There is no need to append % number_of_shards: the Distributed engine already takes the remainder of the key modulo the total shard weight, and an explicit modulo would distort the split if shard weights ever differ. Third, migrate historical data in monthly batches, starting with the oldest months to minimize I/O impact on current operations. Fourth, update the upstream Kafka consumer to route with the new sharding key, making sure that no rows land only in the old tables between the final batch and the cutover. Finally, once migration is validated, drop the old tables and remove the temporary additional replicas.
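The oldest-first batch plan can be generated rather than typed by hand. The sketch below assumes hypothetical tables analytics.clicks (the old Distributed table keyed on toYYYYMM(event_date)) and analytics.clicks_v2 (the new one keyed on cityHash64(user_id)) with identical columns. It only prints one INSERT statement per month, so an operator can run them one at a time with a pause and a validation check between batches.

```sql
-- Emits one INSERT per month, oldest first; nothing is executed here.
SELECT format(
    'INSERT INTO analytics.clicks_v2 SELECT * FROM analytics.clicks WHERE toYYYYMM(event_date) = {};',
    toString(month)) AS batch_statement
FROM
(
    SELECT DISTINCT toYYYYMM(event_date) AS month
    FROM analytics.clicks
)
ORDER BY month;
```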
This phased approach ensures that remediation itself does not create downtime or secondary hot spots, a common pitfall when resharding is performed without careful pacing.
ClickHouse-Specific Tooling for Ongoing Hot Spot Management
Beyond ad hoc queries and Prometheus metrics, several ClickHouse-native features support ongoing hot spot prevention. The system.distribution_queue table exposes the state of asynchronous INSERT queues in Distributed tables, allowing you to detect when one shard’s queue is growing faster than others — an early warning sign of write hot spotting before it manifests as CPU saturation.
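A sketch for watching these queues across the cluster follows, with your_cluster as a placeholder. Each row is one on-disk queue directory on the node that received the INSERT, and the directory name in data_path typically encodes the destination shard. Sampling the query periodically and comparing data_files or pending_size over time shows which destination is falling behind.

```sql
-- Pending asynchronous Distributed INSERT data on every node.
SELECT
    hostName() AS host,
    database,
    table,
    data_path,
    is_blocked,
    error_count,
    data_files,
    formatReadableSize(data_compressed_bytes) AS pending_size
FROM clusterAllReplicas('your_cluster', system.distribution_queue)
ORDER BY data_compressed_bytes DESC;
```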
The system.query_log table, when analyzed with window functions, can surface queries with abnormally high read_rows or read_bytes relative to the cluster median — these queries are often the drivers of read hot spots and may benefit from query rewriting, materialized view pre-aggregation, or result caching.
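A minimal window-function sketch, assuming query logging is enabled and your_cluster as a placeholder, is shown below. It ranks the last hour's finished queries across all hosts by how many times more rows they read than the median query in the same window. The one-hour window and the LIMIT of 20 are arbitrary choices.

```sql
-- Finished queries in the last hour on every host, ranked by read_rows
-- relative to the median query in the same window.
SELECT
    host,
    query_id,
    read_rows,
    formatReadableSize(read_bytes) AS read_size,
    round(read_rows / median(read_rows) OVER (), 1) AS times_median_rows,
    substring(query, 1, 120) AS query_head
FROM
(
    SELECT hostName() AS host, query_id, read_rows, read_bytes, query
    FROM clusterAllReplicas('your_cluster', system.query_log)
    WHERE type = 'QueryFinish'
      AND event_time >= now() - INTERVAL 1 HOUR
)
ORDER BY times_median_rows DESC
LIMIT 20;
```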
Finally, ClickHouse’s built-in OPTIMIZE TABLE ... FINAL command, when applied judiciously to over-partitioned hot shards, can consolidate excessive part counts, reduce background merge pressure, and recover I/O throughput. This should be used carefully in production, as it forces immediate merging and temporarily increases I/O, but it can be a valuable emergency lever when a hot shard’s part count is spiraling.
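One way to keep this lever narrow is to find the worst partitions on the hot node first and then optimize only those, ideally outside peak hours. In the sketch below, analytics.events_local and the partition value 202401 are hypothetical; the literal form assumes a table partitioned by toYYYYMM(...).

```sql
-- On the hot node: partitions with the most active parts.
SELECT database, table, partition, count() AS active_parts
FROM system.parts
WHERE active = 1
GROUP BY database, table, partition
ORDER BY active_parts DESC
LIMIT 10;

-- Merge a single partition rather than the whole table.
OPTIMIZE TABLE analytics.events_local PARTITION 202401 FINAL;
```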
Conclusion
Hot spots in ClickHouse clusters are not a sign of a broken system — they are a sign that the system has grown beyond the assumptions baked into its initial configuration. Every high-scale ClickHouse deployment will eventually encounter some form of data distribution imbalance. The difference between teams that manage this gracefully and those that face outages lies in systematic monitoring, a clear causal understanding of why hot spots form, and a practiced set of remediation strategies ready to deploy.
By combining shard-level metric monitoring, automated alerting on distribution coefficients, judicious use of weighted sharding and replica scaling, and careful resharding migrations guided by high-cardinality key selection, engineering teams can maintain a balanced, performant ClickHouse cluster at any scale. For organizations seeking expert guidance on ClickHouse cluster architecture, hot spot prevention, and managed operations, ChistaDATA’s engineering team is available to help design and operate production-grade ClickHouse infrastructure.