Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Истории успеха
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Доступны в регионе
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • ML Services
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Партнёрская программа
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Истории успеха
  • Документация
  • Блог
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»
Практические руководства
    • Все руководства
    • Самостоятельное развертывание веб-интерфейса Apache Kafka®
    • Обновление кластера Managed Service for Apache Kafka® с ZooKeeper на KRaft
    • Миграция БД из стороннего кластера Apache Kafka® в Managed Service for Apache Kafka®
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for YDB в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for Greenplum® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Yandex StoreDoc с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MySQL® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for OpenSearch с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for PostgreSQL с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Data Streams с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for Apache Kafka® с помощью Data Transfer
    • Захват изменений YDB и поставка в YDS
    • Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
    • Синхронизация топиков Apache Kafka® в Object Storage без использования интернета
    • Отслеживание утери сообщений в топике Apache Kafka®
    • Автоматизация задач Query с помощью Managed Service for Apache Airflow™
    • Отправка запросов к API Yandex Cloud через Yandex Cloud Python SDK
    • Настройка SMTP-сервера для отправки уведомлений по электронной почте
    • Добавление данных в БД ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® средствами ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® при помощи Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for ClickHouse® с помощью Data Transfer
    • Асинхронная репликация данных из PostgreSQL в ClickHouse®
    • Обмен данными между Managed Service for ClickHouse® и Yandex Data Processing
    • Настройка Managed Service for ClickHouse® для Graphite
    • Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®
    • Получение данных из Managed Service for Apache Kafka® в ksqlDB
    • Получение данных из RabbitMQ в Managed Service for ClickHouse®
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse®
    • Асинхронная репликация данных из Яндекс Метрика в ClickHouse® с помощью Data Transfer
    • Использование гибридного хранилища в Managed Service for ClickHouse®
    • Шардирование таблиц Managed Service for ClickHouse®
    • Загрузка данных из Яндекс Директ в витрину Managed Service for ClickHouse® с использованием Cloud Functions, Object Storage и Data Transfer
    • Загрузка данных из Object Storage в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция данных со сменой хранилища из Managed Service for OpenSearch в Managed Service for ClickHouse® с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Managed Service for ClickHouse® с помощью Data Transfer
    • Интеграция Yandex Managed Service for ClickHouse® с Microsoft SQL Server через ClickHouse® JDBC Bridge
    • Миграция базы данных из Google BigQuery в Managed Service for ClickHouse®
    • Интеграция Yandex Managed Service for ClickHouse® с Oracle через ClickHouse® JDBC Bridge
    • Настройка Cloud DNS для доступа к кластеру Managed Service for ClickHouse® из других облачных сетей
    • Миграция кластера Yandex Data Processing с HDFS в другую зону доступности
    • Импорт данных из Managed Service for MySQL® в Yandex Data Processing с помощью Sqoop
    • Импорт данных из Managed Service for PostgreSQL в Yandex Data Processing с помощью Sqoop
    • Монтирование бакетов Object Storage к файловой системе хостов Yandex Data Processing
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
    • Автоматизация работы с Yandex Data Processing с помощью Managed Service for Apache Airflow™
    • Совместная работа с таблицами Yandex Data Processing с использованием Apache Hive™ Metastore
    • Перенос метаданных между кластерами Yandex Data Processing с помощью Apache Hive™ Metastore
    • Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
    • Миграция коллекций из стороннего кластера MongoDB в Yandex StoreDoc
    • Миграция данных в Yandex StoreDoc
    • Миграция кластера Yandex StoreDoc с версии 4.4 на 6.0
    • Шардирование коллекций Yandex StoreDoc
    • Анализ производительности и оптимизация Yandex StoreDoc
    • Миграция БД из стороннего кластера MySQL® в кластер Managed Service for MySQL®
    • Анализ производительности и оптимизация Managed Service for MySQL®
    • Синхронизация данных из стороннего кластера MySQL® в Managed Service for MySQL® с помощью Data Transfer
    • Миграция БД из Managed Service for MySQL® в сторонний кластер MySQL®
    • Миграция БД из Managed Service for MySQL® в Object Storage с помощью Data Transfer
    • Перенос данных из Object Storage в Managed Service for MySQL® с использованием Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Миграция БД из Managed Service for MySQL® в Managed Service for YDB с помощью Data Transfer
    • Захват изменений MySQL® и поставка в YDS
    • Миграция данных из Managed Service for MySQL® в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из AWS RDS for PostgreSQL в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из Managed Service for MySQL® в Managed Service for Greenplum® с помощью Data Transfer
    • Настройка политики индексов в Managed Service for OpenSearch
    • Миграция данных в Managed Service for OpenSearch из стороннего кластера OpenSearch с помощью Data Transfer
    • Загрузка данных из Managed Service for OpenSearch в Object Storage с помощью Data Transfer
    • Миграция данных из Managed Service for OpenSearch в Managed Service for YDB с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Аутентификация в OpenSearch Dashboards кластера Managed Service for OpenSearch с помощью Keycloak
    • Использование плагина yandex-lemmer в Managed Service for OpenSearch
    • Создание кластера PostgreSQL для «1С:Предприятия»
    • Логическая репликация PostgreSQL
    • Миграция БД из стороннего кластера PostgreSQL в Managed Service for PostgreSQL
    • Миграция БД из Managed Service for PostgreSQL
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for YDB с помощью Data Transfer
    • Миграция БД из Managed Service for PostgreSQL в Object Storage
    • Перенос данных из Object Storage в Managed Service for PostgreSQL с использованием Data Transfer
    • Захват изменений PostgreSQL и поставка в YDS
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for MySQL® с помощью Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Решение проблем с сортировкой строк в PostgreSQL после обновления glibc
    • Миграция БД из Greenplum® в ClickHouse®
    • Миграция БД из Greenplum® в PostgreSQL
    • Выгрузка данных Greenplum® в холодное хранилище Object Storage
    • Загрузка данных из Object Storage в Managed Service for Greenplum® с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Создание внешней таблицы на базе таблицы из бакета Object Storage с помощью конфигурационного файла
    • Получение данных из внешних источников с помощью именованных запросов в Greenplum®
    • Миграция БД из стороннего кластера Valkey™ в Yandex Managed Service for Valkey™
    • Использование кластера Yandex Managed Service for Valkey™ в качестве хранилища сессий PHP
    • Загрузка данных из Object Storage в Managed Service for YDB с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Object Storage с помощью Data Transfer
    • Обработка аудитных логов Audit Trails
    • Обработка логов Cloud Logging
    • Обработка потока изменений Debezium
    • Анализ данных с помощью Jupyter
    • Обработка файлов детализации в сервисе Yandex Cloud Billing
    • Ввод данных в системы хранения
    • Умная обработка логов
    • Передача данных в микросервисных архитектурах
    • Миграция данных в Object Storage с помощью Data Transfer
    • Миграция данных из стороннего кластера Greenplum® или PostgreSQL в Managed Service for Greenplum® с помощью Data Transfer
    • Миграция кластера Yandex StoreDoc
    • Миграция кластера MySQL®
    • Миграция на сторонний кластер MySQL®
    • Миграция кластера PostgreSQL
    • Создание реестра схем для поставки данных в формате Debezium CDC из Apache Kafka®
    • Автоматизация работы с помощью Yandex Managed Service for Apache Airflow™
    • Работа с таблицей в Object Storage из PySpark-задания
    • Интеграция Yandex Managed Service for Apache Spark™ и Apache Hive™ Metastore
    • Запуск PySpark-задания с помощью Yandex Managed Service for Apache Airflow™
    • Использование Yandex Object Storage в Yandex Managed Service for Apache Spark™

