Интеграция Yandex Managed Service for Apache Spark™ и Apache Hive™ Metastore
К кластеру Apache Spark™ можно подключить кластер Apache Hive™ Metastore. В этом случае метаданные, которые появляются в результате выполнения заданий, загружаются в кластер Apache Hive™ Metastore. Сохраненные метаданные может использовать другой кластер Apache Spark™.
Ниже рассматривается пример, в котором с помощью PySpark-задания создаются база данных и таблица в ней, а затем данные из созданной БД загружаются в бакет Yandex Object Storage. Метаданные о БД сохраняются в кластере Apache Hive™ Metastore, подключенном к кластеру Apache Spark™.
Чтобы реализовать описанный пример:
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за бакеты Object Storage: использование хранилища и выполнение операций с данными (см. тарифы Object Storage).
- Плата за сервис Yandex Cloud Logging: объем записываемых данных и время их хранения (см. тарифы Cloud Logging).
- Плата за вычислительные ресурсы компонентов кластера Managed Service for Apache Spark™ (см. тарифы Managed Service for Apache Spark™).
- Плата за вычислительные ресурсы компонентов кластера Apache Hive™ Metastore (см. тарифы Yandex MetaData Hub).
Подготовьте инфраструктуру
-
Создайте сервисный аккаунт
spark-agentдля кластера Apache Spark™ с ролью managed-spark.integrationProvider — чтобы кластер Apache Spark™ мог взаимодействовать с другими ресурсами. -
Создайте сервисный аккаунт
metastore-agentс ролями managed-metastore.integrationProvider и storage.uploader — чтобы кластер Apache Hive™ Metastore мог взаимодействовать с другими ресурсами и экспортировать метаданные в бакет Object Storage. -
- бакет для исходного кода PySpark-задания;
- бакет для выходных данных.
-
Предоставьте разрешения для сервисного аккаунта
spark-agentна созданные бакеты:- бакет для исходного кода PySpark-задания — разрешение
READ; - бакет для выходных данных — разрешение
READ и WRITE.
- бакет для исходного кода PySpark-задания — разрешение
-
Предоставьте разрешение
READ и WRITEдля сервисного аккаунтаmetastore-agentна бакет для выходных данных. -
Создайте облачную сеть с именем
integration-network.Вместе с ней будут автоматически созданы три подсети в разных зонах доступности.
-
Для кластера Apache Spark™ создайте группу безопасности
spark-sgв сетиintegration-network. Добавьте в группу следующее правило:-
Для исходящего трафика, чтобы разрешить подключение кластера Apache Spark™ к Apache Hive™ Metastore:
- Диапазон портов —
9083. - Протокол —
Любой(Any). - Назначение —
CIDR. - CIDR блоки —
0.0.0.0/0.
- Диапазон портов —
-
-
Для кластера Apache Hive™ Metastore создайте группу безопасности
metastore-sgв сетиintegration-network. Добавьте в группу следующие правила:-
Для входящего трафика от клиентов:
- Диапазон портов —
30000-32767. - Протокол —
Любой(Any). - Источник —
CIDR. - CIDR блоки —
0.0.0.0/0.
- Диапазон портов —
-
Для входящего трафика от балансировщика:
- Диапазон портов —
10256. - Протокол —
Любой(Any). - Источник —
Проверки состояния балансировщика.
- Диапазон портов —
-
-
Создайте кластер Apache Hive™ Metastore с параметрами:
- Сервисный аккаунт —
metastore-agent. - Сеть —
integration-network. - Подсеть —
integration-network-ru-central1-a. - Группы безопасности —
metastore-sg.
- Сервисный аккаунт —
-
Создайте кластер Managed Service for Apache Spark™ с параметрами:
- Сервисный аккаунт —
spark-agent. - Сеть —
integration-network. - Подсеть —
integration-network-ru-central1-a. - Группы безопасности —
spark-sg. - Metastore-сервер — созданный ранее кластер Apache Hive™ Metastore.
- Сервисный аккаунт —
Подготовьте PySpark-задание
Для PySpark-задания будет использован Python-скрипт, который создает БД database_1 и таблицу table_1. Скрипт будет храниться в бакете Object Storage.
Подготовьте файл скрипта:
-
Создайте локально файл с именем
job-create-table.pyи скопируйте в него скрипт:job-create-table.py
import random import sys from pyspark.sql import SparkSession def prepare_table(spark, database, table): create_database_sql = "create database if not exists {database}" create_table_sql = """ create table if not exists {database}.{table} ( id int, value double ) """ truncate_table_sql = "truncate table {database}.{table}" spark.sql(create_database_sql.format(database=database)) spark.sql(create_table_sql.format(database=database, table=table)) spark.sql(truncate_table_sql.format(database=database, table=table)) def write_data(spark, database, table): data = [(i, random.random()) for i in range(100_000)] # Создание датафрейма df = spark.createDataFrame(data, schema=['id', 'value']) table_full_name = "{database}.{table}".format(database=database, table=table) df.write.mode('overwrite').format('json').saveAsTable(table_full_name) def main(): # Создание Spark-сессии spark = ( SparkSession .builder .appName('job-create-table') .enableHiveSupport() .config('spark.sql.warehouse.dir', sys.argv[1]) .getOrCreate() ) database, table = 'database_1', 'table_1' prepare_table(spark, database, table) write_data(spark, database, table) if __name__ == '__main__': if len(sys.argv) != 2: print("Usage: job-create-table s3a://<bucket>/<folder>", file=sys.stderr) sys.exit(-1) main() -
В бакете для исходного кода создайте папку
scriptsи загрузите в нее файлjob-create-table.py. -
В бакете для выходных данных создайте папку
warehouse, в которую будут загружены данные из БДdatabase_1. -
Создайте задание с параметрами:
- Тип задания — PySpark.
- Main python файл —
s3a://<бакет_для_исходного_кода>/scripts/job-create-table.py. - Аргументы —
s3a://<бакет_для_выходных_данных>/warehouse.
Проверьте результат
-
Перейдите на страницу каталога
и выберите сервис Managed Service for Apache Spark. -
Нажмите на имя нужного кластера и выберите вкладку Задания.
-
Дождитесь, когда созданное PySpark-задание перейдет в статус Done.
-
Убедитесь, что в бакете для выходных данных, в папке
warehouse, появился файл с данными из БДdatabase_1. -
Проверьте, что в кластере Apache Hive™ Metastore появились метаданные о БД
database_1:- Экспортируйте метаданные из кластера Apache Hive™ Metastore в бакет для выходных данных.
- Скачайте файл с метаданными и убедитесь, что в нем упоминается БД
database_1.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
- Кластер Apache Hive™ Metastore.
- Кластер Apache Spark™.
- Бакеты Object Storage. Перед удалением бакетов удалите из них все объекты.