Table sharding in ClickHouse®
Sharding provides a number of benefits when dealing with high query rates and massive datasets. It works by creating a distributed table that routes queries to the underlying tables. You can access data in sharded tables both directly and through the distributed table.
There are three primary sharding strategies:
- Classic approach, where the distributed table uses all shards in the cluster.
- Group-based approach, where some shards are grouped together.
- Advanced group-based approach, where shards are divided into two groups: one for the distributed table and the other for the underlying tables.
Below are configuration examples for all three sharding methods.
For more information, see Sharding in Managed Service for ClickHouse®.
To set up sharding:

1. Set up the infrastructure.
2. Set up the clickhouse-client.
3. Create tables with data.
4. Test the tables.

If you no longer need the resources you created, delete them.
Required paid resources
The support cost for this solution includes:
- Managed Service for ClickHouse® cluster fee, which covers the use of computing resources allocated to hosts (including ZooKeeper hosts) and disk space (see Managed Service for ClickHouse® pricing).
- Fee for using public IP addresses if public access is enabled for cluster hosts (see Virtual Private Cloud pricing).
Getting started
Set up the infrastructure
Manually

1. Create a Managed Service for ClickHouse® cluster:

   - Cluster name: chcluster.
   - Disk type: Select the required disk type.

     The disk type determines the minimum number of hosts per shard:

     - Two hosts, if you select local SSDs (local-ssd).
     - Three hosts, if you select network non-replicated SSDs (network-ssd-nonreplicated).

     To ensure fault tolerance with these disk types, you must add redundant hosts.

     For more information, see Storage in Managed Service for ClickHouse®.

   - DB name: tutorial.

   Cluster hosts must be accessible from the internet.
2. Create two additional shards named shard2 and shard3.

3. Create shard groups. The number of shard groups depends on the sharding type:

   - Group-based sharding requires one group named sgroup that will include shard1 and shard2.
   - Advanced group-based sharding requires two groups:

     - sgroup, which includes shard1 and shard2.
     - sgroup_data, which includes shard3.

   No shard groups are needed for classic sharding.

4. If you are using security groups, configure them to allow internet access to your cluster.
Terraform

1. If you do not have Terraform yet, install it.

2. Get the authentication credentials. You can add them to environment variables or specify them later in the provider configuration file.

3. Configure and initialize a provider. You do not need to create a provider configuration file manually: you can download it.

4. Place the configuration file in a separate working directory and specify the parameter values. If you did not add the authentication credentials to environment variables, specify them in the configuration file.

5. Download one of the following sharding example configuration files to the same working directory:

   - simple-sharding.tf: classic sharding.
   - sharding-with-groups.tf: group-based sharding.
   - advanced-sharding-with-groups.tf: advanced group-based sharding.

   Each file describes the following:

   - Network.
   - Subnet.
   - Default security group and rules for connecting to the cluster from the internet.
   - Managed Service for ClickHouse® cluster with the required hosts and shards.
6. In the configuration file, specify the username and password that will be used to access the Managed Service for ClickHouse® cluster.

7. Make sure the Terraform configuration files are correct using this command:

   ```bash
   terraform validate
   ```

   Terraform will show any errors found in your configuration files.

8. Create the required infrastructure:

   1. Run this command to view the planned changes:

      ```bash
      terraform plan
      ```

      If you described the configuration correctly, the terminal will display a list of the resources to create and their parameters. This is a verification step that does not apply changes to your resources.

   2. If everything looks correct, apply the changes:

      1. Run this command:

         ```bash
         terraform apply
         ```

      2. Confirm updating the resources.

      3. Wait for the operation to complete.

   All the required resources will be created in the specified folder. You can check resource availability and their settings in the management console.
Set up the clickhouse-client
Install and configure the clickhouse-client for database access.
Create tables with data
Let's assume you need to enable sharding for the hits_v1 table.

In the queries below, replace the <table_structure> placeholder with the column descriptions from this ClickHouse® guide.
Once you enable sharding (by any method), you will be able to send SELECT and INSERT queries to the distributed table. These queries will be processed according to the specified configuration.
In the following examples, we use a random number, rand(), as a sharding key.
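Rows sharded with rand() are spread across the shards more or less evenly, but rows that belong together (for example, all hits of one user) may end up on different shards. If that matters for your queries, you can use a deterministic expression over a table column as the sharding key instead. Below is a minimal sketch of such a variant, assuming you shard by the UserID column of hits_v1; it is an illustration only and is not used in the steps that follow, which keep rand().

```sql
-- Illustrative variant only (not used below): shard by a hash of UserID
-- so that all rows of the same user always land on the same shard.
CREATE TABLE tutorial.hits_v1_distributed ON CLUSTER '{cluster}'
AS tutorial.hits_v1
ENGINE = Distributed('{cluster}', tutorial, hits_v1, intHash32(UserID))
```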
Classic sharding
In this example, the distributed table built from hits_v1 uses every shard in the chcluster cluster: shard1, shard2, and shard3.
Before operating the distributed table:
1. Connect to the tutorial database.

2. Create the hits_v1 table on every host in the cluster using the MergeTree engine:

   ```sql
   CREATE TABLE tutorial.hits_v1 ON CLUSTER '{cluster}'
   (
       <table_structure>
   )
   ENGINE = MergeTree()
   PARTITION BY toYYYYMM(EventDate)
   ORDER BY (CounterID, EventDate, intHash32(UserID))
   SAMPLE BY intHash32(UserID)
   SETTINGS index_granularity = 8192
   ```
To create a distributed table named hits_v1_distributed in the cluster:
1. Connect to the tutorial database.

2. Create a Distributed table:

   ```sql
   CREATE TABLE tutorial.hits_v1_distributed ON CLUSTER '{cluster}'
   AS tutorial.hits_v1
   ENGINE = Distributed('{cluster}', tutorial, hits_v1, rand())
   ```

   Here, you can use the AS tutorial.hits_v1 expression instead of explicitly stating the table structure, since both tables, hits_v1_distributed and hits_v1, reside on the same cluster hosts.

   When creating a Distributed table, use chcluster as the cluster ID. You can get it with the list of clusters in the folder.

   Tip

   You can use the {cluster} macro instead of the cluster ID. When running the query, ClickHouse® will automatically substitute it with the ID of the cluster where the CREATE TABLE statement is running.
Group-based sharding
In this example:

- We use a single shard group named sgroup.
- The distributed table and its underlying table, hits_v1, are in the same sgroup shard group within the cluster.
Before operating the distributed table:
1. Connect to the tutorial database.

2. Create the hits_v1 table on every host in the sgroup shard group using the MergeTree engine:

   ```sql
   CREATE TABLE tutorial.hits_v1 ON CLUSTER sgroup
   (
       <table_structure>
   )
   ENGINE = MergeTree()
   PARTITION BY toYYYYMM(EventDate)
   ORDER BY (CounterID, EventDate, intHash32(UserID))
   SAMPLE BY intHash32(UserID)
   SETTINGS index_granularity = 8192
   ```
To create a distributed table named tutorial.hits_v1_distributed in the cluster:
1. Connect to the tutorial database.

2. Create a Distributed table:

   ```sql
   CREATE TABLE tutorial.hits_v1_distributed ON CLUSTER sgroup
   AS tutorial.hits_v1
   ENGINE = Distributed(sgroup, tutorial, hits_v1, rand())
   ```

   Here, you can use the AS tutorial.hits_v1 expression instead of explicitly stating the table structure, since both tables, hits_v1_distributed and hits_v1, reside on the same cluster hosts within a single shard group.
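If you want to double-check that the underlying table exists on the hosts of the sgroup shard group, one optional approach is the clusterAllReplicas table function. This assumes the shard group name can be used as a cluster name in queries, the same way it is used in the ON CLUSTER clauses above, and that remote access between the cluster hosts works as it does for the Distributed table.

```sql
-- Optional check: hosts of the sgroup shard group that have tutorial.hits_v1.
SELECT hostName() AS host, database, name
FROM clusterAllReplicas('sgroup', system.tables)
WHERE database = 'tutorial' AND name = 'hits_v1'
```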
Advanced group-based sharding
In this example:

- We use the sgroup and sgroup_data shard groups.
- The distributed table resides in the sgroup shard group.
- The hits_v1 underlying table resides in the sgroup_data shard group.
Before operating the distributed table:
1. Connect to the tutorial database.

2. Create the hits_v1 table on every host of the sgroup_data shard group using the ReplicatedMergeTree engine:

   ```sql
   CREATE TABLE tutorial.hits_v1 ON CLUSTER sgroup_data
   (
       <table_structure>
   )
   ENGINE = ReplicatedMergeTree('/tables/{shard}/hits_v1', '{replica}')
   PARTITION BY toYYYYMM(EventDate)
   ORDER BY (CounterID, EventDate, intHash32(UserID))
   SAMPLE BY intHash32(UserID)
   SETTINGS index_granularity = 8192
   ```

   The ReplicatedMergeTree engine provides fault tolerance for this solution.
To create a distributed table named tutorial.hits_v1_distributed in the cluster:
1. Connect to the tutorial database.

2. Create a Distributed table:

   ```sql
   CREATE TABLE tutorial.hits_v1_distributed ON CLUSTER sgroup
   (
       <table_structure>
   )
   ENGINE = Distributed(sgroup_data, tutorial, hits_v1, rand())
   ```

   Here, you must explicitly specify the table structure because the hits_v1_distributed and hits_v1 tables reside on different hosts in separate shards.
Test the tables
To test your new distributed table named tutorial.hits_v1_distributed:
1. Load the hits_v1 test dataset:

   ```bash
   curl https://storage.yandexcloud.net/doc-files/managed-clickhouse/hits_v1.tsv.xz | unxz --threads=`nproc` > hits_v1.tsv
   ```

2. Populate the table with test data:

   ```bash
   clickhouse-client \
       --host "<FQDN_of_any_host_with_distributed_table>" \
       --secure \
       --port 9440 \
       --user "<username>" \
       --password "<user_password>" \
       --database "tutorial" \
       --query "INSERT INTO tutorial.hits_v1_distributed FORMAT TSV" \
       --max_insert_block_size=100000 < hits_v1.tsv
   ```

   You can get the host names with the list of ClickHouse® hosts in the cluster.

3. Run one or multiple test queries against this table. For example, you can get the table row count:

   ```sql
   SELECT count() FROM tutorial.hits_v1_distributed
   ```

   Result:

   ```text
   8873898
   ```
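To see how the inserted rows were distributed across the shards, you can additionally group by the _shard_num virtual column that Distributed tables provide. This check is an optional addition to the tutorial; the per-shard counts depend on the rand() sharding key, so your numbers will vary.

```sql
-- Optional check: how many rows each shard received through the Distributed table.
SELECT _shard_num AS shard, count() AS rows
FROM tutorial.hits_v1_distributed
GROUP BY shard
ORDER BY shard
```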
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
Manually

- Delete the Managed Service for ClickHouse® cluster.
- If you used static public IP addresses for cluster access, release and delete them.
Terraform

1. In the terminal window, go to the directory containing the infrastructure plan.

   Warning

   Make sure the directory has no Terraform manifests with the resources you want to keep. Terraform deletes all resources that were created using the manifests in the current directory.

2. Delete the resources:

   1. Run this command:

      ```bash
      terraform destroy
      ```

   2. Confirm deleting the resources and wait for the operation to complete.

   All the resources described in the Terraform manifests will be deleted.
ClickHouse® is a registered trademark of ClickHouse, Inc.