В этой статье:

  • Необходимые платные ресурсы
  • Подготовьте инфраструктуру
  • Подготовьте PySpark-задание
  • Подготовьте и запустите DAG-файл
  • Проверьте результат
  • Удалите созданные ресурсы
  1. Построение Data Platform
  2. Запуск PySpark-задания с помощью Yandex Managed Service for Apache Airflow™

Запуск PySpark-задания с помощью Yandex Managed Service for Apache Airflow™

Статья создана
Yandex Cloud
Обновлена 29 сентября 2025 г.
  • Необходимые платные ресурсы
  • Подготовьте инфраструктуру
  • Подготовьте PySpark-задание
  • Подготовьте и запустите DAG-файл
  • Проверьте результат
  • Удалите созданные ресурсы

С помощью кластера Yandex Managed Service for Apache Airflow™ можно автоматизировать работу с сервисом Yandex Managed Service for Apache Spark™, включая следующие операции:

  • создание кластеров Apache Spark™,
  • ожидание запуска кластеров,
  • запуск заданий Apache Spark™,
  • проверку результатов,
  • закрытие ресурсов и удаление кластеров.

Для этого необходимо создать DAG — направленный ациклический граф задач (DAG). Используя DAG, кластер Apache Airflow™ автоматически выполнит все необходимые действия по работе с Apache Spark™.

