Leveraging ClickHouse for Real-time Analytics on Blockchain Data

Blockchain Data Processing

 

Introduction

In one of our proof-of-concepts (POC), we implemented ClickHouse to process Ethereum blockchain transaction data. By cumulatively aggregating and analyzing this data, we can extract valuable insights to develop machine learning algorithms, run AI models, and build comprehensive dashboards.

To overcome the limitations imposed by API call restrictions when performing analytics directly against Ethereum, we devised a robust solution. Using Ethereum's remote procedure call (RPC) interface, we efficiently retrieve blockchain data, and we integrate ClickHouse with Amazon S3 to process it. After retrieving the data from Ethereum, we store it in S3 in both JSON and CSV formats, then ingest and analyze it with ClickHouse.

ClickHouse serves as a high-performance analytical database, enabling us to process and query large volumes of blockchain data with remarkable efficiency. By integrating ClickHouse with S3, we can access the stored Ethereum data directly from within ClickHouse, giving us fast and efficient analytics. This integration unlocks ClickHouse's advanced capabilities, including real-time querying, advanced analytics, and support for machine learning workloads, allowing us to extract valuable insights from the Ethereum blockchain data.
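
As a rough illustration of this ingestion path, the sketch below reads block data exported to S3 and loads it into a local MergeTree table. The bucket URL, file layout, table name, and column list are placeholders and do not reflect our actual schema.

-- Hypothetical sketch: load Ethereum block data exported to S3 as JSONEachRow into a MergeTree table.
CREATE TABLE IF NOT EXISTS blocks_data_sketch
(
    blockNumber      UInt64,
    blockHash        String,
    `from`           String,
    `to`             String,
    transactionIndex UInt32,
    gasUsed          UInt64,
    blockTimestamp   DateTime
)
ENGINE = MergeTree
ORDER BY (blockNumber, transactionIndex);

INSERT INTO blocks_data_sketch
SELECT blockNumber, blockHash, `from`, `to`, transactionIndex, gasUsed, blockTimestamp
FROM s3('https://my-bucket.s3.amazonaws.com/ethereum/blocks/*.json', 'JSONEachRow');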

Table Size 

SELECT
    table,
    formatReadableSize(sum(data_compressed_bytes)) AS compressed_size,
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_size,
    round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) AS ratio
FROM system.columns
WHERE database = 'default' and table in ('blocks_data_all','receipts_json_local_untuple')
GROUP BY table;

Query id: 5ac6cec0-ba66-4f4d-944e-c85e7a39111d

┌─table───────────────────────┬─compressed_size─┬─uncompressed_size─┬─ratio─┐
│ blocks_data_all             │ 389.59 GiB      │ 5.02 TiB          │ 13.21 │
│ receipts_json_local_untuple │ 100.32 GiB      │ 1.40 TiB          │ 14.32 │
└─────────────────────────────┴─────────────────┴───────────────────┴───────┘

2 rows in set. Elapsed: 0.002 sec.

Query with OOM Error

When working with large tables in ClickHouse, memory limitations can pose a significant challenge, particularly when executing complex joins on extensive datasets. As shown in the output below, we hit a memory allocation error shortly after running this join query, in which both the left and right tables are large.

ip-10-1-138-41.us-east-2.compute.internal :) select a.*,
    b.*
FROM blocks_data_all AS a
INNER JOIN receipts_json_local_untuple AS b ON a.blockHash = b.logs_blockHash and a.blockNumber = b.blockNumber and a.from = b.from and a.to=b.to and a.transactionIndex  =b.transactionIndex ;

SELECT
    a.*,
    b.*
FROM blocks_data_all AS a
INNER JOIN receipts_json_local_untuple AS b ON (a.blockHash = b.logs_blockHash) AND (a.blockNumber = b.blockNumber) AND (a.from = b.from) AND (a.to = b.to) AND (a.transactionIndex = b.transactionIndex)

Query id: 8890efc4-b59c-46ff-a93b-46f930d6abd0

Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 23.4.2 revision 54462.

↖ Progress: 12.73 million rows, 19.04 GB (2.48 million rows/s., 3.71 GB/s.) ▊                                                     (5.5 CPU, 34.53 GB RAM) 1%
0 rows in set. Elapsed: 5.131 sec. Processed 12.73 million rows, 19.04 GB (2.48 million rows/s., 3.71 GB/s.)

Received exception from server (version 23.4.2):
Code: 241. DB::Exception: Received from localhost:9000. DB::Exception: Memory limit (total) exceeded: would use 27.73 GiB (attempt to allocate chunk of 12330048 bytes), maximum: 27.52 GiB. OvercommitTracker decision: Query was selected to stop by OvercommitTracker.: (avg_value_size_hint = 523, avg_chars_size = 618, limit = 7130): (while reading column logsBloom): (while reading from part /s3_downloads/clickhouse/store/49e/49ec52a2-d1f4-4824-b685-f2beacec7069/all_10232_10409_3/ from mark 120 with max_rows_to_read = 7130): While executing MergeTreeThread. (MEMORY_LIMIT_EXCEEDED)
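
Before changing the query, it is worth checking which limits and defaults are in effect. As a quick diagnostic (these are standard ClickHouse session settings; this check was not part of the original run), the per-query memory cap and the active join algorithm can be inspected like this:

-- Inspect the memory and join settings currently in effect for the session.
SELECT name, value
FROM system.settings
WHERE name IN ('max_memory_usage', 'max_bytes_in_join', 'join_algorithm');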

To address this issue, we applied the join_algorithm = 'grace_hash' setting, which proved successful. This algorithm handles large datasets by splitting them into manageable chunks called buckets. Instead of loading the entire table into memory, ClickHouse processes a limited number of buckets at a time, keeping memory usage within acceptable limits.
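
The same setting can also be enabled for the whole session rather than per query; a minimal sketch, with an illustrative bucket count, looks like this:

-- Enable grace hash join for the current session; the bucket count is illustrative
-- and should be tuned to the size of the joined tables.
SET join_algorithm = 'grace_hash';
SET grace_hash_join_initial_buckets = 200;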

ip-10-1-138-41.us-east-2.compute.internal :) insert into blocks_receipts_merge select a.*, b.* FROM blocks_data_all AS a INNER JOIN receipts_json_local_untuple AS b
ON a.blockHash = b.logs_blockHash and a.blockNumber = b.blockNumber and a.from = b.from and a.to = b.to and a.transactionIndex = b.transactionIndex
SETTINGS join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 200, send_timeout = 30000000000, receive_timeout = 30000000000 ;

INSERT INTO blocks_receipts_merge
SELECT
    a.*,
    b.*
FROM blocks_data_all AS a
INNER JOIN receipts_json_local_untuple AS b ON (a.blockHash = b.logs_blockHash) AND (a.blockNumber = b.blockNumber) AND (a.from = b.from) AND (a.to = b.to) AND (a.transactionIndex = b.transactionIndex)
SETTINGS join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 200, send_timeout = 30000000000, receive_timeout = 30000000000


Query id: 5632efc4-3b9c-46ff-403b-e36930d6abd0 

Connecting to localhost:9000 as user default. 
Connected to ClickHouse server version 23.4.2 revision 54462. 

↖ Progress: 3.09 billion rows, 9.74 TB (290.65 thousand rows/s., 916.13 MB/s.) ▊▊▊▊▊▊▊▊▊  (0.0 CPU, 27.96 GB RAM) 99%

We have also attached a screenshot showcasing how ClickHouse internally creates and organizes these buckets for data processing. This approach allows us to overcome memory constraints and seamlessly execute complex join operations on significant amounts of data, ultimately enhancing the performance and reliability of our data processing workflows.
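
Assuming query logging is enabled on the server, the peak memory used by the grace-hash insert can also be verified after the fact from system.query_log, for example using the query id from the run above:

-- Confirm that the insert stayed within the memory limit (requires query_log to be enabled).
SELECT
    query_id,
    query_duration_ms,
    formatReadableSize(memory_usage) AS peak_memory
FROM system.query_log
WHERE (type = 'QueryFinish') AND (query_id = '5632efc4-3b9c-46ff-403b-e36930d6abd0');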

During performance testing, we processed files of various types and sizes to assess the performance of each format and determine the most suitable storage format on S3. We then compiled the following report, summarizing our findings after processing JSON, CSV, gzip-compressed CSV, and Parquet with different compression codecs.
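
As an illustration of how such a comparison can be run, the sketch below reads the same dataset from S3 in each format; the bucket URL and file names are placeholders, not our actual paths.

-- Hypothetical probe queries: the same data stored in different formats on S3.
SELECT count() FROM s3('https://my-bucket.s3.amazonaws.com/ethereum/blocks/*.json', 'JSONEachRow');
SELECT count() FROM s3('https://my-bucket.s3.amazonaws.com/ethereum/blocks/*.csv', 'CSV');
SELECT count() FROM s3('https://my-bucket.s3.amazonaws.com/ethereum/blocks/*.csv.gz', 'CSV');      -- gzip detected from the extension
SELECT count() FROM s3('https://my-bucket.s3.amazonaws.com/ethereum/blocks/*.parquet', 'Parquet');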

Conclusion

Our tests show that the Parquet format outperformed the other file types in terms of efficiency and speed. This finding underscores the advantage of using Parquet for ClickHouse data processing over the network, specifically when accessing data from S3.

Overall, addressing memory limitations in ClickHouse when dealing with large tables requires a comprehensive approach that spans query optimization, hardware configuration, data modeling, and fine-tuning of ClickHouse settings. By employing these strategies, it is possible to overcome memory constraints and efficiently perform complex joins on extensive datasets without encountering memory allocation errors.
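
A few of the session settings relevant to that fine-tuning are sketched below; the values are illustrative examples, not recommendations for any particular cluster.

-- Illustrative memory-related knobs (values are examples only).
SET max_memory_usage = 25000000000;                     -- per-query memory cap, in bytes
SET join_algorithm = 'grace_hash';                      -- bucketed join that keeps memory bounded
SET max_bytes_before_external_group_by = 10000000000;   -- spill large GROUP BYs to disk
SET max_bytes_before_external_sort = 10000000000;       -- spill large ORDER BYs to disk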

To read more about real-time analytics in ClickHouse, consider the articles below.