Автоматизация работы с Yandex Managed Service for Apache Spark™
В сервисе Yandex Managed Service for Apache Airflow™ можно создать DAG — направленный ациклический граф задач, который позволит автоматизировать работу с сервисом Yandex Managed Service for Apache Spark™. Ниже рассматривается DAG, который включает в себя несколько задач:
- Создать кластер Apache Spark™.
- Запустить задание PySpark.
- Удалить кластер Apache Spark™.
При таком DAG кластер существует непродолжительное время. В кластере можно задействовать ресурсы повышенной мощности и быстро обработать большее количество данных.
В этом DAG создается кластер Apache Spark™. Для хранения табличных метаданных в примере ниже используется кластер Hive Metastore. Сохраненные метаданные затем может использовать другой кластер Apache Spark™.
Для использования Yandex Managed Service for Apache Spark™ в сервисе Yandex Managed Service for Apache Airflow™:
- Подготовьте инфраструктуру.
- Подготовьте PySpark-задание.
- Подготовьте и запустите DAG-файл.
- Проверьте результат.
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за кластер Apache Airflow™: вычислительные ресурсы компонентов кластера (см. тарифы Yandex Managed Service for Apache Airflow™).
- Плата за бакеты Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).
- Плата за получение и хранение логов (см. тарифы Cloud Logging).
Подготовьте инфраструктуру
В примере ниже рассматривается два сценария. Выберите наиболее подходящий:
-
Высокий уровень безопасности. Такой сценарий рекомендуемый, так как в нем соблюдается принцип минимальных привилегий. Сценарий включает в себя следующие особенности:
- Права доступа разделяются между сервисными аккаунтами. Для каждого кластера вы создаете отдельный сервисный аккаунт и назначаете ему роли, необходимые только для работы кластера этого аккаунта.
- Используется несколько бакетов для разных задач, различные данные хранятся в разных бакетах. Например, DAG загружается в один бакет, а результаты выполнения PySpark-задания записываются в другой бакет.
- Настроены группы безопасности. Вы ограничиваете трафик, и в результате доступ получают только разрешенные ресурсы.
-
Упрощенная настройка. Предусматривает более низкий уровень безопасности:
- Используется один сервисный аккаунт, который обладает большими привилегиями, чем необходимо.
- Все данные хранятся только в одном бакете, но в разных папках.
- Группы безопасности не настраиваются.
Подготовьте инфраструктуру:
-
Создайте сервисные аккаунты со следующими ролями:
Сервисный аккаунт
Его роли
airflow-agent
для кластера Apache Airflow™.- managed-airflow.integrationProvider — чтобы кластер Apache Airflow™ мог взаимодействовать с другими ресурсами.
- managed-spark.editor — чтобы управлять кластером Apache Spark™ из DAG.
- iam.serviceAccounts.user — чтобы указать сервисный аккаунт
spark-agent
при создании кластера Apache Spark™. - vpc.user — чтобы в кластере Apache Airflow™ использовать подсеть Yandex Virtual Private Cloud.
- logging.editor — чтобы работать с лог-группами.
- logging.reader — чтобы читать логи.
- mdb.viewer — чтобы получать статусы операций.
- managed-metastore.viewer — чтобы просматривать информацию о кластерах Hive Metastore.
metastore-agent
для кластера Metastore.- managed-metastore.integrationProvider — чтобы кластер Metastore мог взаимодействовать с другими ресурсами.
spark-agent
для кластера Apache Spark™.- managed-spark.integrationProvider — чтобы кластер Apache Spark™ мог взаимодействовать с другими ресурсами.
-
<бакет_для_исходного_кода_Airflow_DAG>
.<бакет_для_исходного_кода_PySpark_задания>
.<бакет_для_выходных_данных_PySpark_задания>
.
Бакетов нужно несколько, так как на них назначаются различные права доступа.
-
Предоставьте разрешения на следующие бакеты:
<бакет_для_исходного_кода_Airflow_DAG>
— разрешениеREAD
для сервисного аккаунтаairflow-agent
.<бакет_для_исходного_кода_PySpark_задания>
— разрешениеREAD
для сервисного аккаунтаspark-agent
.<бакет_для_выходных_данных_PySpark_задания>
— разрешениеREAD и WRITE
для сервисных аккаунтовspark-agent
иmetastore-agent
.
-
Создайте облачную сеть с именем
datalake-network
.Вместе с ней автоматически создадутся три подсети в разных зонах доступности.
-
Для кластера Metastore создайте группу безопасности
metastore-sg
в сетиdatalake-network
. Добавьте в группу следующие правила:-
Для входящего трафика от клиентов:
- Диапазон портов —
30000-32767
. - Протокол —
Любой
(Any
). - Источник —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
Для входящего трафика от балансировщика:
- Диапазон портов —
10256
. - Протокол —
Любой
(Any
). - Источник —
Проверки состояния балансировщика
.
- Диапазон портов —
-
-
Для кластера Apache Airflow™ создайте группу безопасности
airflow-sg
в сетиdatalake-network
. Добавьте в группу следующее правило:-
Для исходящего HTTPS-трафика:
- Диапазон портов —
443
. - Протокол —
TCP
. - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
-
Для кластера Apache Spark™ создайте группу безопасности
spark-sg
в сетиdatalake-network
. Добавьте в группу следующее правило:-
Для исходящего трафика, чтобы разрешить подключение кластера Apache Spark™ к Metastore:
- Диапазон портов —
9083
. - Протокол —
Любой
(Any
). - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
-
Создайте кластер Metastore с параметрами:
- Сервисный аккаунт —
metastore-agent
. - Сеть —
datalake-network
. - Подсеть —
datalake-network-ru-central1-a
. - Группа безопасности —
metastore-sg
.
Примечание
Дождитесь завершения операции.
- Сервисный аккаунт —
-
Создайте кластер Managed Service for Apache Airflow™ с параметрами:
- Сервисный аккаунт —
airflow-agent
. - Зона доступности —
ru-central1-a
. - Сеть —
datalake-network
. - Подсеть —
datalake-network-ru-central1-a
. - Группа безопасности —
airflow-sg
. - Имя бакета —
<бакет_для_исходного_кода_Airflow_DAG>
.
- Сервисный аккаунт —
Подготовьте инфраструктуру:
-
Создайте сервисный аккаунт
integration-agent
со следующими ролями:- managed-airflow.integrationProvider — чтобы кластер Apache Airflow™ мог взаимодействовать с другими ресурсами.
- managed-spark.editor — чтобы управлять кластером Apache Spark™ из DAG.
- iam.serviceAccounts.user — чтобы указать сервисный аккаунт
spark-agent
при создании кластера Apache Spark™. - vpc.user — чтобы в кластере Apache Airflow™ использовать подсеть Yandex Virtual Private Cloud.
- logging.editor — чтобы работать с лог-группами.
- logging.reader — чтобы читать логи.
- mdb.viewer — чтобы получать статусы операций.
- managed-metastore.viewer — чтобы просматривать информацию о кластерах Hive Metastore.
- managed-metastore.integrationProvider — чтобы кластер Metastore мог взаимодействовать с другими ресурсами.
- managed-spark.integrationProvider — чтобы кластер Apache Spark™ мог взаимодействовать с другими ресурсами.
-
Создайте бакет
<бакет_для_заданий_и_данных>
и предоставьте разрешениеREAD и WRITE
для сервисного аккаунтаintegration-agent
. -
Создайте облачную сеть с именем
datalake-network
.Вместе с ней автоматически создадутся три подсети в разных зонах доступности и группа безопасности.
-
Создайте кластер Metastore с параметрами:
- Сервисный аккаунт —
integration-agent
. - Сеть —
datalake-network
. - Подсеть —
datalake-network-ru-central1-a
. - Группа безопасности — группа по умолчанию в сети
datalake-network
.
Примечание
Дождитесь завершения операции.
- Сервисный аккаунт —
-
Создайте кластер Managed Service for Apache Airflow™ с параметрами:
- Сервисный аккаунт —
integration-agent
. - Зона доступности —
ru-central1-a
. - Сеть —
datalake-network
. - Подсеть —
datalake-network-ru-central1-a
. - Группа безопасности — группа по умолчанию в сети
datalake-network
. - Имя бакета —
<бакет_для_заданий_и_данных>
.
- Сервисный аккаунт —
Подготовьте PySpark-задание
Для PySpark-задания будет использован Python-скрипт, который создает таблицу и хранится в бакете Object Storage. Подготовьте файл скрипта:
-
Создайте локально файл с именем
job_with_table.py
и скопируйте в него скрипт:import random from pyspark.sql import SparkSession def prepare_table(spark, database, table): create_database_sql = "create database if not exists {database}" create_table_sql = """ create table if not exists {database}.{table} ( id int, value double ) using iceberg """ truncate_table_sql = "truncate table {database}.{table}" spark.sql(create_database_sql.format(database=database)) spark.sql(create_table_sql.format(database=database, table=table)) spark.sql(truncate_table_sql.format(database=database, table=table)) def write_data(spark, database, table): data = [(i, random.random()) for i in range(100_000)] # Создание датафрейма df = spark.createDataFrame(data, schema=['id', 'value']) table_full_name = "{database}.{table}".format(database=database, table=table) df.writeTo(table_full_name).append() def main(): # Создание Spark-сессии spark = ( SparkSession .builder .appName('job_with_table') .enableHiveSupport() .getOrCreate() ) database, table = 'database_1', 'table_1' prepare_table(spark, database, table) write_data(spark, database, table) if __name__ == '__main__': main()
-
Создайте в бакете
<бакет_для_исходного_кода_PySpark_задания>
папкуscripts
и загрузите в нее файлjob_with_table.py
.
-
Создайте локально файл с именем
job_with_table.py
и скопируйте в него скрипт:import random from pyspark.sql import SparkSession def prepare_table(spark, database, table): create_database_sql = "create database if not exists {database}" create_table_sql = """ create table if not exists {database}.{table} ( id int, value double ) using iceberg """ truncate_table_sql = "truncate table {database}.{table}" spark.sql(create_database_sql.format(database=database)) spark.sql(create_table_sql.format(database=database, table=table)) spark.sql(truncate_table_sql.format(database=database, table=table)) def write_data(spark, database, table): data = [(i, random.random()) for i in range(100_000)] # Создание датафрейма df = spark.createDataFrame(data, schema=['id', 'value']) table_full_name = "{database}.{table}".format(database=database, table=table) df.writeTo(table_full_name).append() def main(): # Создание Spark-сессии spark = ( SparkSession .builder .appName('job_with_table') .enableHiveSupport() .getOrCreate() ) database, table = 'database_1', 'table_1' prepare_table(spark, database, table) write_data(spark, database, table) if __name__ == '__main__': main()
-
Создайте в бакете
<бакет_для_заданий_и_данных>
папкуscripts
и загрузите в нее файлjob_with_table.py
.
Подготовьте и запустите DAG-файл
DAG будет состоять из нескольких вершин, которые формируют цепочку последовательных действий:
- Yandex Managed Service for Apache Airflow™ создает временный кластер Apache Spark™ с настройками, заданными в DAG. Этот кластер автоматически подключается к созданному ранее кластеру Metastore.
- Когда кластер Apache Spark™ готов, запускается задание PySpark.
- После выполнения задания временный кластер Apache Spark™ удаляется.
Чтобы подготовить DAG:
-
Создайте локально файл с именем
dag.py
, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:import logging import pendulum from airflow.models.dag import DAG from airflow.decorators import task from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook from yandexcloud.operations import OperationError YANDEX_CONN_ID = '<идентификатор_подключения>' # Данные вашей инфраструктуры FOLDER_ID = '<идентификатор_каталога>' SERVICE_ACCOUNT_ID = '<идентификатор_сервисного_аккаунта_spark-agent>' SUBNET_IDS = [<идентификатор_подсети>] SECURITY_GROUP_IDS = [<идентификатор_группы_безопасности>] METASTORE_CLUSTER_ID = '<идентификатор_кластера_Metastore>' JOB_NAME = 'job_with_table' JOB_SCRIPT = 's3a://<бакет_для_исходного_кода_PySpark_задания>/scripts/job_with_table.py' JOB_ARGS = [] JOB_PROPERTIES = { 'spark.executor.instances': '1', 'spark.sql.warehouse.dir': 's3a://<бакет_для_выходных_данных_PySpark_задания>/warehouse', } @task # 1 этап: создание кластера Apache Spark™ def create_cluster(yc_hook, cluster_spec): spark_client = yc_hook.sdk.wrappers.Spark() try: spark_client.create_cluster(cluster_spec) except OperationError as job_error: cluster_id = job_error.operation_result.meta.cluster_id if cluster_id: spark_client.delete_cluster(cluster_id=cluster_id) raise return spark_client.cluster_id @task # 2 этап: запуск задания PySpark def run_spark_job(yc_hook, cluster_id, job_spec): spark_client = yc_hook.sdk.wrappers.Spark() try: job_operation = spark_client.create_pyspark_job(cluster_id=cluster_id, spec=job_spec) job_id = job_operation.response.id job_info = job_operation.response except OperationError as job_error: job_id = job_error.operation_result.meta.job_id job_info, _ = spark_client.get_job(cluster_id=cluster_id, job_id=job_id) raise finally: job_log = spark_client.get_job_log(cluster_id=cluster_id, job_id=job_id) for line in job_log: logging.info(line) logging.info("Job info: %s", job_info) @task(trigger_rule="all_done") # 3 этап: удаление кластера Apache Spark™ def delete_cluster(yc_hook, cluster_id): if cluster_id: spark_client = yc_hook.sdk.wrappers.Spark() spark_client.delete_cluster(cluster_id=cluster_id) # Настройки DAG with DAG( dag_id="example_spark", start_date=pendulum.datetime(2025, 1, 1), schedule=None, ): yc_hook = YandexCloudBaseHook(yandex_conn_id=YANDEX_CONN_ID) cluster_spec = yc_hook.sdk.wrappers.SparkClusterParameters( folder_id=FOLDER_ID, service_account_id=SERVICE_ACCOUNT_ID, subnet_ids=SUBNET_IDS, security_group_ids=SECURITY_GROUP_IDS, driver_pool_resource_preset="c2-m8", driver_pool_size=1, executor_pool_resource_preset="c4-m16", executor_pool_min_size=1, executor_pool_max_size=2, metastore_cluster_id=METASTORE_CLUSTER_ID, ) cluster_id = create_cluster(yc_hook, cluster_spec) job_spec = yc_hook.sdk.wrappers.PysparkJobParameters( name=JOB_NAME, main_python_file_uri=JOB_SCRIPT, args=JOB_ARGS, properties=JOB_PROPERTIES, ) task_job = run_spark_job(yc_hook, cluster_id, job_spec) task_delete = delete_cluster(yc_hook, cluster_id) task_job >> task_delete
Где:
-
YANDEX_CONN_ID
— идентификатор подключения. -
FOLDER_ID
— идентификатор каталога, в котором будет создан кластер Apache Spark™. -
SERVICE_ACCOUNT_ID
— идентификатор сервисного аккаунта, который будет использоваться для создания кластера Apache Spark™. -
SUBNET_IDS
— идентификатор подсети.Примечание
Подсеть для Apache Spark™ и Metastore должна совпадать.
-
SECURITY_GROUP_IDS
— идентификатор группы безопасности для кластера Apache Spark™. -
METASTORE_CLUSTER_ID
— идентификатор кластера Metastore. -
JOB_NAME
— имя задания PySpark. -
JOB_SCRIPT
— путь к файлу с заданием PySpark. -
JOB_ARGS
— аргументы задания PySpark. -
JOB_PROPERTIES
— свойства задания PySpark.
-
-
Загрузите DAG в кластер Apache Airflow™: создайте в бакете
<бакет_для_исходного_кода_Airflow_DAG>
папкуdags
и загрузите в нее файлdag.py
. -
Откройте веб-интерфейс Apache Airflow™.
-
Убедитесь, что в разделе DAGs появился новый DAG
example_spark
.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить DAG, в строке с его именем нажмите кнопку
.
-
Создайте локально файл с именем
dag.py
, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:import logging import pendulum from airflow.models.dag import DAG from airflow.decorators import task from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook from yandexcloud.operations import OperationError YANDEX_CONN_ID = '<идентификатор_подключения>' # Данные вашей инфраструктуры FOLDER_ID = '<идентификатор_каталога>' SERVICE_ACCOUNT_ID = '<идентификатор_сервисного_аккаунта_integration-agent>' SUBNET_IDS = [<идентификатор_подсети>] SECURITY_GROUP_IDS = [<идентификатор_группы_безопасности>] METASTORE_CLUSTER_ID = '<идентификатор_кластера_Metastore>' JOB_NAME = 'job_with_table' JOB_SCRIPT = 's3a://<бакет_для_заданий_и_данных>/scripts/job_with_table.py' JOB_ARGS = [] JOB_PROPERTIES = { 'spark.executor.instances': '1', 'spark.sql.warehouse.dir': 's3a://<бакет_для_заданий_и_данных>/warehouse', } @task # 1 этап: создание кластера Apache Spark™ def create_cluster(yc_hook, cluster_spec): spark_client = yc_hook.sdk.wrappers.Spark() try: spark_client.create_cluster(cluster_spec) except OperationError as job_error: cluster_id = job_error.operation_result.meta.cluster_id if cluster_id: spark_client.delete_cluster(cluster_id=cluster_id) raise return spark_client.cluster_id @task # 2 этап: запуск задания PySpark def run_spark_job(yc_hook, cluster_id, job_spec): spark_client = yc_hook.sdk.wrappers.Spark() try: job_operation = spark_client.create_pyspark_job(cluster_id=cluster_id, spec=job_spec) job_id = job_operation.response.id job_info = job_operation.response except OperationError as job_error: job_id = job_error.operation_result.meta.job_id job_info, _ = spark_client.get_job(cluster_id=cluster_id, job_id=job_id) raise finally: job_log = spark_client.get_job_log(cluster_id=cluster_id, job_id=job_id) for line in job_log: logging.info(line) logging.info("Job info: %s", job_info) @task(trigger_rule="all_done") # 3 этап: удаление кластера Apache Spark™ def delete_cluster(yc_hook, cluster_id): if cluster_id: spark_client = yc_hook.sdk.wrappers.Spark() spark_client.delete_cluster(cluster_id=cluster_id) # Настройки DAG with DAG( dag_id="example_spark", start_date=pendulum.datetime(2025, 1, 1), schedule=None, ): yc_hook = YandexCloudBaseHook(yandex_conn_id=YANDEX_CONN_ID) cluster_spec = yc_hook.sdk.wrappers.SparkClusterParameters( folder_id=FOLDER_ID, service_account_id=SERVICE_ACCOUNT_ID, subnet_ids=SUBNET_IDS, security_group_ids=SECURITY_GROUP_IDS, driver_pool_resource_preset="c2-m8", driver_pool_size=1, executor_pool_resource_preset="c4-m16", executor_pool_min_size=1, executor_pool_max_size=2, metastore_cluster_id=METASTORE_CLUSTER_ID, ) cluster_id = create_cluster(yc_hook, cluster_spec) job_spec = yc_hook.sdk.wrappers.PysparkJobParameters( name=JOB_NAME, main_python_file_uri=JOB_SCRIPT, args=JOB_ARGS, properties=JOB_PROPERTIES, ) task_job = run_spark_job(yc_hook, cluster_id, job_spec) task_delete = delete_cluster(yc_hook, cluster_id) task_job >> task_delete
Где:
-
YANDEX_CONN_ID
— идентификатор подключения. -
FOLDER_ID
— идентификатор каталога, в котором будет создан кластер Apache Spark™. -
SERVICE_ACCOUNT_ID
— идентификатор сервисного аккаунта, который будет использоваться для создания кластера Apache Spark™. -
SUBNET_IDS
— идентификатор подсети.Примечание
Подсеть для Apache Spark™ и Metastore должна совпадать.
-
SECURITY_GROUP_IDS
— идентификатор группы безопасности для кластера Apache Spark™. -
METASTORE_CLUSTER_ID
— идентификатор кластера Metastore. -
JOB_NAME
— имя задания PySpark. -
JOB_SCRIPT
— путь к файлу с заданием PySpark. -
JOB_ARGS
— аргументы задания PySpark. -
JOB_PROPERTIES
— свойства задания PySpark.
-
-
Загрузите DAG в кластер Apache Airflow™: создайте в бакете
<бакет_для_заданий_и_данных>
папкуdags
и загрузите в нее файлdag.py
. -
Откройте веб-интерфейс Apache Airflow™.
-
Убедитесь, что в разделе DAGs появился новый DAG
example_spark
.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить DAG, в строке с его именем нажмите кнопку
.
Проверьте результат
- Чтобы отслеживать результаты выполнения задач, нажмите на название DAG.
- Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления
создается кластер Apache Spark™, выполняется задание PySpark и удаляется тот же кластер. - Убедитесь, что в бакете
<бакет_для_выходных_данных_PySpark_задания>
появилась БДdatabase_1
. Теперь данные из созданной БД хранятся в бакете Object Storage, а метаинформация о ней — в кластере Metastore.
- Чтобы отслеживать результаты выполнения задач, нажмите на название DAG.
- Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления
создается кластер Apache Spark™, выполняется задание PySpark и удаляется тот же кластер. - Убедитесь, что в бакете
<бакет_для_заданий_и_данных>
появилась БДdatabase_1
. Теперь данные из созданной БД хранятся в бакете Object Storage, а метаинформация о ней — в кластере Metastore.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
- Сервисные аккаунты.
- Бакеты Object Storage.
- Кластер Metastore.
- Кластер Apache Airflow™.
- Группы безопасности.
- Облачные подсети, созданные по умолчанию в сети
datalake-network
. - Облачную сеть.
- Сервисный аккаунт.
- Бакет Object Storage.
- Кластер Metastore.
- Кластер Apache Airflow™.
- Группу безопасности, созданную по умолчанию в сети
datalake-network
. - Облачные подсети, созданные по умолчанию в сети
datalake-network
. - Облачную сеть.