Работа с Apache Spark™ через Apache Airflow™ позволяет:

  • Выполнять задания по расписанию для создания отчетов и снимков данных, обслуживания, обновления метрик и т. п.
  • Строить пайплайны, включающие анализ данных, эксперименты, переобучение моделей и другие задачи.
  • Быстро обрабатывать большие объемы данных на лету без необходимости платить за постоянную инфраструктуру с ресурсами большой мощности.

В этом руководстве рассматривается минимально необходимый набор шагов для интеграции Apache Airflow™ и Apache Spark™, при этом S3-хранилище Object Storage и кластер Apache Hive™ Metastore не используются. В такой конфигурации кластер Apache Spark™ может работать только с данными в памяти: создавать временные DataFrame, применять стандартные трансформации и функции, кешировать результаты и использовать временные представления для SQL-запросов.

Примечание

Для работы с постоянными базами и таблицами, а также для долговременного хранения результатов необходимо подключить внешнее хранилище Object Storage и, при необходимости, кластер Apache Hive™ Metastore для управления метаданными.

Интеграция Apache Airflow™ и Apache Spark™ показана на примере DAG, который выполняет следующие действия:

  1. Создание кластера Apache Spark™.
  2. Запуск PySpark-задания.
  3. Удаление кластера Apache Spark™.

Чтобы реализовать описанный пример:

  1. Подготовьте инфраструктуру.
  2. Подготовьте PySpark-задание.
  3. Подготовьте и запустите DAG-файл.
  4. Проверьте результат.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за вычислительные ресурсы компонентов кластера Managed Service for Apache Airflow™ (см. тарифы Yandex Managed Service for Apache Airflow™).
  • Плата за бакет Object Storage: использование хранилища и выполнение операций с данными (см. тарифы Object Storage).
  • Плата за сервис Cloud Logging: объем записываемых данных и время их хранения (см. тарифы Cloud Logging).
  • Плата за вычислительные ресурсы компонентов кластера Yandex Managed Service for Apache Spark™ (см. тарифы Yandex Managed Service for Apache Spark™).

Подготовьте инфраструктуруПодготовьте инфраструктуру

В этом руководстве используется упрощенная настройка инфраструктуры:

  • один сервисный аккаунт с расширенными привилегиями,
  • один бакет для хранения всех всех данных,
  • группа безопасности по умолчанию.

Такая настройка подходит для тестирования, но не предоставляет достаточный для решения реальных задач уровень безопасности. Чтобы повысить безопасность решения, обеспечьте соблюдение принципа минимальных привилегий.

