Настройка Delta Lake в однокластерном режиме
Yandex Data Processing версии 2.0 и выше поддерживает использование Delta Lake в однокластерном режиме.
Подробную информацию о Delta Lake см. в разделе Delta Lake в Yandex Data Processing и в документации Delta Lake
Примечание
Delta Lake не является частью сервиса Yandex Data Processing и не сопровождается командой разработки и службой поддержки Yandex Cloud, а его использование не входит в условия использования Yandex Data Processing
Важно
Если разные Spark-задания в однокластерном режиме одновременно изменяют данные в таблице, информация может быть потеряна.
Настройте Spark-задания так, чтобы исключить одновременную модификацию данных или используйте мультикластерный режим. Подробнее в документации Delta Lake
Подготовьте инфраструктуру
-
Если у вас нет кластера Yandex Data Processing, создайте его.
-
Если для хранения данных вы подключили к кластеру бакет Yandex Object Storage:
- Создайте в бакете каталог c именем
warehouse
. - Установите свойство
spark.sql.warehouse.dir
в значениеs3a://<имя_бакета>/warehouse/
.
- Создайте в бакете каталог c именем
-
Создайте кластер Hive Metastore и подключите его к кластеру Yandex Data Processing.
Настройте свойства компонентов для работы с Delta Lake
-
Установите следующие свойства на уровне кластера или на уровне отдельного задания:
spark.sql.extensions
в значениеio.delta.sql.DeltaSparkSessionExtension
;spark.sql.catalog.spark_catalog
в значениеorg.apache.spark.sql.delta.catalog.DeltaCatalog
.
-
Добавьте библиотеки Delta Lake в зависимости кластера или отдельного задания (нужные версии библиотек зависят от версии Yandex Data Processing):
Yandex Data Processing 2.0.xYandex Data Processing 2.1.0 или 2.1.3Yandex Data Processing 2.1.4 и вышеВоспользуйтесь одним из способов:
-
Скачайте файл библиотеки delta-core_2.12-0.8.0.jar
, сохраните его в бакет Object Storage и передайте URL файла в свойствеspark.jars
:spark.jars=s3a://<имя_бакета>/<путь_к_файлу>
Сервисный аккаунт кластера должен иметь доступ на чтение из бакета.
-
Настройте доступ кластера к репозиторию Maven
и установите свойствоspark.jars.packages
в значениеio.delta:delta-core_2.12:0.8.0
.Настроить доступ к Maven можно двумя способами:
- В группе безопасности кластера разрешите сетевой доступ к репозиторию Maven Central
. - Настройте альтернативный репозиторий Maven
и разрешите трафик к нему в группе безопасности кластера.
- В группе безопасности кластера разрешите сетевой доступ к репозиторию Maven Central
-
Скачайте файл библиотеки delta-core_2.12-0.8.0.jar
, скопируйте его на все узлы кластера вручную или с помощью скриптов инициализации и передайте полный путь к файлу в свойствахspark.driver.extraClassPath
иspark.executor.extraClassPath
.
Воспользуйтесь одним из способов:
-
Скачайте файлы библиотек delta-core_2.12-2.0.2.jar
и delta-storage-2.0.2.jar , сохраните их в бакет Object Storage и передайте URL файлов через запятую в свойствеspark.jars
:spark.jars=s3a://<имя_бакета>/<путь_к_файлу_core>,s3a://<имя_бакета>/<путь_к_файлу_storage>
Сервисный аккаунт кластера должен иметь доступ на чтение из бакета.
-
Настройте доступ кластера к репозиторию Maven
и установите свойствоspark.jars.packages
в значениеio.delta:delta-core_2.12:2.0.2,io.delta:delta-storage:2.0.2
.Настроить доступ к Maven можно двумя способами:
- В группе безопасности кластера разрешите сетевой доступ к репозиторию Maven Central
. - Настройте альтернативный репозиторий Maven
и разрешите трафик к нему в группе безопасности кластера.
- В группе безопасности кластера разрешите сетевой доступ к репозиторию Maven Central
-
Скачайте файлы библиотек delta-core_2.12-2.0.2.jar
и delta-storage-2.0.2.jar , скопируйте их на все узлы кластера вручную или с помощью скриптов инициализации и передайте полный путь к файлам в свойствахspark.driver.extraClassPath
иspark.executor.extraClassPath
.
Воспользуйтесь одним из способов:
-
Скачайте файлы библиотек delta-core_2.12-2.3.0.jar
и delta-storage-2.3.0.jar , сохраните их в бакет Object Storage и передайте URL файлов через запятую в свойствеspark.jars
:spark.jars=s3a://<имя_бакета>/<путь_к_файлу core>,s3a://<имя_бакета>/<путь_к_файлу_storage>
Сервисный аккаунт кластера должен иметь доступ на чтение из бакета.
-
Настройте доступ кластера к репозиторию Maven
и установите свойствоspark.jars.packages
в значениеio.delta:delta-core_2.12:2.3.0,io.delta:delta-storage:2.3.0
.Настроить доступ к Maven можно двумя способами:
- В группе безопасности кластера разрешите сетевой доступ к репозиторию Maven Central
. - Настройте альтернативный репозиторий Maven
и разрешите трафик к нему в группе безопасности кластера.
- В группе безопасности кластера разрешите сетевой доступ к репозиторию Maven Central
-
Скачайте файлы библиотек delta-core_2.12-2.3.0.jar
и delta-storage-2.3.0.jar , скопируйте их на все узлы кластера вручную или с помощью скриптов инициализации и передайте полный путь к файлам в свойствахspark.driver.extraClassPath
иspark.executor.extraClassPath
.
-
Теперь вы можете использовать Delta Lake в кластере Yandex Data Processing.
Если перечисленные свойства Spark переданы на уровне кластера, то для работы с таблицами Delta Lake можно использовать Spark Thrift Server.
Пример использования Delta Lake
Пример проверялся в кластере Yandex Data Processing версии 2.0 с доступом к репозиторию Maven Central.
-
Подключитесь по SSH к хосту-мастеру кластера Yandex Data Processing.
-
Запустите в кластере сессию Spark, передав необходимые параметры:
spark-sql \ --conf spark.jars.packages=io.delta:delta-core_2.12:0.8.0 \ --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \ --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
-
В рамках запущенной сессии создайте базу данных и переключитесь на нее:
CREATE DATABASE testdelta; USE testdelta;
-
Создайте тестовую таблицу и наполните ее данными:
CREATE TABLE tab1(a INTEGER NOT NULL, b VARCHAR(100)) USING DELTA; INSERT INTO tab1 VALUES (1,'One'), (2,'Two'), (3,'Three');
-
Замените значения в столбце
b
, добавив к ним значения из столбцаa
, преобразованные в строку:UPDATE tab1 SET b=b || ' ** ' || CAST(a AS VARCHAR(10));
-
Проверьте результат:
SELECT * FROM tab1;
3 Three ** 3 2 Two ** 2 1 One ** 1