Setting up Delta Lake in multi-cluster mode
In multi-cluster mode, Yandex Data Processing uses a Yandex Managed Service for YDB database to control access to Delta Lake tables from different clusters and Apache Spark™ jobs.
For more information about Delta Lake, see the Delta Lake in Yandex Data Processing section and the Delta Lake documentation.
Note
Delta Lake is not part of Yandex Data Processing. It is not covered by Yandex Cloud support, and its usage is not governed by the Yandex Data Processing Terms of Use.
Prepare the infrastructure
- Create a service account with the `ydb.editor` role for access to YDB.
- Create a static access key for the service account.
- Create a Yandex Lockbox secret and place the static key data in it as two key-value pairs:
  - Key: `key-id`; value: `<static_key_ID>`
  - Key: `key-secret`; value: `<secret_part_of_static_key>`
- Set up one or more Yandex Data Processing clusters to work with Delta Lake:
  - If you do not have a Yandex Data Processing cluster, create one.
  - If you attached a Yandex Object Storage bucket for data storage to your cluster:
    - Create a folder named `warehouse` in the bucket.
    - Set the `spark.sql.warehouse.dir` property to the following value: `s3a://<bucket_name>/warehouse/`.
  - Create a Hive Metastore cluster and connect it to your Yandex Data Processing cluster.
- Assign the `lockbox.payloadViewer` role to the service account you used to create the Yandex Data Processing clusters. A command-line sketch of these preparation steps is shown after this list.
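For reference, here is a minimal command-line sketch of the preparation steps above, assuming the YC CLI is installed and initialized. The service account and secret names are placeholders, and the available flags may differ between CLI versions, so check `yc <service> <command> --help` before running:

```bash
# Create a service account for access to YDB (the name is a placeholder).
yc iam service-account create --name delta-sa

# Grant the ydb.editor role on the folder that hosts the YDB database.
yc resource-manager folder add-access-binding <folder_ID> \
  --role ydb.editor \
  --subject serviceAccount:<service_account_ID>

# Create a static access key; note the key ID and the secret part from the output.
yc iam access-key create --service-account-name delta-sa

# Store the static key in a Lockbox secret as the key-id / key-secret pair.
yc lockbox secret create --name delta-lockbox-secret \
  --payload '[{"key": "key-id", "text_value": "<static_key_ID>"}, {"key": "key-secret", "text_value": "<secret_part_of_static_key>"}]'

# Allow the service account of the Yandex Data Processing clusters to read the secret payload.
yc lockbox secret add-access-binding delta-lockbox-secret \
  --role lockbox.payloadViewer \
  --subject serviceAccount:<cluster_service_account_ID>
```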
Set up the component properties to work with Delta Lake
- Download the archive with the required Delta Lake libraries and add-ons to connect to Managed Service for YDB:
  - Delta Lake 2.0.2 for Yandex Data Processing 2.1.0 or 2.1.3
  - Delta Lake 2.3.0 for Yandex Data Processing 2.1.4 and higher

  You can check out the source code of the add-ons to connect to YDB in the repository:
  - Delta Lake 2.0.2
  - Delta Lake 2.3.0
- Add the downloaded archive to the dependencies of all clusters or individual jobs that need access to Delta Lake tables. You can do this using two methods:
  - Save the archive to the Object Storage bucket and provide the file URL in the `spark.jars` property:

    `spark.jars=s3a://<bucket_name>/<path_to_file>`

    Make sure the cluster service account has read access to the bucket.
  - Copy the archive to all the cluster nodes manually or using initialization scripts and provide the full file path in the `spark.driver.extraClassPath` and `spark.executor.extraClassPath` properties.
- Set the following properties at the level of clusters or individual Apache Spark™ jobs that need access to Delta Lake tables:
  - Set `spark.sql.extensions` to `io.delta.sql.DeltaSparkSessionExtension`.
  - Set `spark.sql.catalog.spark_catalog` to `org.apache.spark.sql.delta.catalog.DeltaCatalog`.
  - Set `spark.delta.logStore.s3a.impl` to `ru.yandex.cloud.custom.delta.YcS3YdbLogStore`.
  - Set `spark.io.delta.storage.S3DynamoDBLogStore.ddb.endpoint` to the Document API endpoint value from the Overview tab of your database in the management console.
  - Set `spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox` to the Lockbox secret ID value from the Overview tab of your secret in the management console.
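As an illustration, the same properties can be supplied to a single job at submission time rather than at the cluster level. The sketch below is not from the official procedure: it assumes a hypothetical local PySpark script `my_delta_job.py` and uses placeholder values for the archive path, endpoint, and secret ID:

```bash
# Submit a hypothetical PySpark job with Delta Lake support enabled for this job only.
spark-submit \
    --jars s3a://<bucket_name>/<path_to_archive> \
    --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
    --conf spark.delta.logStore.s3a.impl=ru.yandex.cloud.custom.delta.YcS3YdbLogStore \
    --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.endpoint=<Document_API_endpoint> \
    --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox=<secret_ID> \
    my_delta_job.py
```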
You can now use Delta Lake in multi-cluster mode.
Delta Lake usage example
The use case was tested on a Yandex Data Processing cluster version 2.1.7.
- Use SSH to connect to the Yandex Data Processing cluster's master host.
- Run an Apache Spark™ session in the cluster by providing the required parameters:

  ```bash
  spark-sql \
      --conf spark.jars=s3a://<bucket_name>/yc-delta23-multi-dp21-1.1-fatjar.jar \
      --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
      --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.YcDeltaCatalog \
      --conf spark.delta.logStore.s3a.impl=ru.yandex.cloud.custom.delta.YcS3YdbLogStore \
      --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.endpoint=<Document_API_endpoint> \
      --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox=<secret_ID>
  ```
- Within the active session, create a database and switch to it:

  ```sql
  CREATE DATABASE testdelta;
  USE testdelta;
  ```
- Create a test table and populate it with data:

  ```sql
  CREATE TABLE tab1(a INTEGER NOT NULL, b VARCHAR(100)) USING DELTA;
  INSERT INTO tab1 VALUES (1,'One'), (2,'Two'), (3,'Three');
  ```
- Replace the `b` column values by appending to them the `a` column values converted to strings:

  ```sql
  UPDATE tab1 SET b=b || ' ** ' || CAST(a AS VARCHAR(10));
  ```
- Check the result:

  ```sql
  SELECT * FROM tab1;
  ```

  ```text
  3	Three ** 3
  2	Two ** 2
  1	One ** 1
  ```
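Optionally, you can inspect the table's storage layout directly from the master host. This is an illustrative check rather than part of the official procedure; it assumes the `warehouse` folder configured earlier and that the cluster's s3a settings allow reading the bucket:

```bash
# List the Delta table directory: the _delta_log subfolder contains the transaction
# log whose commits are coordinated through YDB in multi-cluster mode.
hadoop fs -ls s3a://<bucket_name>/warehouse/testdelta.db/tab1/
```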
Additional multi-cluster mode settings for Yandex Data Processing production clusters
To improve Delta Lake performance and optimize data storage when using multi-cluster mode, configure additional YDB settings.
Setting up Managed Service for YDB capacity
By default, a YDB database in serverless mode is created with a capacity of 10 Request Units per second. This may be insufficient under intensive use of Delta Lake tables.
To avoid Delta Lake performance degradation due to insufficient YDB capacity, track the Document API units overflow parameter on the YDB monitoring charts and increase the capacity limit if needed.
The total capacity limit that applies to all YDB databases in the cloud depends on your quota. If necessary, contact support.
Setting up auto cleanup
When working with Delta Lake, metadata versions that are not in use may accumulate in a YDB table or Object Storage buckets. You can optimize storage use and boost Delta Lake performance using a ready-made cleanup script.
The script is installed in the cloud as two serverless functions:
- Function for cleaning up the YDB table data. It is invoked automatically once an hour.
- Function for cleaning up the bucket data. It is invoked automatically once a day.
To add the cleanup functions to your cloud:
- If you do not have the Yandex Cloud command line interface yet, install and initialize it.
- Download the files from the `cf-cleanup` folder:
  - `cfunc.py`: Cleanup script source code.
  - `delta-prefixes.txt`: File with prefixes of paths to temporary Delta Lake files in buckets.
  - `pack.sh`: ZIP archive creation script.
  - `requirements.txt`: File with environment requirements to install the functions.

  Save these files to the `cf-cleanup` folder in the working directory.
- Make the `pack.sh` file executable:

  ```bash
  chmod +x ./cf-cleanup/pack.sh
  ```
- In the `delta-prefixes.txt` file, specify the paths to the Object Storage bucket folders with temporary Delta Lake files. Each path must be on a separate line in the following format:

  ```text
  BucketName Mode PathPrefix
  ```

  The `Mode` field may take the following values:
  - `W`: Warehouse, path for storing multiple databases.
  - `D`: Database, path for storing a single database.
  - `T`: Table, path for storing a specific single table.

  Example:

  ```text
  mybucket1 W warehouse/
  mybucket2 D warehouse/testdelta2.db/
  mybucket3 T warehouse/testdelta3.db/tab1/
  ```
- Place the `delta-prefixes.txt` file in your Object Storage bucket (see the upload sketch after this list).
- Download and save the cleanup function control files to the working directory:
  - `ddb-maint-config.sh`: Setup parameters
  - `ddb-maint-setup.sh`: Setup script
  - `ddb-maint-remove.sh`: Removal script
- In the `ddb-maint-config.sh` file, specify the following parameters:
  - `sa_name`: Name of the service account that will be created to use the functions.
  - `cf_ddb_name`: Name of the serverless database cleanup function; it must be unique within the folder.
  - `cf_s3_name`: Name of the serverless bucket cleanup function; it must be unique within the folder.
  - `docapi_endpoint`: Document API endpoint. You can find it on the Overview tab of your YDB database in the management console.
  - `docapi_table`: Name of the Delta Lake table to be cleaned up.
  - `s3_prefix_file`: Path to the `delta-prefixes.txt` file in the Object Storage bucket, e.g., `s3://<bucket_name>/delta-prefixes.txt`.
- Run the setup script in your local directory:

  ```bash
  bash ./ddb-maint-setup.sh
  ```

  The functions for cleaning up temporary files in YDB tables and Object Storage buckets will be added to the cloud. You can check the new functions using the management console.

If you no longer need the cleanup functions, run this script to remove them:

```bash
bash ./ddb-maint-remove.sh
```
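For the step where you place `delta-prefixes.txt` in the bucket, any S3-compatible tool will do. Below is a minimal sketch using the AWS CLI, assuming it is configured with a static key that has write access to the bucket:

```bash
# Upload the prefixes file to the bucket through the Object Storage S3 API.
aws s3 cp ./cf-cleanup/delta-prefixes.txt s3://<bucket_name>/delta-prefixes.txt \
    --endpoint-url=https://storage.yandexcloud.net
```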
The `spark.io.delta.storage.S3DynamoDBLogStore.ddb.ttl` Spark property sets the TTL for metadata records, which is `86400` seconds (24 hours) by default. The actual TTL for a specific record may be longer, as it depends on when the cleanup function was run.
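If you need a different TTL, the property can be passed together with the other Delta Lake configuration parameters when starting a session or submitting a job. The value below is only an illustration, and `<other_Delta_Lake_conf_parameters>` stands for the settings described earlier:

```bash
# Illustrative only: raise the metadata record TTL to 48 hours (172800 seconds).
spark-sql \
    --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.ttl=172800 \
    <other_Delta_Lake_conf_parameters>
```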