Подготовьте инфраструктуру:

  1. Создайте сервисный аккаунт integration-agent со следующими ролями:

    • managed-airflow.integrationProvider — чтобы кластер Apache Airflow™ мог взаимодействовать с другими ресурсами.
    • managed-spark.editor — чтобы управлять кластером Apache Spark™ из DAG.
    • iam.serviceAccounts.user — чтобы выбрать сервисный аккаунт integration-agent при создании кластера Apache Spark™.
    • vpc.user — чтобы использовать в кластере Apache Airflow™ подсеть Yandex Virtual Private Cloud.
    • logging.editor — чтобы работать с лог-группами.
    • logging.reader — чтобы читать логи.
    • mdb.viewer — чтобы получать статусы операций.
  2. Создайте бакет.

  3. Предоставьте разрешение READ и WRITE на бакет для сервисного аккаунта integration-agent.

  4. Создайте облачную сеть с именем datalake-network.

    Вместе с ней будут автоматически созданы три подсети в разных зонах доступности и группа безопасности.

  5. Создайте кластер Managed Service for Apache Airflow™ с параметрами:

    • Сервисный аккаунт — integration-agent.
    • Зона доступности — kz1-a.
    • Сеть — datalake-network.
    • Подсеть — datalake-network-kz1-a.
    • Группа безопасности — группа по умолчанию в сети datalake-network.
    • Имя бакета — имя созданного ранее бакета.

Подготовьте PySpark-заданиеПодготовьте PySpark-задание

Для PySpark-задания будет использован Python-скрипт, который:

  1. Создает DataFrame с числами от 0 до 9.
  2. Выводит количество строк в созданном DataFrame.
  3. Выводит первые пять строк в табличном виде.

Скрипт будет храниться в бакете Object Storage.

Подготовьте файл скрипта:

  1. Создайте локально файл с именем job_minimal.py и скопируйте в него скрипт:

    from pyspark.sql import SparkSession
    
    spark = (SparkSession.builder.appName("hello_spark").getOrCreate())
    
    df = spark.range(10).toDF("id")
    
    print("Row count:", df.count())
    df.show(5, truncate=False)
    
    spark.stop()
    
  2. Создайте в бакете папку scripts и загрузите в нее файл job_minimal.py.

Подготовьте и запустите DAG-файлПодготовьте и запустите DAG-файл

DAG будет состоять из нескольких вершин, которые формируют цепочку последовательных действий:

  1. Managed Service for Apache Airflow™ создает временный кластер Apache Spark™ с настройками, заданными в DAG.
  2. Когда кластер Apache Spark™ готов, запускается задание PySpark.
  3. После выполнения задания временный кластер Apache Spark™ удаляется.

