Building a Custom ETL Tool: Technical Implementation for PostgreSQL to ClickHouse Data Movement

Modern data architectures frequently require moving data between OLTP systems like PostgreSQL and OLAP systems like ClickHouse to support both transactional workloads and analytical processing. While commercial ETL solutions exist, building a custom ETL tool provides granular control over data processing, transformation logic, and performance optimization strategies tailored to specific organizational requirements.

This technical guide explores the engineering principles, implementation strategies, and architectural considerations for developing a robust custom ETL pipeline that efficiently moves data from PostgreSQL to ClickHouse.

Core ETL Architecture Components

Extract Layer: PostgreSQL Data Acquisition

SQL-Based Extraction Methods

The foundational approach involves executing SQL queries against PostgreSQL databases to retrieve data. This method supports various extraction patterns:

-- Full table extraction
SELECT * FROM source_table WHERE created_at >= '2024-01-01';

-- Incremental extraction using timestamps
SELECT * FROM source_table 
WHERE updated_at > :last_processed_timestamp;

-- Window-based extraction for large datasets
SELECT * FROM source_table 
WHERE id BETWEEN :start_id AND :end_id;

Change Data Capture (CDC) Implementation

For real-time data synchronization, CDC mechanisms capture database modifications as they occur:

Log-Based CDC: Monitors PostgreSQL’s Write-Ahead Log (WAL) to detect changes without impacting source system performance. This approach requires configuring logical replication slots:

-- Create a replication slot (test_decoding emits human-readable changes;
-- pgoutput is the binary plugin used by native logical replication subscribers)
SELECT pg_create_logical_replication_slot('etl_slot', 'test_decoding');

-- Consume pending changes from the slot
SELECT * FROM pg_logical_slot_get_changes('etl_slot', NULL, NULL);

Trigger-Based CDC: Implements database triggers to capture INSERT, UPDATE, and DELETE operations into audit tables for subsequent processing.
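
A minimal sketch of this approach, using a hypothetical source_table_audit table and a row-level trigger (all names are illustrative):

-- Hypothetical audit table populated by the trigger
CREATE TABLE source_table_audit (
    audit_id   BIGSERIAL PRIMARY KEY,
    operation  CHAR(1) NOT NULL,            -- 'I', 'U', or 'D'
    changed_at TIMESTAMPTZ DEFAULT now(),
    row_data   JSONB
);

CREATE OR REPLACE FUNCTION capture_source_table_changes() RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'DELETE' THEN
        INSERT INTO source_table_audit (operation, row_data)
        VALUES ('D', to_jsonb(OLD));
        RETURN OLD;
    ELSE
        INSERT INTO source_table_audit (operation, row_data)
        VALUES (left(TG_OP, 1), to_jsonb(NEW));
        RETURN NEW;
    END IF;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER source_table_cdc
AFTER INSERT OR UPDATE OR DELETE ON source_table
FOR EACH ROW EXECUTE FUNCTION capture_source_table_changes();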

Query-Based CDC: Polls source tables using timestamp or version columns to identify modified records since the last extraction cycle.
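
A common variant of query-based polling keeps a per-table watermark; the sketch below assumes a hypothetical etl_watermarks table that the ETL job updates after each successful run:

-- Hypothetical watermark table maintained by the polling job
CREATE TABLE etl_watermarks (
    table_name        TEXT PRIMARY KEY,
    last_processed_at TIMESTAMPTZ NOT NULL
);

-- Poll for rows changed since the last successful run
SELECT s.*
FROM source_table s
JOIN etl_watermarks w ON w.table_name = 'source_table'
WHERE s.updated_at > w.last_processed_at;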

Connection Management and Performance Optimization

Implement connection pooling to manage database connections efficiently and prevent resource exhaustion. Use read replicas when available to minimize impact on production OLTP workloads.

import psycopg2.pool

# Connection pool configuration
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=20,
    host='postgres-replica',
    database='source_db',
    user='etl_user',
    password='password'
)

Transform Layer: Data Processing and Optimization

Data Type Mapping and Conversion

PostgreSQL and ClickHouse have different data type systems requiring careful mapping:

PostgreSQL Type | ClickHouse Type | Conversion Notes
TIMESTAMP       | DateTime64      | Handle timezone conversions
JSONB           | String/Nested   | Parse JSON or flatten structure
ARRAY           | Array(T)        | Maintain array structure
UUID            | UUID/String     | Choose based on query patterns
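
In code, this mapping is often captured as a simple lookup table. The sketch below is illustrative and intentionally incomplete; extend it to cover the source schema's actual types:

# Illustrative PostgreSQL -> ClickHouse type mapping (not exhaustive)
PG_TO_CLICKHOUSE_TYPES = {
    'timestamp without time zone': 'DateTime64(6)',
    'timestamp with time zone': "DateTime64(6, 'UTC')",
    'jsonb': 'String',            # or a Nested structure if the JSON is flattened
    'uuid': 'UUID',               # String is also viable, depending on query patterns
    'integer': 'Int32',
    'bigint': 'Int64',
    'numeric': 'Decimal(38, 10)',
    'text': 'String',
}

def map_pg_type(pg_type, default='String'):
    """Return a ClickHouse type for a PostgreSQL type, falling back to String."""
    if pg_type.endswith('[]'):                       # PostgreSQL array types
        return f"Array({map_pg_type(pg_type[:-2], default)})"
    return PG_TO_CLICKHOUSE_TYPES.get(pg_type, default)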

Schema Transformation Strategies

Denormalization: ClickHouse performs optimally with denormalized data structures. Transform normalized PostgreSQL schemas into flattened representations:

def denormalize_order_data(order_row, customer_data, product_data):
    return {
        'order_id': order_row['id'],
        'order_date': order_row['created_at'],
        'customer_name': customer_data['name'],
        'customer_email': customer_data['email'],
        'product_name': product_data['name'],
        'product_category': product_data['category'],
        'order_amount': order_row['amount']
    }

Data Cleaning and Validation: Implement comprehensive data quality checks to ensure data integrity:

import re

def validate_and_clean_data(row):
    # Handle null values
    if row.get('email') is None:
        row['email'] = 'unknown@domain.com'

    # Validate data ranges
    if row.get('amount', 0) < 0:
        raise ValueError(f"Invalid amount: {row['amount']}")

    # Standardize phone format: strip all non-digit characters
    row['phone'] = re.sub(r'[^\d]', '', row.get('phone') or '')

    return row

Load Layer: ClickHouse Data Insertion

Batch Processing Optimization

ClickHouse performs optimally with large batch inserts rather than individual row operations. Implement batching logic to accumulate records before insertion:

def batch_insert_to_clickhouse(client, table_name, data_batch, batch_size=10000):
    # Assumes a clickhouse-driver Client: the INSERT statement ends at VALUES
    # with no placeholders, and the rows are passed separately for batching.
    for i in range(0, len(data_batch), batch_size):
        batch = data_batch[i:i + batch_size]

        # Build the column list from the first row's keys
        columns = ', '.join(batch[0].keys())
        query = f"INSERT INTO {table_name} ({columns}) VALUES"

        # Execute batch insert
        client.execute(query, [tuple(row.values()) for row in batch])

ClickHouse-Specific Loading Strategies

HTTP Interface Utilization: Leverage ClickHouse’s HTTP interface for flexible data loading:

import requests
import json

def load_via_http(data, table_name, clickhouse_url):
    # Format data for ClickHouse JSONEachRow input: one JSON object per line
    json_data = '\n'.join(json.dumps(row) for row in data)

    # Pass the query via params so requests URL-encodes it correctly
    response = requests.post(
        clickhouse_url,
        params={'query': f"INSERT INTO {table_name} FORMAT JSONEachRow"},
        data=json_data.encode('utf-8')
    )

    if response.status_code != 200:
        raise Exception(f"Insert failed: {response.text}")

Table Engine Selection: Choose appropriate ClickHouse table engines based on data characteristics:

-- For time-series data
CREATE TABLE events (
    timestamp DateTime64,
    user_id UInt64,
    event_type String,
    properties String
) ENGINE = MergeTree()
ORDER BY (timestamp, user_id);

-- For aggregated data
CREATE TABLE daily_metrics (
    date Date,
    metric_name String,
    value Float64
) ENGINE = SummingMergeTree()
ORDER BY (date, metric_name);

Critical Engineering Considerations

Idempotency Implementation

Idempotent operations ensure consistent results regardless of execution frequency, crucial for handling failures and retries.

Primary Key-Based Upserts

ClickHouse has no MySQL-style REPLACE or ON DUPLICATE KEY UPDATE, so upserts are typically emulated with a delete-then-insert pattern (or by letting a ReplacingMergeTree engine deduplicate, sketched after the example below):

def idempotent_insert(client, table_name, data, primary_keys):
    # Delete existing records with matching primary keys.
    # Note: relies on ClickHouse lightweight deletes (DELETE FROM on MergeTree
    # tables) and assumes numeric key values; string keys would need quoting.
    if primary_keys:
        key_conditions = ' AND '.join([
            f"{key} IN ({','.join(str(row[key]) for row in data)})"
            for key in primary_keys
        ])

        delete_query = f"DELETE FROM {table_name} WHERE {key_conditions}"
        client.execute(delete_query)

    # Insert new data using the batch-insert helper shown earlier
    batch_insert_to_clickhouse(client, table_name, data)
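
An alternative that avoids explicit deletes is to let the table engine deduplicate rows. A minimal sketch using ReplacingMergeTree with a version column:

-- Rows sharing the same sorting key collapse to the one with the highest
-- updated_at during background merges (illustrative table definition)
CREATE TABLE orders_deduplicated (
    order_id     UInt64,
    order_amount Float64,
    updated_at   DateTime64(3)
) ENGINE = ReplacingMergeTree(updated_at)
ORDER BY order_id;

-- Query with FINAL to force deduplication at read time
SELECT * FROM orders_deduplicated FINAL WHERE order_id = 42;

Re-running a batch against such a table simply re-asserts the latest state of each row, which makes retries naturally idempotent.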

Consistent Batch Boundaries

Maintain consistent batch processing boundaries to enable safe retries:

def process_with_consistent_batches(data_source, batch_size, checkpoint_callback):
    batch_id = 0

    while True:
        batch = extract_batch(data_source, batch_id * batch_size, batch_size)

        if not batch:
            break

        try:
            process_batch(batch)
            checkpoint_callback(batch_id)
            batch_id += 1
        except Exception as e:
            # Retry same batch
            logger.error(f"Batch {batch_id} failed: {e}")
            raise

Error Handling and Resilience

Comprehensive Logging Strategy

Implement structured logging to track data flow and identify issues:

import logging
import json
from datetime import datetime

logger = logging.getLogger('etl_pipeline')

def setup_etl_logging():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('etl_pipeline.log'),
            logging.StreamHandler()
        ]
    )

def log_batch_processing(batch_id, record_count, processing_time):
    logger.info(json.dumps({
        'event': 'batch_processed',
        'batch_id': batch_id,
        'record_count': record_count,
        'processing_time_seconds': processing_time,
        'timestamp': datetime.utcnow().isoformat()
    }))

Retry Mechanisms with Exponential Backoff

Implement intelligent retry logic for transient failures:

import time
import random

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise e

            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
            time.sleep(delay)
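
For example, a transient ClickHouse insert failure can be retried by wrapping the batch-insert helper from earlier (client and transformed_rows are assumed to exist in the surrounding pipeline code):

# Retry the batch insert up to 5 times with exponential backoff
retry_with_backoff(
    lambda: batch_insert_to_clickhouse(client, 'events', transformed_rows),
    max_retries=5
)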

Performance Monitoring and Observability

Metrics Collection and Monitoring

Implement comprehensive metrics collection to monitor pipeline health:

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
records_processed = Counter('etl_records_processed_total', 'Total records processed')
processing_duration = Histogram('etl_batch_processing_seconds', 'Batch processing time')
pipeline_status = Gauge('etl_pipeline_status', 'Pipeline status (1=healthy, 0=failed)')

def monitor_batch_processing(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()

        try:
            result = func(*args, **kwargs)
            records_processed.inc(len(result))
            pipeline_status.set(1)
            return result
        except Exception as e:
            pipeline_status.set(0)
            raise e
        finally:
            processing_duration.observe(time.time() - start_time)

    return wrapper

Real-time Alerting Implementation

Set up alerting for critical pipeline failures:

import smtplib
from email.mime.text import MIMEText

def send_alert(subject, message, recipients):
    msg = MIMEText(message)
    msg['Subject'] = subject
    msg['From'] = 'etl-pipeline@company.com'
    msg['To'] = ', '.join(recipients)

    with smtplib.SMTP('smtp.company.com') as server:
        server.send_message(msg)

def check_pipeline_health():
    # Note: _value.get() reads prometheus_client's internal gauge value (private
    # API); in production, track pipeline status in an application-level variable.
    if pipeline_status._value.get() == 0:
        send_alert(
            'ETL Pipeline Failure',
            'The PostgreSQL to ClickHouse ETL pipeline has failed. Please investigate.',
            ['data-team@company.com']
        )

Scalability and Concurrency

Parallel Processing Implementation

Design the ETL tool to process data in parallel for improved throughput:

import concurrent.futures
import threading

class ParallelETLProcessor:
    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.thread_local = threading.local()

    def get_db_connection(self):
        if not hasattr(self.thread_local, 'connection'):
            self.thread_local.connection = create_connection()
        return self.thread_local.connection

    def process_partition(self, partition_info):
        connection = self.get_db_connection()

        # Extract data for this partition
        data = extract_partition_data(connection, partition_info)

        # Transform data
        transformed_data = transform_data(data)

        # Load to ClickHouse
        load_to_clickhouse(transformed_data)

        return len(transformed_data)

    def run_parallel_processing(self, partitions):
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_partition, partition) 
                      for partition in partitions]

            total_records = sum(future.result() for future in concurrent.futures.as_completed(futures))

        return total_records
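
Driving the processor is then a matter of generating partition descriptors; the snippet below uses illustrative id ranges and an assumed total_rows value:

# Build id-range partitions and process them in parallel (illustrative values)
processor = ParallelETLProcessor(max_workers=8)

partition_size = 100_000
partitions = [
    {'start_id': start, 'end_id': start + partition_size - 1}
    for start in range(1, total_rows + 1, partition_size)
]

total = processor.run_parallel_processing(partitions)
print(f"Processed {total} records across {len(partitions)} partitions")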

Advanced Implementation Patterns

Schema Evolution Handling

Implement flexible schema handling to accommodate changes in source or target systems:

class SchemaManager:
    def __init__(self):
        self.schema_cache = {}

    def detect_schema_changes(self, table_name, current_schema):
        cached_schema = self.schema_cache.get(table_name)

        # On first sight of a table there is nothing to compare against
        if cached_schema is not None and cached_schema != current_schema:
            logger.info(f"Schema change detected for {table_name}")
            self.handle_schema_evolution(table_name, cached_schema, current_schema)

        self.schema_cache[table_name] = current_schema

    def handle_schema_evolution(self, table_name, old_schema, new_schema):
        # Compare column sets to find additions and removals
        added_columns = set(new_schema.keys()) - set(old_schema.keys())
        removed_columns = set(old_schema.keys()) - set(new_schema.keys())

        if added_columns:
            self.add_columns_to_clickhouse(table_name, added_columns, new_schema)

        if removed_columns:
            logger.warning(f"Columns removed from source: {removed_columns}")
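
The add_columns_to_clickhouse helper referenced above is left undefined; a minimal sketch of such a method, assuming a module-level clickhouse_client and the map_pg_type helper from the type-mapping section, might look like:

    def add_columns_to_clickhouse(self, table_name, added_columns, new_schema):
        # Add each newly detected column with a mapped ClickHouse type
        # (clickhouse_client and map_pg_type are assumed helpers, not defined here)
        for column in added_columns:
            ch_type = map_pg_type(new_schema[column])
            query = f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS {column} {ch_type}"
            logger.info(f"Applying schema change: {query}")
            clickhouse_client.execute(query)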

Data Quality Validation

Implement comprehensive data quality checks throughout the pipeline:

class DataQualityValidator:
    def __init__(self):
        self.validation_rules = {}

    def add_validation_rule(self, column, rule_func, error_message):
        if column not in self.validation_rules:
            self.validation_rules[column] = []
        self.validation_rules[column].append((rule_func, error_message))

    def validate_batch(self, data_batch):
        validation_errors = []

        for row_idx, row in enumerate(data_batch):
            for column, rules in self.validation_rules.items():
                if column in row:
                    for rule_func, error_message in rules:
                        if not rule_func(row[column]):
                            validation_errors.append({
                                'row': row_idx,
                                'column': column,
                                'value': row[column],
                                'error': error_message
                            })

        if validation_errors:
            raise DataQualityException(validation_errors)

        return True

# Usage example
validator = DataQualityValidator()
validator.add_validation_rule('email', lambda x: '@' in str(x), 'Invalid email format')
validator.add_validation_rule('amount', lambda x: float(x) >= 0, 'Amount must be non-negative')
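
The DataQualityException raised by validate_batch is assumed to be a small custom exception, for example:

class DataQualityException(Exception):
    """Raised when one or more rows in a batch fail validation."""

    def __init__(self, errors):
        self.errors = errors
        super().__init__(f"{len(errors)} validation error(s) in batch")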

Deployment and Operational Considerations

Configuration Management

Implement flexible configuration management for different environments:

import os
import yaml

class ETLConfig:
    def __init__(self, config_file=None):
        self.config = self.load_config(config_file)

    def load_config(self, config_file):
        if config_file:
            with open(config_file, 'r') as f:
                return yaml.safe_load(f)

        # Fallback to environment variables
        return {
            'postgresql': {
                'host': os.getenv('PG_HOST', 'localhost'),
                'database': os.getenv('PG_DATABASE'),
                'user': os.getenv('PG_USER'),
                'password': os.getenv('PG_PASSWORD')
            },
            'clickhouse': {
                'host': os.getenv('CH_HOST', 'localhost'),
                'database': os.getenv('CH_DATABASE'),
                'user': os.getenv('CH_USER', 'default')
            },
            'batch_size': int(os.getenv('BATCH_SIZE', '10000')),
            'max_workers': int(os.getenv('MAX_WORKERS', '4'))
        }
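
An example config file for this class might look like the following (all values are illustrative):

# etl_config.yaml -- illustrative values only
postgresql:
  host: postgres-replica
  database: source_db
  user: etl_user
  password: change-me
clickhouse:
  host: clickhouse.internal
  database: analytics
  user: default
batch_size: 10000
max_workers: 4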

Scheduling and Orchestration

Integrate with scheduling systems for automated pipeline execution:

import schedule
import time
from datetime import datetime, timedelta

class ETLScheduler:
    def __init__(self, etl_processor):
        self.etl_processor = etl_processor
        self.last_run = None

    def run_incremental_etl(self):
        try:
            start_time = self.last_run or (datetime.now() - timedelta(hours=1))
            end_time = datetime.now()

            logger.info(f"Running incremental ETL from {start_time} to {end_time}")

            self.etl_processor.process_incremental(start_time, end_time)
            self.last_run = end_time

            logger.info("Incremental ETL completed successfully")

        except Exception as e:
            logger.error(f"ETL run failed: {e}")
            raise

# Schedule ETL runs
scheduler = ETLScheduler(etl_processor)
schedule.every(15).minutes.do(scheduler.run_incremental_etl)

# Run scheduler
while True:
    schedule.run_pending()
    time.sleep(60)

Conclusion

Building a custom ETL tool for PostgreSQL to ClickHouse data movement requires careful consideration of extraction methods, transformation strategies, loading optimization, and operational reliability. The technical implementation must balance performance, maintainability, and scalability while ensuring data integrity and system resilience.

Key success factors include implementing idempotent operations, comprehensive error handling and monitoring, efficient batch processing, and parallel execution capabilities. The resulting custom solution provides granular control over data processing logic and can be optimized for specific organizational requirements and data patterns.

By following these technical implementation patterns and engineering best practices, organizations can build robust, maintainable ETL pipelines that efficiently bridge the gap between transactional PostgreSQL systems and analytical ClickHouse environments.

 

We are an full-stack ClickHouse infrastructure operations Consulting, Support and Managed Services provider with core expertise in performance, scalability and data SRE. Based out of California, Our consulting and support engineering team operates out of San Francisco, Vancouver, London, Germany, Russia, Ukraine, Australia, Singapore and India to deliver 24*7 enterprise-class consultative support and managed services. We operate very closely with some of the largest and planet-scale internet properties like PayPal, Garmin, Honda cars IoT project, Viacom, National Geographic, Nike, Morgan Stanley, American Express Travel, VISA, Netflix, PRADA, Blue Dart, Carlsberg, Sony, Unilever etc