Работа с таблицей в Object Storage из PySpark-задания с использованием Apache Hive™ Metastore и Apache Iceberg™
Интеграция Yandex Managed Service for Apache Spark™ с Apache Hive™ Metastore и Apache Iceberg™ дает ряд возможностей и преимуществ при работе с данными в Object Storage через SQL-таблицы.
Apache Hive™ Metastore обеспечивает:
- Централизованное хранение метаданных о базах, таблицах и партициях.
- Упрощенный доступ к данным без указания путей и схем вручную.
- Хранение статистик таблиц и колонок для оптимизации запросов.
Apache Iceberg™ обеспечивает:
- Версионирование данных и хранение снимков состояний.
- ACID-транзакции, которые поддерживают операции
UPDATE,DELETE,MERGE, а также эволюцию таблиц и способа партиционирования. - Масштабируемость с сохранением высокой производительности операций.
В этом руководстве показано, как использовать следующие возможности Apache Hive™ Metastore и Apache Iceberg™ при работе с S3-хранилищем из PySpark-задания:
-
Обращение к таблице по имени.
Apache Hive™ Metastore использует глобальный каталог метаданных для всех кластеров. Сохраненные метаданные затем может использовать любое приложение из любого кластера Apache Spark™, подключенного к этому кластеру Apache Hive™ Metastore.
-
Создание и чтение снимков метаданных.
Apache Iceberg™ фиксирует каждую запись в таблицу как новый снимок метаданных. В дальнейшем к этим снимкам можно обращаться, указав момент времени или идентификатор снимка.
Чтобы реализовать описанный пример:
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за бакеты Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).
- Плата за получение и хранение логов (см. тарифы Cloud Logging).
- Плата за вычислительные ресурсы компонентов кластера Yandex Managed Service for Apache Spark™ (см. тарифы Yandex Managed Service for Apache Spark™).
- Плата за вычислительные ресурсы компонентов кластера Apache Hive™ Metastore (см. тарифы Yandex MetaData Hub).
Подготовьте инфраструктуру
Подготовьте инфраструктуру:
-
Создайте сервисный аккаунт
spark-agentи выдайте ему роль managed-spark.integrationProvider. -
Создайте сервисный аккаунт
metastore-agentи выдайте ему роль managed-metastore.integrationProvider, чтобы кластер Apache Hive™ Metastore мог взаимодействовать с другими ресурсами. -
- бакет для исходного кода PySpark-задания;
- бакет для выходных данных.
-
Предоставьте разрешения для сервисного аккаунта
spark-agentна созданные бакеты:- бакет для исходного кода PySpark-задания —
READ; - бакет для выходных данных —
READ и WRITE.
- бакет для исходного кода PySpark-задания —
-
Предоставьте разрешение
READ и WRITEдля сервисного аккаунтаmetastore-agentна бакет для выходных данных. -
Создайте облачную сеть с именем
integration-network.Вместе с ней автоматически создадутся три подсети в разных зонах доступности.
-
Для кластера Apache Spark™ создайте группу безопасности
spark-sgв сетиintegration-network. Добавьте в группу следующее правило:-
Для исходящего трафика, чтобы разрешить подключение кластера Apache Spark™ к Apache Hive™ Metastore:
- Диапазон портов —
9083. - Протокол —
Любой(Any). - Назначение —
CIDR. - CIDR блоки —
0.0.0.0/0.
- Диапазон портов —
-
-
Для кластера Apache Hive™ Metastore создайте группу безопасности
metastore-sgв сетиintegration-network. Добавьте в группу следующие правила:-
Для входящего трафика от клиентов:
- Диапазон портов —
30000-32767. - Протокол —
Любой(Any). - Источник —
CIDR. - CIDR блоки —
0.0.0.0/0.
- Диапазон портов —
-
Для входящего трафика от балансировщика:
- Диапазон портов —
10256. - Протокол —
Любой(Any). - Источник —
Проверки состояния балансировщика.
- Диапазон портов —
-
-
Создайте кластер Apache Hive™ Metastore с параметрами:
- Сервисный аккаунт —
metastore-agent. - Версия —
3.1. - Сеть —
integration-network. - Подсеть —
integration-network-ru-central1-a. - Группы безопасности —
metastore-sg.
- Сервисный аккаунт —
-
Создайте кластер Apache Spark™ с параметрами:
- Сервисный аккаунт —
spark-agent. - Сеть —
integration-network. - Подсеть —
integration-network-ru-central1-a. - Группы безопасности —
spark-sg. - Metastore-сервер — созданный ранее кластер Apache Hive™ Metastore.
- Сервисный аккаунт —
Подготовьте и запустите PySpark-задание
Для PySpark-задания будет использован Python-скрипт, который:
- Создает базу данных и таблицу формата Apache Iceberg™ в бакете.
- Записывает 10 строк с данными в таблицу.
- Сохраняет идентификатор текущего снимка таблицы.
- Записывает еще 10 строк с данными в таблицу.
- Выводит количество строк в текущем состоянии таблицы.
- Выводит количество строк в состоянии таблицы на момент снимка.
Подготовьте файл скрипта:
-
Создайте файл
ice_min_demo.pyи скопируйте в него следующий код:ice_min_demo.py
import random from pyspark.sql import SparkSession spark = ( SparkSession.builder .appName("ice_min_demo") .enableHiveSupport() .getOrCreate() ) # Создание базы данных и таблицы в формате Apache Iceberg™. # Apache Hive™ Metastore фиксирует метаданные, что позволяет обращаться к таблице по имени db.tbl # из любых приложений Spark, подключённых к этому кластеру Apache Hive™ Metastore. db, tbl = "demo_db", "demo_events" spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}") spark.sql(f""" CREATE TABLE IF NOT EXISTS {db}.{tbl} ( id BIGINT, value DOUBLE ) USING iceberg """) # Запись первого набора данных в таблицу df1 = spark.createDataFrame([(i, random.random()) for i in range(10)], ["id","value"]) df1.writeTo(f"{db}.{tbl}").append() # Фиксация идентификатора текущего снимка из служебной таблицы .snapshots snap_before = spark.sql(f"SELECT max(snapshot_id) AS s FROM {db}.{tbl}.snapshots").collect()[0][0] # Запись второго набора данных в таблицу df2 = spark.createDataFrame([(i, random.random()) for i in range(10, 20)], ["id","value"]) df2.writeTo(f"{db}.{tbl}").append() # Подсчет и вывод числа строк в текущем (20) и предыдущем состоянии таблицы (10) cnt_now = spark.table(f"{db}.{tbl}").count() cnt_past = spark.sql(f"SELECT COUNT(*) FROM {db}.{tbl} VERSION AS OF {snap_before}").collect()[0][0] print(f"now_count: {cnt_now} | past_count: {cnt_past}", flush=True) spark.stop() -
В бакете для исходного кода создайте папку
scriptsи загрузите в нее файлice_min_demo.py. -
Создайте задание с параметрами:
- Тип задания: PySpark.
- Main python файл:
s3a://<бакет_для_исходного_кода>/scripts/ice_min_demo.py. - Настройки:
spark.sql.warehouse.dir–s3a://<бакет_для_выходных_данных>/warehouse/.
Проверьте результат
- Дождитесь, когда созданное PySpark-задание перейдет в статус Done.
- Откройте логи выполнения задания.
- Найдите в логах строку
now_count: 20 | past_count: 10. - Убедитесь, что в бакете для выходных данных появилась папка
warehouse/demo_db. Теперь данные из созданной БДdemo_dbхранятся в бакете Object Storage, а метаинформация о ней — в кластере Apache Hive™ Metastore.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
- Бакеты Object Storage. Перед удалением бакетов удалите из них все объекты.
- Кластер Apache Hive™ Metastore.
- Кластер Apache Spark™.