Чтобы подготовить DAG:

  1. Создайте локально файл с именем dag.py, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:

    dag.py
    import logging
    import pendulum
    from airflow.models.dag import DAG
    from airflow.decorators import task
    from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
    from airflow.exceptions import AirflowSkipException
    
    from yandexcloud.operations import OperationError
    
    # Данные вашей инфраструктуры
    FOLDER_ID = '<идентификатор_каталога>'
    SERVICE_ACCOUNT_ID = '<идентификатор_сервисного_аккаунта_integration-agent>'
    SUBNET_IDS = ['<идентификатор_подсети>']
    SECURITY_GROUP_IDS = ['<идентификатор_группы_безопасности>']
    
    JOB_NAME = 'job_minimal'
    JOB_SCRIPT = 's3a://<имя_бакета>/scripts/job_minimal.py'
    JOB_ARGS = []
    JOB_PROPERTIES = {
        'spark.executor.instances': '1',
    }
    
    
    @task
    # 1 этап: создание кластера Apache Spark™
    def create_cluster(yc_hook, cluster_spec):
        spark_client = yc_hook.sdk.wrappers.Spark()
        spark_client.create_cluster(cluster_spec)
        return spark_client.cluster_id
    
    
    @task
    # 2 этап: запуск задания PySpark
    def run_spark_job(yc_hook, cluster_id, job_spec):
        spark_client = yc_hook.sdk.wrappers.Spark()
        try:
            job_operation = spark_client.create_pyspark_job(cluster_id=cluster_id, spec=job_spec)
            job_id = job_operation.response.id
            job_info = job_operation.response
        except OperationError as job_error:
            job_id = job_error.operation_result.meta.job_id
            job_info, _ = spark_client.get_job(cluster_id=cluster_id, job_id=job_id)
            raise
        finally:
            job_log = spark_client.get_job_log(cluster_id=cluster_id, job_id=job_id)
            for line in job_log:
                logging.info(line)
            logging.info("Job info: %s", job_info)
    
    
    @task(trigger_rule="all_done")
    # 3 этап: удаление кластера Apache Spark™
    def delete_cluster(yc_hook, cluster_id):
        if cluster_id:
            spark_client = yc_hook.sdk.wrappers.Spark()
            spark_client.delete_cluster(cluster_id=cluster_id)
     else:
         raise AirflowSkipException("cluster_id is empty; nothing to delete")
    
    # Настройки DAG
    with DAG(
        dag_id="example_spark",
        start_date=pendulum.datetime(2025, 1, 1),
        schedule=None,
    ):
        yc_hook = YandexCloudBaseHook()
    
        cluster_spec = yc_hook.sdk.wrappers.SparkClusterParameters(
            folder_id=FOLDER_ID,
            service_account_id=SERVICE_ACCOUNT_ID,
            subnet_ids=SUBNET_IDS,
            security_group_ids=SECURITY_GROUP_IDS,
            driver_pool_resource_preset="c2-m8",
            driver_pool_size=1,
            executor_pool_resource_preset="c4-m16",
            executor_pool_min_size=1,
            executor_pool_max_size=2,
        )
        cluster_id = create_cluster(yc_hook, cluster_spec)
    
        job_spec = yc_hook.sdk.wrappers.PysparkJobParameters(
            name=JOB_NAME,
            main_python_file_uri=JOB_SCRIPT,
            args=JOB_ARGS,
            properties=JOB_PROPERTIES,
        )
        task_job = run_spark_job(yc_hook, cluster_id, job_spec)
        task_delete = delete_cluster(yc_hook, cluster_id)
    
        task_job >> task_delete
    

    Где:

    • FOLDER_ID — идентификатор каталога, в котором будет создан кластер Apache Spark™.
    • SERVICE_ACCOUNT_ID — идентификатор сервисного аккаунта, который будет использоваться для создания кластера Apache Spark™.
    • SUBNET_IDS — идентификатор подсети datalake-network-kz1-a.
    • SECURITY_GROUP_IDS — идентификатор группы безопасности для кластера Apache Spark™.
    • JOB_NAME — имя задания PySpark.
    • JOB_SCRIPT — путь к файлу с заданием PySpark в бакете.
    • JOB_ARGS — аргументы задания PySpark.
    • JOB_PROPERTIES — свойства задания PySpark.
  2. Загрузите DAG в кластер Apache Airflow™: создайте в бакете папку dags и загрузите в нее файл dag.py.

  3. Откройте веб-интерфейс Apache Airflow™.

  4. Убедитесь, что в разделе DAGs появился новый DAG example_spark.

    Загрузка DAG-файла из бакета может занять несколько минут.

  5. Запустите DAG, для этого в строке с его именем нажмите кнопку image.

Проверьте результатПроверьте результат

  1. В веб-интерфейсе Apache Airflow™ нажмите на название DAG example_spark и отслеживайте выполнение задач.

  2. Дождитесь, когда все три задачи в DAG перейдут в статус Success.

  3. Перейдите на вкладку Graph.

  4. В открывшемся графе нажмите на задачу run_spark_job и перейдите на вкладку Logs.

  5. Проверьте, что PySpark-задание выполнило корректный вывод в лог. Для этого найдите в логе строки:

    Row count: 10
    
    +---+
    |id |
    +---+
    |0  |
    |1  |
    |2  |
    |3  |
    |4  |
    +---+
    

Примечание

В консоли управления вы можете следить за созданием кластера Apache Spark™, выполнением задания PySpark и удалением кластера.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:

  1. Сервисный аккаунт.
  2. Бакет Object Storage.
  3. Кластер Apache Airflow™.
  4. Группу безопасности, созданную по умолчанию в сети datalake-network.
  5. Облачные подсети, созданные по умолчанию в сети datalake-network.
  6. Облачную сеть datalake-network.

Была ли статья полезна?

Предыдущая
Интеграция Yandex Managed Service for Apache Spark™ и Apache Hive™ Metastore
Следующая
Использование Yandex Object Storage в Yandex Managed Service for Apache Spark™
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»