Data replication and distributed queries

Last updated: Nov 14, 2022

On this page

  • Setting up replicated tables
  • Sharding replicated tables
  • Monitoring replication
  • Distributed table engine
  • How writes against Distributed tables work
  • How queries against Distributed tables work
  • Example query - distributed sums
  • Example query: LIMIT, filter and aggregate
  • Improving this query
  • Query settings
  • Ad-hoc distributed queries
  • Further reading

This document provides information on:

  • How data replication and the Distributed table engine work in ClickHouse
  • Sharding MergeTree tables
  • How to monitor replication
  • How to reason about distributed query execution
  • Important settings for distributed query execution
  • Doing ad-hoc distributed queries

Setting up replicated tables

A great guide on setting up replicated tables on a pre-existing cluster can be found in the ClickHouse documentation.

Some important highlights are:

  • ClickHouse replication works at a table-by-table level; tables need to be created on all shards (preferably using ON CLUSTER)
  • Replication requires a running ZooKeeper setup. In the future, this might be replaced by clickhouse-keeper
IMPORTANT GOTCHA:

Always use unique ZooKeeper paths for table definitions as re-use can and will lead to data loss. This applies even if the previous table has been dropped.
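
As a minimal sketch of these points (assuming a cluster named 'my_cluster' and the standard {shard} and {replica} macros; the sensor_metadata table is purely hypothetical), a non-sharded replicated table could be created like this:

SQL
-- Sketch: create the table on every node via ON CLUSTER. The ZooKeeper
-- path must be unique per table and never reused, even after a DROP TABLE.
CREATE TABLE sensor_metadata ON CLUSTER 'my_cluster' (
    site_id UInt32,
    name String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/noshard/sensor_metadata', '{replica}-{shard}')
ORDER BY site_id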

Sharding replicated tables

Sharding helps scale a dataset by having each node only store part of the data.

To decide whether to shard a table, consider how it's queried and what data it stores:

  • Shard: tables that could become too large for a single server (e.g. events, logs, raw analytics data)
  • Don't shard: tables often JOINed in queries (e.g. persons, groups, cohorts) where the whole dataset is needed.

Sharding also requires care in schema design: queries should ideally only need to load data from a single shard.

When creating a replicated table, whether it is sharded or not is configured by varying the parameters of the ReplicatedMergeTree engine:

  • Example sharded engine: ReplicatedMergeTree('/zk/some/path/{shard}/tablename', '{replica}')
  • Example non-sharded engine: ReplicatedMergeTree('/zk/some/path/tablename', '{replica}-{shard}')

Note that resharding large tables is currently a relatively painful and bespoke operation, so be careful to choose a good sharding key.
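
As a quick sanity check on an existing sharding key, something like the following sketch (using the sharded_sensor_values example table defined later in this document) shows how evenly rows are spread across shards:

SQL
-- Sketch: row counts per shard, reading one replica per shard
SELECT shardNum(), count() AS row_count
FROM cluster('my_cluster', 'default', 'sharded_sensor_values')
GROUP BY shardNum()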

Monitoring replication

When doing larger cluster operations, it's often important to keep an eye on replication. The system.replication_queue and system.replicated_fetches tables can provide an at-a-glance overview of what the system is doing.
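
For example, something like the following sketch (reusing the 'my_cluster' name from the examples in this document) summarizes outstanding replication queue entries across every replica:

SQL
-- Sketch: pending replication queue entries per host, table and entry type
SELECT hostName(), database, table, type, count() AS entries
FROM clusterAllReplicas('my_cluster', 'system', 'replication_queue')
GROUP BY hostName(), database, table, type
ORDER BY entries DESC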

Distributed table engine

Distributed table engine tables are used to query and write to sharded tables. Note that Distributed engine tables do not store any data on their own but rather always fan out to ReplicatedMergeTree tables on the cluster.

How writes against Distributed tables work

When INSERTing data against Distributed tables, ClickHouse decides which shard each row belongs to and forwards the data to the relevant shard(s) based on the sharding_key.

Note that if your underlying table has columns that ClickHouse populates (e.g. ALIAS, MATERIALIZED), it's often necessary to set up two Distributed tables:

  • One for writes containing a minimum set of columns
  • Another for reads, which contains all columns (see the sketch below)
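
A rough sketch of this split (the table and column names here are hypothetical, not part of the example schema used later):

SQL
-- Underlying sharded table with a column ClickHouse populates itself
CREATE TABLE sharded_readings ON CLUSTER 'my_cluster' (
    timestamp DateTime,
    site_id UInt32,
    metric_value Int32,
    day Date MATERIALIZED toDate(timestamp)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sharded_readings', '{replica}')
ORDER BY (site_id, timestamp)

-- Write-only Distributed table: minimal column set, so INSERTs never
-- need to provide the MATERIALIZED column
CREATE TABLE writable_readings ON CLUSTER 'my_cluster' (
    timestamp DateTime,
    site_id UInt32,
    metric_value Int32
)
ENGINE = Distributed('my_cluster', 'default', 'sharded_readings', intHash64(site_id))

-- Read Distributed table: all columns, including the populated one
CREATE TABLE readable_readings ON CLUSTER 'my_cluster' (
    timestamp DateTime,
    site_id UInt32,
    metric_value Int32,
    day Date
)
ENGINE = Distributed('my_cluster', 'default', 'sharded_readings', intHash64(site_id))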

How queries against Distributed tables work

When querying a Distributed table, you can send the query to any node in the ClickHouse cluster. That node becomes the coordinator, which:

  1. Figures out what queries individual shards need to execute and queues these queries
  2. Once results are in, aggregates the results together and returns an answer

Since local execution is faster than reading data over the network, ClickHouse will usually perform one of the queries locally instead of sending it to another replica of its shard.

Depending on the query, sub-queries executed on other shards might either return already-aggregated data or stream entire datasets across the network. Being aware of which happens is crucial for performance.
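
One way to see which case applies is to inspect the query plan. The plans shown in the examples below can be produced with something along these lines (the header and indexes flags are assumptions about how the plans were generated, not part of the original examples):

SQL
-- Sketch: show the distributed query plan with per-step headers and index
-- usage, for a query against the example table defined in the next section
EXPLAIN header = 1, indexes = 1
SELECT hostName(), sum(metric_value)
FROM distributed_sensor_values
GROUP BY hostName()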

Example query - distributed sums

Consider the following tables:

SQL
CREATE TABLE sharded_sensor_values ON CLUSTER 'my_cluster' (
    timestamp DateTime,
    site_id UInt32,
    event VARCHAR,
    uuid UUID,
    metric_value Int32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sharded_sensor_values', '{replica}')
ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
SETTINGS index_granularity = 8192

CREATE TABLE distributed_sensor_values ON CLUSTER 'my_cluster' (
    timestamp DateTime,
    site_id UInt32,
    event VARCHAR,
    uuid UUID,
    metric_value Int32
)
ENGINE = Distributed('my_cluster', 'default', 'sharded_sensor_values', intHash64(site_id))

In this schema, writes and queries should be made against the distributed_sensor_values table, which distributes the data across shards according to site_id.

See query to populate data
SQL
INSERT INTO distributed_sensor_values
SELECT *
FROM generateRandom('timestamp DateTime, site_id UInt8, event VARCHAR, uuid UUID, metric_value Int32', NULL, 10)
LIMIT 100000000

Consider this simple aggregation query executed against clickhouse01:

SQL
SELECT hostName(), sum(metric_value) FROM distributed_sensor_values GROUP BY hostName()
-- Results:
-- ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓
-- ┃ hostname()   ┃ sum(metric_value) ┃
-- ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩
-- │ clickhouse01 │    -9035836479117 │
-- ├──────────────┼───────────────────┤
-- │ clickhouse03 │    10003905228283 │
-- └──────────────┴───────────────────┘

hostName() is a ClickHouse helper function which returns the hostname of the node the query is executed on.

In this case clickhouse01 was the coordinator node. It:

  • sent out a subset of the query to clickhouse03 on the other shard to execute. The query was SELECT hostname(), sum(`metric_value`) FROM `default`.`sharded_sensor_values` GROUP BY hostname()
  • ran the query locally, getting aggregated results
  • combined both the local and remote results

In this case, minimal network traffic was needed since each shard aggregated its own data and only the small per-shard sums had to be combined by the coordinator.

Click to see full `EXPLAIN` plan

Expression ((Projection + Before ORDER BY))
Header: hostname() String
sum(metric_value) Int64
MergingAggregated
Header: hostname() String
sum(metric_value) Int64
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: hostname() String
sum(metric_value) AggregateFunction(sum, Int32)
Union
Header: hostname() String
sum(metric_value) AggregateFunction(sum, Int32)
Aggregating
Header: hostname() String
sum(metric_value) AggregateFunction(sum, Int32)
Expression (Before GROUP BY)
Header: metric_value Int32
hostname() String
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: metric_value Int32
ReadFromMergeTree
Header: metric_value Int32
Indexes:
PrimaryKey
Condition: true
Parts: 6/6
Granules: 5723/5723
ReadFromRemote (Read from remote replica)
Header: hostname() String
sum(metric_value) AggregateFunction(sum, Int32)

Example query: LIMIT, filter and aggregate

Consider this query:

SQL
SELECT
    site_id,
    uniq(event)
FROM distributed_sensor_values
WHERE timestamp > '2010-01-01' AND timestamp < '2023-01-01'
GROUP BY site_id
ORDER BY uniq(event) DESC
LIMIT 20

In this case, the query sent to the other shard cannot do all the work on its own. Instead, it would look something like the following:

SQL
SELECT
    site_id,
    uniqState(event)
FROM sharded_sensor_values
WHERE timestamp > '2010-01-01' AND timestamp < '2023-01-01'
GROUP BY site_id

In EXPLAIN output, this would be expressed as:

ReadFromRemote (Read from remote replica)
Header: site_id UInt32
uniq(event) AggregateFunction(uniq, String)

In this case, the coordinator needs to receive a lot of data from the other shards to calculate the correct results:

  1. It loads data for every site_id on the other shards
  2. It cannot just receive the unique event count from the other shards, but rather needs to know which events were seen

This query is expensive in terms of the amount of data that needs to be transferred over the network.

One thing that makes this query more efficient is uniqState, which is an aggregate function combinator. It's useful because, rather than needing to send over all the events, each shard can send back an optimized bitmap-like state that the coordinator can combine with its own results.
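
As a rough illustration of how these partial states combine (a sketch, not a query from the original example), each inner subquery below stands in for one shard's partial result, and uniqMerge plays the role of the coordinator:

SQL
-- Sketch: uniqState produces an opaque AggregateFunction(uniq, String)
-- state per group; uniqMerge combines such states into a final count.
SELECT site_id, uniqMerge(event_state) AS unique_events
FROM
(
    SELECT site_id, uniqState(event) AS event_state
    FROM sharded_sensor_values
    GROUP BY site_id
)
GROUP BY site_id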

Click to see full `EXPLAIN` plan
Expression (Projection)
Header: site_id UInt32
uniq(event) UInt64
Limit (preliminary LIMIT (without OFFSET))
Header: site_id UInt32
uniq(event) UInt64
Sorting (Sorting for ORDER BY)
Header: site_id UInt32
uniq(event) UInt64
Expression (Before ORDER BY)
Header: site_id UInt32
uniq(event) UInt64
MergingAggregated
Header: site_id UInt32
uniq(event) UInt64
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: site_id UInt32
uniq(event) AggregateFunction(uniq, String)
Union
Header: site_id UInt32
uniq(event) AggregateFunction(uniq, String)
Aggregating
Header: site_id UInt32
uniq(event) AggregateFunction(uniq, String)
Expression (Before GROUP BY)
Header: site_id UInt32
event String
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: site_id UInt32
event String
ReadFromMergeTree
Header: site_id UInt32
event String
Indexes:
PrimaryKey
Keys:
toStartOfDay(timestamp)
Condition: and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))
Parts: 6/6
Granules: 1628/5723
ReadFromRemote (Read from remote replica)
Header: site_id UInt32
uniq(event) AggregateFunction(uniq, String)

Improving this query

This query can be made faster by setting the distributed_group_by_no_merge setting, like so:

SQL
SELECT
    site_id,
    uniq(event)
FROM distributed_sensor_values
WHERE timestamp > '2010-01-01' AND timestamp < '2023-01-01'
GROUP BY site_id
ORDER BY uniq(event) DESC
LIMIT 20
SETTINGS distributed_group_by_no_merge=1

After this, the coordinator trusts that the data is sharded according to site_id and can send the same query down to the other shards.

In EXPLAIN, this is represented by the ReadFromRemote being done later in the cycle and now reading UInt64 instead of AggregateFunction(uniq, String):

ReadFromRemote (Read from remote replica)
Header: site_id UInt32
uniq(event) UInt64

Takeaway: Proper data layout and usage of query settings can improve queries significantly by doing less work over the network.

Click to see full `EXPLAIN` plan
Header: site_id UInt32
uniq(event) UInt64
Union
Header: site_id UInt32
uniq(event) UInt64
Expression (Projection)
Header: site_id UInt32
uniq(event) UInt64
Limit (preliminary LIMIT (without OFFSET))
Header: site_id UInt32
uniq(event) UInt64
Sorting (Sorting for ORDER BY)
Header: site_id UInt32
uniq(event) UInt64
Expression (Before ORDER BY)
Header: site_id UInt32
uniq(event) UInt64
Aggregating
Header: site_id UInt32
uniq(event) UInt64
Expression (Before GROUP BY)
Header: site_id UInt32
event String
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: site_id UInt32
event String
ReadFromMergeTree
Header: site_id UInt32
event String
Indexes:
PrimaryKey
Keys:
toStartOfDay(timestamp)
Condition: and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))
Parts: 6/6
Granules: 1628/5723
ReadFromRemote (Read from remote replica)
Header: site_id UInt32
uniq(event) UInt64

Query settings

Some noteworthy query settings which affect the behavior of distributed queries are:

  • distributed_group_by_no_merge
  • distributed_push_down_limit
  • optimize_distributed_group_by_sharding_key
  • prefer_localhost_replica

Many of these unlock potential optimizations by streaming less data over the network, but require data to be sharded correctly to work.
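
All of these are passed per query via a SETTINGS clause, as in the sketch below (reusing the example table from above; whether these particular settings help, or even return exact results, depends on the data layout):

SQL
-- Sketch: let shards do more of the GROUP BY and LIMIT work locally.
-- This is only safe here because rows for a given site_id never span shards.
SELECT site_id, uniq(event)
FROM distributed_sensor_values
GROUP BY site_id
ORDER BY uniq(event) DESC
LIMIT 20
SETTINGS optimize_distributed_group_by_sharding_key = 1, distributed_push_down_limit = 1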

Ad-hoc distributed queries

It's sometimes useful to query data from across the cluster without setting up Distributed tables, for example to query system tables on all nodes or shards.

This can be done like so:

SQL
SELECT hostName(), shardNum(), *
FROM clusterAllReplicas('my_cluster', 'system', 'metrics')

More documentation on this can be found at:

  • cluster, clusterAllReplicas ClickHouse docs
  • Other Functions ClickHouse docs

Further reading

  • Data Replication ClickHouse docs
  • Strength in Numbers: Introduction to ClickHouse Cluster Performance
  • Engines
  • ZooKeeper schema

Next in the ClickHouse manual: Data ingestion

Authors

  • Karl-Aksel Puulmann
  • Ian Vanagas
