High-Performance Reads of Parquet Data Using ClickHouse Server Swarms

Follow the architecture and optimization strategies detailed below to achieve high-performance reads of Parquet files using swarms of ClickHouse® servers. The approach combines ClickHouse’s distributed, columnar query engine with the I/O efficiency of the Parquet format.


1. Architecture Overview

Key Components

  • Distributed ClickHouse Cluster:
    • A swarm of interconnected ClickHouse servers configured in a sharded and replicated architecture.
    • Ensures scalability, fault tolerance, and high availability.
  • Parquet File Storage:
    • Parquet files stored on object storage systems like Amazon S3, Google Cloud Storage, or HDFS.
    • Parquet’s columnar format enables efficient I/O and compression.
  • Data Ingestion & Query Orchestration:
    • Distributed table setup in ClickHouse to enable parallel reads.
    • External dictionaries or direct queries for metadata indexing.
  • Clients:
    • BI Tools, dashboards, and applications querying the data.

2. Steps for High-Performance Reads

Step 1: Configure Distributed ClickHouse Cluster

  • Sharding: Distribute data across multiple nodes using a consistent sharding key.
  • Replication: Enable replicas to ensure fault tolerance and load balancing for read-heavy workloads.
  • Topology Example:
    • N shards with M replicas each.
    • Shards handle data distribution, while replicas manage redundancy and load sharing.

Create a Distributed table over the per-node local table so that queries fan out across the swarm:

CREATE TABLE parquet_data_distributed AS parquet_data
ENGINE = Distributed(cluster_name, database_name, parquet_data, sharding_key);
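The Distributed table assumes that a local table named parquet_data already exists on every node. A minimal sketch of such a table, assuming a ReplicatedMergeTree engine and placeholder columns (adjust the schema, partitioning, and replication path to your environment):

-- Hypothetical local table created on every node of the swarm.
CREATE TABLE parquet_data ON CLUSTER cluster_name
(
    event_date    Date,
    dimension1    String,
    metric_column Float64
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/parquet_data', '{replica}')
PARTITION BY toYYYYMM(event_date)
ORDER BY (dimension1, event_date);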

Step 2: Create External Storage Integration

ClickHouse can read Parquet files directly from external storage systems using table functions.

S3 Example:

SELECT * 
FROM s3('https://bucket-name.s3.amazonaws.com/file.parquet', 'AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'Parquet');

HDFS Example:

SELECT *
FROM hdfs('hdfs://namenode-host:8020/path/to/file.parquet', 'Parquet');

Local Parquet Files Example:

SELECT * FROM file('/path/to/file.parquet', 'Parquet');
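All three table functions also accept glob patterns, which lets a single query fan out over many Parquet files at once. A sketch, assuming a hypothetical data/ prefix in the bucket; the same wildcard syntax works with hdfs() and file():

-- Read every Parquet file under the data/ prefix in one query.
SELECT count()
FROM s3('https://bucket-name.s3.amazonaws.com/data/*.parquet',
        'AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'Parquet');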

Step 3: Optimize Parquet Reads

  • Column Pruning:
    • ClickHouse reads only the required columns, leveraging Parquet’s columnar structure.
    • Use SELECT queries with specific columns.
  • Partitioning:
    • Store Parquet files partitioned by common query dimensions (e.g., date, region).
    • This minimizes I/O during reads by allowing ClickHouse to skip irrelevant partitions.
  • Predicate Pushdown:
    • ClickHouse pushes down filters directly to the Parquet reader, reducing the amount of data read. For example:

SELECT column1, column2
FROM s3('https://bucket-name.s3.amazonaws.com/file.parquet', 'AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'Parquet')
WHERE column1 = 'value';
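When files are laid out in partition directories (for example date=YYYY-MM-DD), the _path and _file virtual columns exposed by the s3, hdfs, and file table functions let a query skip irrelevant files before any Parquet data is read. A sketch, assuming a hypothetical date-partitioned layout:

SELECT column1, column2
FROM s3('https://bucket-name.s3.amazonaws.com/data/*/*.parquet',
        'AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'Parquet')
WHERE _path LIKE '%date=2024-01-01%'   -- prune whole files by partition directory
  AND column1 = 'value';               -- pushed down to the Parquet reader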

Step 4: Predefine Metadata with External Dictionaries

If Parquet files are large and frequently accessed, external dictionaries can be used to predefine file metadata, reducing runtime lookup overhead.

CREATE DICTIONARY parquet_metadata_dict
(
    file_path String,
    file_size UInt64,
    last_modified DateTime
)
PRIMARY KEY file_path
SOURCE(CLICKHOUSE(
    host '127.0.0.1'
    port 9000
    user 'default'
    table 'parquet_metadata'
))
LAYOUT(COMPLEX_KEY_HASHED())
LIFETIME(MIN 300 MAX 600);

A String primary key requires a complex-key layout (FLAT only supports UInt64 keys), and the mandatory LIFETIME clause controls how often the metadata is reloaded from the source table.
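Once the dictionary is loaded, metadata lookups become in-memory calls instead of queries against the metadata table. A usage sketch (the path value is a placeholder; complex-key dictionaries are queried with a tuple key):

SELECT
    dictGet('parquet_metadata_dict', 'file_size',
            tuple('s3://bucket-name/file.parquet')) AS file_size,
    dictGet('parquet_metadata_dict', 'last_modified',
            tuple('s3://bucket-name/file.parquet')) AS last_modified;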

Step 5: Materialized Views for Pre-Aggregation

For frequently queried Parquet data, pre-aggregate results into a ClickHouse table using a materialized view. Note that a view built over an external table function is filled when it is created with POPULATE and is not updated automatically as new Parquet files arrive, so refresh or re-create it as needed.

CREATE MATERIALIZED VIEW parquet_aggregates
ENGINE = MergeTree()
PARTITION BY toYYYYMM(date_column)
ORDER BY (dimension1, dimension2)
POPULATE
AS
SELECT date_column, dimension1, dimension2, SUM(metric_column) AS total_metric
FROM s3('https://bucket-name.s3.amazonaws.com/file.parquet', 'AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'Parquet')
GROUP BY date_column, dimension1, dimension2;
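Queries can then hit the pre-aggregated table instead of re-reading the Parquet files, for example:

SELECT dimension1, dimension2, sum(total_metric) AS total_metric
FROM parquet_aggregates
GROUP BY dimension1, dimension2;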

Step 6: Distributed Queries

Use the Distributed Engine to query data across multiple nodes in parallel.

SELECT *
FROM parquet_data_distributed
WHERE filter_condition;
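When the data still lives in object storage rather than in a MergeTree table, the s3Cluster table function achieves a similar fan-out: the initiator distributes the matched Parquet files across every node of the named cluster. A sketch with placeholder column names:

SELECT column1, sum(metric_column) AS total_metric
FROM s3Cluster('cluster_name',
               'https://bucket-name.s3.amazonaws.com/data/*.parquet',
               'AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'Parquet')
GROUP BY column1;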

3. Best Practices

Performance Optimizations

  • Enable Parallelism: Utilize ClickHouse’s ability to read Parquet files in parallel across shards and replicas; see the settings sketch after this list.
  • Compression: Leverage Parquet’s compression for reduced storage and faster reads.
  • Merge Aggregations: Use pre-aggregated views or rollups to reduce computation during queries.
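The sketch below illustrates the kind of query-level settings involved; exact names and defaults vary between ClickHouse versions, so treat it as a starting point rather than a tuning recommendation:

SELECT column1, count()
FROM s3('https://bucket-name.s3.amazonaws.com/data/*.parquet',
        'AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'Parquet')
GROUP BY column1
SETTINGS
    max_threads = 16,                           -- processing parallelism per query
    max_download_threads = 8,                   -- parallel ranged downloads from object storage
    input_format_parquet_filter_push_down = 1;  -- push WHERE filters into the Parquet reader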

Capacity Planning

  • Storage: Use NVMe SSDs for local caching and fast intermediate storage.
  • Network: Ensure high-throughput network connections between ClickHouse nodes and storage (e.g., S3 or HDFS).
  • Compute: Scale ClickHouse nodes based on concurrent query load and complexity.

Observability

  • Monitor query performance using system.query_log and system.query_thread_log.
  • Track disk I/O, memory usage, and network latency to identify bottlenecks.
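A quick way to surface slow or heavy Parquet reads from system.query_log (the LIKE filter and limit are illustrative):

SELECT
    query_duration_ms,
    read_rows,
    formatReadableSize(read_bytes)   AS read_size,
    formatReadableSize(memory_usage) AS peak_memory,
    query
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query ILIKE '%parquet%'
ORDER BY query_duration_ms DESC
LIMIT 10;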

4. Example Use Case

Scenario: Large-Scale Financial Data Analysis

  • Data: Parquet files containing transaction logs, partitioned by transaction_date.
  • Query: Aggregation of total transaction value by region and time.

Steps:

  • Load Parquet data from S3:
SELECT region, SUM(transaction_value) AS total_value
FROM s3('https://bucket-name.s3.amazonaws.com/transactions.parquet', 'AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'Parquet')
GROUP BY region;
  • Pre-aggregate into a materialized view:
CREATE MATERIALIZED VIEW transaction_summary
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(transaction_date)
ORDER BY (region, transaction_date)
POPULATE
AS
SELECT region, transaction_date, SUM(transaction_value) AS total_value, COUNT() AS transaction_count
FROM s3('https://bucket-name.s3.amazonaws.com/transactions.parquet', 'AWS_ACCESS_KEY', 'AWS_SECRET_KEY', 'Parquet')
GROUP BY region, transaction_date;
  • Distribute the data for parallel query execution:
CREATE TABLE transaction_summary_distributed AS transaction_summary
ENGINE = Distributed(cluster_name, database_name, transaction_summary, cityHash64(region));
  • Query the distributed table:
SELECT region, sum(total_value) AS total_value, sum(transaction_count) AS transaction_count
FROM transaction_summary_distributed
GROUP BY region;

© 2024 ChistaDATA Inc. All rights reserved.

ClickHouse® is a registered trademark of ClickHouse, Inc. All other trademarks and registered trademarks are the property of their respective owners.

The information contained in this document is subject to change without notice. This document is provided “as is” and ChistaDATA Inc. makes no warranties, either express or implied, in this document.
