Настройка Delta Lake в мультикластерном режиме
При работе в мультикластерном режиме Yandex Data Processing использует базу данных Yandex Managed Service for YDB для координации доступа к таблицам Delta Lake из разных кластеров и заданий Apache Spark™.
Подробную информацию о Delta Lake см. в разделе Delta Lake в Yandex Data Processing и в документации Delta Lake
Примечание
Delta Lake не является частью сервиса Yandex Data Processing и не сопровождается командой разработки и службой поддержки Yandex Cloud, а его использование не входит в условия использования Yandex Data Processing
Подготовьте инфраструктуру
-
Создайте сервисный аккаунт с ролью
ydb.editor
для доступа к YDB. -
Создайте статический ключ доступа для сервисного аккаунта.
-
Создайте секрет в Yandex Lockbox и поместите в него данные статического ключа в формате двух пар
ключ-значение
:- ключ:
key-id
, значение:<идентификатор_статического_ключа>
; - ключ:
key-secret
, значение:<секретная_часть_статического_ключа>
.
- ключ:
-
Настройте один или несколько кластеров Yandex Data Processing для работы с Delta Lake:
-
Если у вас нет кластера Yandex Data Processing, создайте его.
-
Если для хранения данных вы подключили к кластеру бакет Yandex Object Storage:
- Создайте в бакете каталог c именем
warehouse
. - Установите свойство
spark.sql.warehouse.dir
в значениеs3a://<имя_бакета>/warehouse/
.
- Создайте в бакете каталог c именем
-
Создайте кластер Hive Metastore и подключите его к кластеру Yandex Data Processing.
-
-
Выдайте роль
lockbox.payloadViewer
сервисному аккаунту, который использовался при создании кластеров Yandex Data Processing. Это можно сделать:
Настройте свойства компонентов для работы с Delta Lake
-
Скачайте архив с необходимыми библиотеками Delta Lake и надстройками для подключения к Managed Service for YDB:
- Delta Lake 2.0.2
для Yandex Data Processing версии 2.1.0 или 2.1.3; - Delta Lake 2.3.0
для Yandex Data Processing версии 2.1.4 и выше.
Ознакомиться с исходным кодом надстроек для подключения к YDB можно в репозитории:
- Delta Lake 2.0.2
-
Добавьте скачанный архив в зависимости всех кластеров или отдельных заданий, которые должны иметь доступ к таблицам Delta Lake. Это можно сделать двумя способами:
-
Сохраните архив в бакет Object Storage и передайте URL файла в свойстве
spark.jars
:spark.jars=s3a://<имя_бакета>/<путь_к_файлу>
Сервисный аккаунт кластера должен иметь доступ на чтение из бакета.
-
Скопируйте архив на все узлы кластера вручную или с помощью скриптов инициализации и передайте полный путь к файлу в свойствах
spark.driver.extraClassPath
иspark.executor.extraClassPath
-
-
Установите следующие свойства на уровне кластеров или на уровне отдельных заданий Apache Spark™, которые должны иметь доступ к таблицам Delta Lake:
spark.sql.extensions
в значениеio.delta.sql.DeltaSparkSessionExtension
;spark.sql.catalog.spark_catalog
в значениеorg.apache.spark.sql.delta.catalog.DeltaCatalog
;spark.delta.logStore.s3a.impl
в значениеru.yandex.cloud.custom.delta.YcS3YdbLogStore
;spark.io.delta.storage.S3DynamoDBLogStore.ddb.endpoint
в значение Document API эндпоинта, доступное на вкладке Обзор вашей базы данных в консоли управления ;spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox
в значение идентификатора секрета Lockbox, который доступен на вкладке Обзор вашего Lockbox в консоли управления .
Теперь вы можете использовать Delta Lake в мультикластерном режиме.
Пример использования Delta Lake
Пример проверялся в кластере Yandex Data Processing версии 2.1.7.
-
Подключитесь по SSH к хосту-мастеру кластера Yandex Data Processing.
-
Запустите в кластере сессию Apache Spark™, передав необходимые параметры:
spark-sql \ --conf spark.jars=s3a://<имя_бакета>/yc-delta23-multi-dp21-1.1-fatjar.jar \ --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.YcDeltaCatalog \ --conf spark.delta.logStore.s3a.impl=ru.yandex.cloud.custom.delta.YcS3YdbLogStore \ --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.endpoint=<Document_API_эндпоинт> \ --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.lockbox=<идентификатор_секрета>
-
В рамках запущенной сессии создайте базу данных и переключитесь на нее:
CREATE DATABASE testdelta; USE testdelta;
-
Создайте тестовую таблицу и наполните ее данными:
CREATE TABLE tab1(a INTEGER NOT NULL, b VARCHAR(100)) USING DELTA; INSERT INTO tab1 VALUES (1,'One'), (2,'Two'), (3,'Three');
-
Замените значения в столбце
b
, добавив к ним значения из столбцаa
, преобразованные в строку:UPDATE tab1 SET b=b || ' ** ' || CAST(a AS VARCHAR(10));
-
Проверьте результат:
SELECT * FROM tab1;
3 Three ** 3 2 Two ** 2 1 One ** 1
Дополнительные настройки мультикластерного режима для производственных кластеров Yandex Data Processing
Для повышения производительности работы Delta Lake и оптимизации хранения данных при работе в мультикластерном режиме рекомендуется выполнить дополнительные настройки YDB.
Настройка пропускной способности Managed Service for YDB
По умолчанию YDB в Serverless-режиме создается с пропускной способностью 10 Request Units в секунду. При интенсивной работе с таблицами Delta Lake этого может оказаться недостаточно.
Чтобы не допустить снижения производительности Delta Lake из-за недостаточной пропускной способности YDB, отслеживайте поведение параметра Document API units overflow по графику при мониторинге YDB. При необходимости увеличьте предел пропускной способности.
На все базы данных YDB в облаке действует общий предел пропускной способности, который определяется квотой. При необходимости обратитесь в техническую поддержку
Настройка автоматической очистки мусора
При работе с Delta Lake неиспользуемые версии метаданных могут накапливаться в таблице YDB и в бакетах Object Storage. Оптимизировать использование хранилища и увеличить эффективность работы Delta Lake можно с помощью подготовленного скрипта
Скрипт устанавливается в облако в виде двух Serverless-функций:
- Функция очистки данных в таблице YDB. Запускается автоматически один раз в час.
- Функция очистки данных в бакетах. Запускается автоматически один раз в сутки.
Чтобы установить функции очистки в облако:
-
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
-
Скачайте файлы из каталога cf-cleanup
:cfunc.py
— исходный код скрипта очистки;delta-prefixes.txt
— файл с префиксами путей к временным файлам Delta Lake в бакетах;pack.sh
— скрипт создания ZIP-архива;requirements.txt
— файл с требованиями к окружению, необходим для установки функций.
Поместите эти файлы в каталог с именем
cf-cleanup
в рабочей директории. -
Сделайте файл
pack.sh
исполняемым:chmod +x ./cf-cleanup/pack.sh
-
В файле
delta-prefixes.txt
укажите пути к каталогам в бакетах Object Storage, содержащим временные файлы Delta Lake. Каждый путь должен быть указан в новой строке в следующем формате:ИмяБакета Режим ПрефиксПути
Поле
Режим
может принимать значения:W
— Warehouse, путь хранения нескольких баз данных.D
— Database, путь хранения одной базы данных.T
— Table, путь хранения одной конкретной таблицы.
Пример:
mybucket1 W warehouse/ mybucket2 D warehouse/testdelta2.db/ mybucket3 T warehouse/testdelta3.db/tab1/
-
Поместите файл
delta-prefixes.txt
в бакет Object Storage. -
Скачайте и поместите в рабочую директорию файлы для управления функциями очистки:
- ddb-maint-config.sh
— параметры установки. - ddb-maint-setup.sh
— сценарий установки. - ddb-maint-remove.sh
— сценарий удаления.
- ddb-maint-config.sh
-
В файле
ddb-maint-config.sh
укажите значения для следующих параметров:sa_name
— имя сервисного аккаунта, который будет создан для работы функций.cf_ddb_name
— имя Serverless-функции для очистки базы данных, должно быть уникальным в каталоге.cf_s3_name
— имя Serverless-функции для очистки бакетов, должно быть уникальным в каталоге.docapi_endpoint
— Document API эндпоинт. Доступен на вкладке Обзор вашей базы данных YDB в консоли управления .docapi_table
— имя таблицы Delta Lake для которой будет выполняться очистка.s3_prefix_file
— путь к файлуdelta-prefixes.txt
в бакете Object Storage, напримерs3://<имя_бакета>/delta-prefixes.txt
.
-
Запустите в локальной директории сценарий установки:
bash ./ddb-maint-setup.sh
Функции очистки временных файлов из таблиц YDB и бакетов Object Storage будут установлены в облако. Проверить их наличие можно в консоли управления
Если функции очистки вам больше не нужны, запустите сценарий их удаления:
bash ./ddb-maint-remove.sh
Время жизни записей метаданных задается свойством Spark spark.io.delta.storage.S3DynamoDBLogStore.ddb.ttl
и по умолчанию составляет 86400
секунд (одни сутки). Фактическое время жизни для конкретной записи может оказаться больше, поскольку зависит от момента запуска функции очистки.