Setting up Delta Lake in multi-cluster mode
In multi-cluster mode, Yandex Data Processing uses a Yandex Managed Service for YDB database to manage access to Delta Lake tables from different clusters and Apache Spark™ jobs.
For more information about Delta Lake, see the Delta Lake in Yandex Data Processing section and the Delta Lake documentation.
Note
Delta Lake is not part of Yandex Data Processing. It is not covered by Yandex Cloud support, and its usage is not governed by the Yandex Data Processing Terms of Use.
Set up your infrastructure
- Create a service account with the ydb.editor role for access to YDB.
- Create a static access key for the service account.
- Create a Yandex Lockbox secret and place into it the static key data as two key-value pairs:
  - Key: key-id; value: <static_key_ID>.
  - Key: key-secret; value: <static_key_secret_part>.
  For a Yandex Cloud CLI sketch of these steps, see the example after this list.
- Set up one or more Yandex Data Processing clusters to work with Delta Lake:
  - If you do not have a Yandex Data Processing cluster, create one.
  - If you attached a Yandex Object Storage bucket to your cluster:
    - Create a folder named warehouse in the bucket.
    - Set spark.sql.warehouse.dir to s3a://<bucket_name>/warehouse/.
  - Create an Apache Hive™ Metastore cluster and connect it to your Yandex Data Processing cluster.
- Assign the lockbox.payloadViewer role to the service account you used to create the Yandex Data Processing clusters. You can do this either for the whole folder or for the individual secret.
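As a minimal sketch, the steps above can also be performed with the Yandex Cloud CLI. The service account name delta-sa and the secret name delta-s3-keys below are hypothetical, the key ID and secret part must be taken from the output of the access-key command, and you should check the flags against the current yc help before running:

# Create the service account and grant it the ydb.editor role in the folder
yc iam service-account create --name delta-sa
yc resource-manager folder add-access-binding <folder_name_or_ID> \
  --role ydb.editor \
  --subject serviceAccount:<service_account_ID>

# Create a static access key; note the key ID and secret part in the output
yc iam access-key create --service-account-name delta-sa

# Store the static key in a Yandex Lockbox secret as the key-id / key-secret pair
yc lockbox secret create --name delta-s3-keys \
  --payload '[{"key": "key-id", "text_value": "<static_key_ID>"}, {"key": "key-secret", "text_value": "<static_key_secret_part>"}]'

# Allow the service account used by the Yandex Data Processing clusters to read the secret
yc lockbox secret add-access-binding delta-s3-keys \
  --role lockbox.payloadViewer \
  --subject serviceAccount:<cluster_service_account_ID>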
Set up the component properties to work with Delta Lake
- Download the archive with the required Delta Lake libraries and add-ons to connect to Managed Service for YDB:
  - Delta Lake 2.0.2 for Yandex Data Processing 2.1.0 or 2.1.3
  - Delta Lake 2.3.0 for Yandex Data Processing 2.1.4 and higher
  You can check out the source code of the add-ons to connect to YDB in the repository.
- Add the downloaded archive to the dependencies of all clusters or individual jobs that need access to Delta Lake tables. There are two ways to do this:
  - Save the archive to the Object Storage bucket and provide the file URL in the spark.jars property:
    spark.jars=s3a://<bucket_name>/<file_path>
    Make sure the cluster service account has read access to the bucket.
  - Copy the archive to all cluster nodes manually or using initialization scripts, and provide the full file path in the spark.driver.extraClassPath and spark.executor.extraClassPath properties (see the initialization script sketch after this list).
- Set the following properties at the level of clusters or individual Apache Spark™ jobs that need access to Delta Lake tables:
  - Set spark.sql.extensions to io.delta.sql.DeltaSparkSessionExtension.
  - Set spark.sql.catalog.spark_catalog to org.apache.spark.sql.delta.catalog.DeltaCatalog.
  - Set spark.delta.logStore.s3a.impl to ru.yandex.cloud.custom.delta.YcS3YdbLogStore.
  - Set spark.io.delta.storage.S3DynamoDBLogStore.ddb.endpoint to the Document API endpoint value available on the Overview tab of your database in the management console.
  - Set spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox to the Yandex Lockbox secret ID available on the Overview tab of your secret in the management console.
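If you copy the archive to the cluster nodes with an initialization script, a minimal sketch might look like the one below. The target directory /opt/delta is a hypothetical choice, and the script assumes the s3a connector on the cluster nodes can read the bucket:

#!/bin/bash
# Hypothetical initialization script: copy the Delta Lake archive from the
# Object Storage bucket to a local directory on every cluster node.
set -e
mkdir -p /opt/delta
hadoop fs -copyToLocal s3a://<bucket_name>/<file_path> /opt/delta/

The spark.driver.extraClassPath and spark.executor.extraClassPath properties would then point to the copied file under /opt/delta/.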
You can now use Delta Lake in multi-cluster mode.
Delta Lake use case
This use case was tested on a Yandex Data Processing cluster of version 2.1.7.
- Use SSH to connect to the Yandex Data Processing cluster's master host.
- Run an Apache Spark™ session in the cluster by providing the required parameters:
  spark-sql \
    --conf spark.jars=s3a://<bucket_name>/yc-delta23-multi-dp21-1.1-fatjar.jar \
    --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.YcDeltaCatalog \
    --conf spark.delta.logStore.s3a.impl=ru.yandex.cloud.custom.delta.YcS3YdbLogStore \
    --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.endpoint=<Document_API_endpoint> \
    --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox=<secret_ID>
- In your active session, create a database and switch to it:
  CREATE DATABASE testdelta;
  USE testdelta;
- Create a test table and populate it with data:
  CREATE TABLE tab1(a INTEGER NOT NULL, b VARCHAR(100)) USING DELTA;
  INSERT INTO tab1 VALUES (1,'One'), (2,'Two'), (3,'Three');
- Replace the b column values by adding to them the a column values converted to a string:
  UPDATE tab1 SET b=b || ' ** ' || CAST(a AS VARCHAR(10));
- Check the result:
  SELECT * FROM tab1;

  3  Three ** 3
  2  Two ** 2
  1  One ** 1
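Other clusters configured with the same properties can work with this table concurrently; this is what multi-cluster mode provides. As a sketch, assuming a second Yandex Data Processing cluster attached to the same Apache Hive™ Metastore cluster and bucket, you could check this from that cluster's master host:

# Run spark-sql on the second cluster with the same parameters as above and
# read the table written by the first cluster (placeholders as in the session above)
spark-sql \
  --conf spark.jars=s3a://<bucket_name>/yc-delta23-multi-dp21-1.1-fatjar.jar \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.YcDeltaCatalog \
  --conf spark.delta.logStore.s3a.impl=ru.yandex.cloud.custom.delta.YcS3YdbLogStore \
  --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.endpoint=<Document_API_endpoint> \
  --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox=<secret_ID> \
  -e 'SELECT * FROM testdelta.tab1;'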
Additional multi-cluster mode settings for production clusters in Yandex Data Processing
To improve Delta Lake performance and streamline data storage when using multi-cluster mode, configure additional YDB settings.
Setting up Managed Service for YDB throughput
By default, a serverless YDB database is created with a throughput of 10 request units per second. This may be insufficient for intensive use of Delta Lake tables.
To avoid Delta Lake performance degradation due to insufficient YDB throughput, track the Document API units overflow indicator on the YDB monitoring charts and increase the throughput limit if needed.
There is a total throughput limit that applies to all YDB databases in the cloud and depends on the quota. If necessary, contact support to have the quota increased.
Setting up auto cleanup
When working with Delta Lake, metadata versions that are not in use may accumulate in a YDB table or Object Storage buckets. You can streamline storage use and boost Delta Lake performance using a ready-made script.
The script is installed in the cloud as two serverless functions:
- Function for cleaning up data in the YDB table. It is invoked automatically once an hour.
- Function for cleaning up data in the buckets. It is invoked automatically once a day.
To add these cleanup functions to your cloud:
- If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
- Download files from the cf-cleanup folder:
  - cfunc.py: Cleanup script source code.
  - delta-prefixes.txt: File with prefixes of paths to temporary Delta Lake files in buckets.
  - pack.sh: ZIP archive creation script.
  - requirements.txt: File with environment requirements to install the functions.
  Save these files to the cf-cleanup folder in the working directory.
- Make the pack.sh file executable:
  chmod +x ./cf-cleanup/pack.sh
- In the delta-prefixes.txt file, specify the paths to the Object Storage bucket folders with temporary Delta Lake files. Provide each path on a new line in the following format:
  BucketName Mode PathPrefix
  The Mode field can take the following values:
  - W: Warehouse, path for storing multiple databases.
  - D: Database, path for storing a single database.
  - T: Table, path for storing a specific single table.
  Example:
  mybucket1 W warehouse/
  mybucket2 D warehouse/testdelta2.db/
  mybucket3 T warehouse/testdelta3.db/tab1/
- Place the delta-prefixes.txt file in your Object Storage bucket.
- Download the files for managing the cleanup functions and save them to the working directory:
  - ddb-maint-config.sh: Setup parameters.
  - ddb-maint-setup.sh: Setup script.
  - ddb-maint-remove.sh: Removal script.
- In the ddb-maint-config.sh file, specify the following parameters (a filled-in example is shown after this procedure):
  - sa_name: Name of the service account that will be created to use the functions.
  - cf_ddb_name: Name of the serverless database cleanup function; it must be unique within the folder.
  - cf_s3_name: Name of the serverless bucket cleanup function; it must be unique within the folder.
  - docapi_endpoint: Document API endpoint. You can find it on the Overview tab of your YDB database in the management console.
  - docapi_table: Name of the Delta Lake table to clean.
  - s3_prefix_file: Path to the delta-prefixes.txt file in the Object Storage bucket, e.g., s3://<bucket_name>/delta-prefixes.txt.
- Run the setup script in your local directory:
bash ./ddb-maint-setup.sh
This will add the functions for cleaning up temporary files in YDB tables and Object Storage buckets to your cloud. You can check the new functions in the management console.
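For reference, a filled-in ddb-maint-config.sh might look like the sketch below. The names are hypothetical; keep the variable format of the downloaded file and substitute your own values:

# Hypothetical values for ddb-maint-config.sh
sa_name="delta-cleanup-sa"                              # service account created for the cleanup functions
cf_ddb_name="delta-ydb-cleanup"                         # YDB table cleanup function, unique within the folder
cf_s3_name="delta-s3-cleanup"                           # bucket cleanup function, unique within the folder
docapi_endpoint="<Document_API_endpoint>"               # from the Overview tab of your YDB database
docapi_table="<Delta_Lake_table_name>"                  # Delta Lake service table to clean
s3_prefix_file="s3://<bucket_name>/delta-prefixes.txt"  # prefixes file uploaded earlier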
If you no longer need the cleanup functions, run this script to remove them:
bash ./ddb-maint-remove.sh
The spark.io.delta.storage.S3DynamoDBLogStore.ddb.ttl Spark property sets the TTL for metadata records, which is 86400 seconds (24 hours) by default. The actual TTL for a specific record may be longer, as it depends on when the cleanup function was run.
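If the default retention period does not suit you, this property can be passed to a session or job in the same way as the other properties above; the 43200-second (12-hour) value below is purely illustrative:

# Override the metadata record TTL, in seconds, for one session;
# pass the other Delta Lake properties exactly as shown earlier.
spark-sql \
  --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.ttl=43200 \
  <other_Delta_Lake_options>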