Запуск PySpark-задания с помощью Yandex Managed Service for Apache Airflow™
С помощью кластера Yandex Managed Service for Apache Airflow™ можно автоматизировать работу с сервисом Yandex Managed Service for Apache Spark™, включая следующие операции:
- создание кластеров Apache Spark™,
- ожидание запуска кластеров,
- запуск заданий Apache Spark™,
- проверку результатов,
- закрытие ресурсов и удаление кластеров.
Для этого необходимо создать DAG — направленный ациклический граф задач (DAG). Используя DAG, кластер Apache Airflow™ автоматически выполнит все необходимые действия по работе с Apache Spark™.
Работа с Apache Spark™ через Apache Airflow™ позволяет:
- Выполнять задания по расписанию для создания отчетов и снимков данных, обслуживания, обновления метрик и т. п.
- Строить пайплайны, включающие анализ данных, эксперименты, переобучение моделей и другие задачи.
- Быстро обрабатывать большие объемы данных на лету без необходимости платить за постоянную инфраструктуру с ресурсами большой мощности.
В этом руководстве рассматривается минимально необходимый набор шагов для интеграции Apache Airflow™ и Apache Spark™, при этом S3-хранилище Object Storage и кластер Apache Hive™ Metastore не используются. В такой конфигурации кластер Apache Spark™ может работать только с данными в памяти: создавать временные DataFrame, применять стандартные трансформации и функции, кешировать результаты и использовать временные представления для SQL-запросов.
Примечание
Для работы с постоянными базами и таблицами, а также для долговременного хранения результатов необходимо подключить внешнее хранилище Object Storage и, при необходимости, кластер Apache Hive™ Metastore для управления метаданными.
Интеграция Apache Airflow™ и Apache Spark™ показана на примере DAG, который выполняет следующие действия:
- Создание кластера Apache Spark™.
- Запуск PySpark-задания.
- Удаление кластера Apache Spark™.
Чтобы реализовать описанный пример:
- Подготовьте инфраструктуру.
- Подготовьте PySpark-задание.
- Подготовьте и запустите DAG-файл.
- Проверьте результат.
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за вычислительные ресурсы компонентов кластера Managed Service for Apache Airflow™ (см. тарифы Yandex Managed Service for Apache Airflow™).
- Плата за бакет Object Storage: использование хранилища и выполнение операций с данными (см. тарифы Object Storage).
- Плата за сервис Cloud Logging: объем записываемых данных и время их хранения (см. тарифы Cloud Logging).
- Плата за вычислительные ресурсы компонентов кластера Yandex Managed Service for Apache Spark™ (см. тарифы Yandex Managed Service for Apache Spark™).
Подготовьте инфраструктуру
В этом руководстве используется упрощенная настройка инфраструктуры:
- один сервисный аккаунт с расширенными привилегиями,
- один бакет для хранения всех всех данных,
- группа безопасности по умолчанию.
Такая настройка подходит для тестирования, но не предоставляет достаточный для решения реальных задач уровень безопасности. Чтобы повысить безопасность решения, обеспечьте соблюдение принципа минимальных привилегий.
Подготовьте инфраструктуру:
-
Создайте сервисный аккаунт
integration-agent
со следующими ролями:- managed-airflow.integrationProvider — чтобы кластер Apache Airflow™ мог взаимодействовать с другими ресурсами.
- managed-spark.editor — чтобы управлять кластером Apache Spark™ из DAG.
- iam.serviceAccounts.user — чтобы выбрать сервисный аккаунт
integration-agent
при создании кластера Apache Spark™. - vpc.user — чтобы использовать в кластере Apache Airflow™ подсеть Yandex Virtual Private Cloud.
- logging.editor — чтобы работать с лог-группами.
- logging.reader — чтобы читать логи.
- mdb.viewer — чтобы получать статусы операций.
-
Предоставьте разрешение
READ и WRITE
на бакет для сервисного аккаунтаintegration-agent
. -
Создайте облачную сеть с именем
datalake-network
.Вместе с ней будут автоматически созданы три подсети в разных зонах доступности и группа безопасности.
-
Создайте кластер Managed Service for Apache Airflow™ с параметрами:
- Сервисный аккаунт —
integration-agent
. - Зона доступности —
kz1-a
. - Сеть —
datalake-network
. - Подсеть —
datalake-network-kz1-a
. - Группа безопасности — группа по умолчанию в сети
datalake-network
. - Имя бакета — имя созданного ранее бакета.
- Сервисный аккаунт —
Подготовьте PySpark-задание
Для PySpark-задания будет использован Python-скрипт, который:
- Создает DataFrame с числами от 0 до 9.
- Выводит количество строк в созданном DataFrame.
- Выводит первые пять строк в табличном виде.
Скрипт будет храниться в бакете Object Storage.
Подготовьте файл скрипта:
-
Создайте локально файл с именем
job_minimal.py
и скопируйте в него скрипт:from pyspark.sql import SparkSession spark = (SparkSession.builder.appName("hello_spark").getOrCreate()) df = spark.range(10).toDF("id") print("Row count:", df.count()) df.show(5, truncate=False) spark.stop()
-
Создайте в бакете папку
scripts
и загрузите в нее файлjob_minimal.py
.
Подготовьте и запустите DAG-файл
DAG будет состоять из нескольких вершин, которые формируют цепочку последовательных действий:
- Managed Service for Apache Airflow™ создает временный кластер Apache Spark™ с настройками, заданными в DAG.
- Когда кластер Apache Spark™ готов, запускается задание PySpark.
- После выполнения задания временный кластер Apache Spark™ удаляется.
Чтобы подготовить DAG:
-
Создайте локально файл с именем
dag.py
, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:dag.py
import logging import pendulum from airflow.models.dag import DAG from airflow.decorators import task from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook from airflow.exceptions import AirflowSkipException from yandexcloud.operations import OperationError # Данные вашей инфраструктуры FOLDER_ID = '<идентификатор_каталога>' SERVICE_ACCOUNT_ID = '<идентификатор_сервисного_аккаунта_integration-agent>' SUBNET_IDS = ['<идентификатор_подсети>'] SECURITY_GROUP_IDS = ['<идентификатор_группы_безопасности>'] JOB_NAME = 'job_minimal' JOB_SCRIPT = 's3a://<имя_бакета>/scripts/job_minimal.py' JOB_ARGS = [] JOB_PROPERTIES = { 'spark.executor.instances': '1', } @task # 1 этап: создание кластера Apache Spark™ def create_cluster(yc_hook, cluster_spec): spark_client = yc_hook.sdk.wrappers.Spark() spark_client.create_cluster(cluster_spec) return spark_client.cluster_id @task # 2 этап: запуск задания PySpark def run_spark_job(yc_hook, cluster_id, job_spec): spark_client = yc_hook.sdk.wrappers.Spark() try: job_operation = spark_client.create_pyspark_job(cluster_id=cluster_id, spec=job_spec) job_id = job_operation.response.id job_info = job_operation.response except OperationError as job_error: job_id = job_error.operation_result.meta.job_id job_info, _ = spark_client.get_job(cluster_id=cluster_id, job_id=job_id) raise finally: job_log = spark_client.get_job_log(cluster_id=cluster_id, job_id=job_id) for line in job_log: logging.info(line) logging.info("Job info: %s", job_info) @task(trigger_rule="all_done") # 3 этап: удаление кластера Apache Spark™ def delete_cluster(yc_hook, cluster_id): if cluster_id: spark_client = yc_hook.sdk.wrappers.Spark() spark_client.delete_cluster(cluster_id=cluster_id) else: raise AirflowSkipException("cluster_id is empty; nothing to delete") # Настройки DAG with DAG( dag_id="example_spark", start_date=pendulum.datetime(2025, 1, 1), schedule=None, ): yc_hook = YandexCloudBaseHook() cluster_spec = yc_hook.sdk.wrappers.SparkClusterParameters( folder_id=FOLDER_ID, service_account_id=SERVICE_ACCOUNT_ID, subnet_ids=SUBNET_IDS, security_group_ids=SECURITY_GROUP_IDS, driver_pool_resource_preset="c2-m8", driver_pool_size=1, executor_pool_resource_preset="c4-m16", executor_pool_min_size=1, executor_pool_max_size=2, ) cluster_id = create_cluster(yc_hook, cluster_spec) job_spec = yc_hook.sdk.wrappers.PysparkJobParameters( name=JOB_NAME, main_python_file_uri=JOB_SCRIPT, args=JOB_ARGS, properties=JOB_PROPERTIES, ) task_job = run_spark_job(yc_hook, cluster_id, job_spec) task_delete = delete_cluster(yc_hook, cluster_id) task_job >> task_delete
Где:
FOLDER_ID
— идентификатор каталога, в котором будет создан кластер Apache Spark™.SERVICE_ACCOUNT_ID
— идентификатор сервисного аккаунта, который будет использоваться для создания кластера Apache Spark™.SUBNET_IDS
— идентификатор подсетиdatalake-network-kz1-a
.SECURITY_GROUP_IDS
— идентификатор группы безопасности для кластера Apache Spark™.JOB_NAME
— имя задания PySpark.JOB_SCRIPT
— путь к файлу с заданием PySpark в бакете.JOB_ARGS
— аргументы задания PySpark.JOB_PROPERTIES
— свойства задания PySpark.
-
Загрузите DAG в кластер Apache Airflow™: создайте в бакете папку
dags
и загрузите в нее файлdag.py
. -
Откройте веб-интерфейс Apache Airflow™.
-
Убедитесь, что в разделе DAGs появился новый DAG
example_spark
.Загрузка DAG-файла из бакета может занять несколько минут.
-
Запустите DAG, для этого в строке с его именем нажмите кнопку
.
Проверьте результат
-
В веб-интерфейсе Apache Airflow™ нажмите на название DAG
example_spark
и отслеживайте выполнение задач. -
Дождитесь, когда все три задачи в DAG перейдут в статус Success.
-
Перейдите на вкладку
Graph
. -
В открывшемся графе нажмите на задачу
run_spark_job
и перейдите на вкладкуLogs
. -
Проверьте, что PySpark-задание выполнило корректный вывод в лог. Для этого найдите в логе строки:
Row count: 10
+---+ |id | +---+ |0 | |1 | |2 | |3 | |4 | +---+
Примечание
В консоли управления
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
- Сервисный аккаунт.
- Бакет Object Storage.
- Кластер Apache Airflow™.
- Группу безопасности, созданную по умолчанию в сети
datalake-network
. - Облачные подсети, созданные по умолчанию в сети
datalake-network
. - Облачную сеть
datalake-network
.