This document provides information on:
- How data replication and Distributed table engine works in ClickHouse
- Sharding MergeTree tables
- How to monitor replication
- How to reason about distributed query execution
- Important settings for distributed query execution
- Doing ad-hoc distributed queries
Setting up replicated tables
A great guide on setting up replicated tables on a pre-existing cluster can be found in ClickHouse documentation.
Some important highlights are:
- ClickHouse replication works on a table-by-table level, tables need to be created on all shards (preferably via using
ON CLUSTER
) - Replication requires a running ZooKeeper setup. In the future, this might be replaced by
clickhouse-keeper
IMPORTANT GOTCHA:Always use unique ZooKeeper paths for table definitions as re-use can and will lead to data loss. This applies even if the previous table has been dropped.
Sharding replicated tables
Sharding helps scale a dataset by having each node only store part of the data.
To decide whether to shard a table, consider how it's queried and what data it stores:
- Shard: tables that could become too large for a single server (e.g. events, logs, raw analytics data)
- Don't shard: table often JOINed in queries (e.g. persons, groups, cohorts) where the whole dataset is needed.
Sharding also requires care given in the schema - queries touching data should ideally only need to load data from a given shard.
When creating a replicated table, configuring whether a table is sharded or not is done via varying the parameters to a ReplicatedMergeTree engine:
- Example sharded engine:
ReplicatedMergeTree('/zk/some/path/{shard}/tablename', '{replica}')
- Example not sharded table engine:
ReplicatedMergeTree('/zk/some/path/tablename', '{replica}-{shard}')
Note that resharding large tables is currently relatively painful and bespoke operation - be careful choosing a good sharding key.
Monitoring replication
When doing larger cluster operations, it's often important to keep an eye on replication. The system.replication_queue
and system.replicated_fetches
tables can provide at-a-glance overview of what the system is doing.
Distributed
table engine
Distributed
table engine tables
are used to query and write to sharded tables. Note that Distributed engine tables do not store any data on its own
but rather always fan out to ReplicatedMergeTree
tables on the cluster.
How writes against Distributed
tables work
When INSERTing data against Distributed tables, ClickHouse decides which shard each row belongs to and forwards data to relevant shard(s) based on the sharding_key.
Note that if your underlying table has columns that ClickHouse populates (e.g. ALIAS, MATERIALIZED), it's often necessary to set up two Distributed tables:
- One for writes containing a minimum set of columns
- Another for reads which contain all columns
How queries against Distributed
tables work
When querying Distributed table, you can send the query to any node in the ClickHouse cluster. That node becomes the coordinator
, which:
- Figures out what queries individual shards need to execute and queues these queries
- Once results are in, aggregates the results together and returns an answer
Given local execution is faster than reading data over the network, ClickHouse will usually perform one of the queries locally instead of sending it to another replica of its shard.
Depending on the query, sub-queries executed on other shards might either return already aggregated data or stream entire datasets across the network. Being aware of which is done is crucial for performance.
Example query - distributed sums
Consider the following tables:
CREATE TABLE sharded_sensor_values ON CLUSTER 'my_cluster' (timestamp DateTime,site_id UInt32,event VARCHAR,uuid UUID,metric_value Int32)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/sharded_sensor_values', '{replica}')ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)SETTINGS index_granularity = 8192CREATE TABLE distributed_sensor_values ON CLUSTER 'my_cluster' (timestamp DateTime,site_id UInt32,event VARCHAR,uuid UUID,metric_value Int32)ENGINE = Distributed('my_cluster', 'default', 'sharded_sensor_values', intHash64(site_id))
Writes and queries should be made against table distributed_sensor_values
in this schema. It then distributes the data according to site_id
.
See query to populate data
INSERT INTO distributed_sensor_valuesSELECT *FROM generateRandom('timestamp DateTime, site_id UInt8, event VARCHAR, uuid UUID, metric_value Int32', NULL, 10)LIMIT 100000000
Consider this simple aggregation query executed against clickhouse01
:
SELECT hostName(), sum(metric_value) FROM distributed_sensor_values GROUP BY hostName()-- Results:-- ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓-- ┃ hostname() ┃ sum(metric_value) ┃-- ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩-- │ clickhouse01 │ -9035836479117 │-- ├──────────────┼───────────────────┤-- │ clickhouse03 │ 10003905228283 │-- └──────────────┴───────────────────┘
hostname
is a clickhouse helper function which
returns the hostname query is executed on.
In this case clickhouse01
was the coordinator node. It:
- sent out a subset of the query to
clickhouse03
on other shard to execute. The query wasSELECT hostname(), sum(`metric_value`) FROM `default`.`sharded_sensor_values` GROUP BY hostname()
- ran the query locally, getting aggregated results
- combined both the local and remote results
In this case, minimal network traffic happened since the results of a query could be combined independently.
Click to see full `EXPLAIN` plan
``` Expression ((Projection + Before ORDER BY)) Header: hostname() String sum(metric_value) Int64 MergingAggregated Header: hostname() String sum(metric_value) Int64 SettingQuotaAndLimits (Set limits and quota after reading from storage) Header: hostname() String sum(metric_value) AggregateFunction(sum, Int32) Union Header: hostname() String sum(metric_value) AggregateFunction(sum, Int32) Aggregating Header: hostname() String sum(metric_value) AggregateFunction(sum, Int32) Expression (Before GROUP BY) Header: metric_value Int32 hostname() String SettingQuotaAndLimits (Set limits and quota after reading from storage) Header: metric_value Int32 ReadFromMergeTree Header: metric_value Int32 Indexes: PrimaryKey Condition: true Parts: 6/6 Granules: 5723/5723 ReadFromRemote (Read from remote replica) Header: hostname() String sum(metric_value) AggregateFunction(sum, Int32) ```Example query: LIMIT, filter and aggregate
Consider this query:
SELECTsite_id,uniq(event)FROM distributed_sensor_valuesWHERE timestamp > '2010-01-01' and timestamp < '2023-01-01'GROUP BY site_idORDER BY uniq(event) DESCLIMIT 20
In this case, the query sent to other shards cannot do all the work on its own. Instead, the query being sent to the other shard would look something like the following:
SELECTsite_id,uniqState(event)FROM sharded_sensor_valuesWHERE timestamp > '2010-01-01' and timestamp < '2023-01-01'GROUP BY site_id
In EXPLAIN
output, this would be expressed as:
ReadFromRemote (Read from remote replica)Header: site_id UInt32uniq(event) AggregateFunction(uniq, String)
In this case coordinator needs to receive a lot of data from the other shards to calculate the correct results:
- It loads data for every site_id on the other shards
- It cannot just load the unique event count from the other shards, but rather needs to know what events were seen or not
This query is expensive in terms of the amount of data that needs to be transferred over the network.
One thing that makes this query more efficient is uniqState
, which is a aggregate function combinator. It's useful since rather needing to send over all the events, the coordinator can send back an optimized bitmap-like structure that the coordinator can combine with its own results.
Click to see full `EXPLAIN` plan
Expression (Projection)Header: site_id UInt32uniq(event) UInt64Limit (preliminary LIMIT (without OFFSET))Header: site_id UInt32uniq(event) UInt64Sorting (Sorting for ORDER BY)Header: site_id UInt32uniq(event) UInt64Expression (Before ORDER BY)Header: site_id UInt32uniq(event) UInt64MergingAggregatedHeader: site_id UInt32uniq(event) UInt64SettingQuotaAndLimits (Set limits and quota after reading from storage)Header: site_id UInt32uniq(event) AggregateFunction(uniq, String)UnionHeader: site_id UInt32uniq(event) AggregateFunction(uniq, String)AggregatingHeader: site_id UInt32uniq(event) AggregateFunction(uniq, String)Expression (Before GROUP BY)Header: site_id UInt32event StringSettingQuotaAndLimits (Set limits and quota after reading from storage)Header: site_id UInt32event StringReadFromMergeTreeHeader: site_id UInt32event StringIndexes:PrimaryKeyKeys:toStartOfDay(timestamp)Condition: and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))Parts: 6/6Granules: 1628/5723ReadFromRemote (Read from remote replica)Header: site_id UInt32uniq(event) AggregateFunction(uniq, String)
Improving this query
This query can be made faster by setting the
distributed_group_by_no_merge
setting, like so:
SELECTsite_id,uniq(event)FROM distributed_sensor_valuesWHERE timestamp > '2010-01-01' and timestamp < '2023-01-01'GROUP BY site_idORDER BY uniq(event) DESCSETTINGS distributed_group_by_no_merge=1LIMIT 20
After this, the coordinator knows to trust that the data is sharded according to site_id
and it can send the same query down to other shards.
In EXPLAIN
, this is represented by the ReadFromRemote
being done later in the cycle and now reading UInt64
instead of AggregateFunction(uniq, String)
:
ReadFromRemote (Read from remote replica)Header: site_id UInt32uniq(event) UInt64
Takeaway: Proper data layout and usage of query settings can improve queries significantly by doing less work over the network.
Click to see full `EXPLAIN` plan
Header: site_id UInt32uniq(event) UInt64UnionHeader: site_id UInt32uniq(event) UInt64Expression (Projection)Header: site_id UInt32uniq(event) UInt64Limit (preliminary LIMIT (without OFFSET))Header: site_id UInt32uniq(event) UInt64Sorting (Sorting for ORDER BY)Header: site_id UInt32uniq(event) UInt64Expression (Before ORDER BY)Header: site_id UInt32uniq(event) UInt64AggregatingHeader: site_id UInt32uniq(event) UInt64Expression (Before GROUP BY)Header: site_id UInt32event StringSettingQuotaAndLimits (Set limits and quota after reading from storage)Header: site_id UInt32event StringReadFromMergeTreeHeader: site_id UInt32event StringIndexes:PrimaryKeyKeys:toStartOfDay(timestamp)Condition: and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))Parts: 6/6Granules: 1628/5723ReadFromRemote (Read from remote replica)Header: site_id UInt32uniq(event) UInt64
Query settings
Some noteworthy query settings which affect the behavior of distributed queries are:
- distributed_group_by_no_merge
- distributed_push_down_limit
- optimize_distributed_group_by_sharding_key
- prefer_localhost_replica
Many of these unlock potential optimizations by streaming less data over the network, but require data to be sharded correctly to work.
Ad-hoc distributed queries
It's sometimes useful to query data from across the cluster without setting up Distributed tables, for example to query system tables on all nodes or shards.
This can be done as such:
SELECT hostName(), shardNum(), *FROM clusterAllReplicas('my_cluster', 'system', 'metrics')
More documentation on this can be found at:
Further reading
- Data Replication ClickHouse docs
- Strength in Numbers: Introduction to ClickHouse Cluster Performance
- Engines
- ZooKeeper schema
Next in the ClickHouse manual: Data ingestion