Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Истории успеха
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • ИИ для бизнеса
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Истории успеха
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Практические руководства
    • Все руководства
    • Самостоятельное развертывание веб-интерфейса Apache Kafka®
    • Обновление кластера Managed Service for Apache Kafka® с ZooKeeper на KRaft
    • Миграция БД из стороннего кластера Apache Kafka® в Managed Service for Apache Kafka®
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for YDB в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for Greenplum® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Yandex StoreDoc с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MySQL® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for OpenSearch с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for PostgreSQL с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Data Streams с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for Apache Kafka® с помощью Data Transfer
    • Захват изменений YDB и поставка в YDS
    • Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
    • Синхронизация топиков Apache Kafka® в Object Storage без использования интернета
    • Отслеживание утери сообщений в топике Apache Kafka®
    • Автоматизация задач Query с помощью Managed Service for Apache Airflow™
    • Отправка запросов к API Yandex Cloud через Yandex Cloud Python SDK
    • Настройка SMTP-сервера для отправки уведомлений по электронной почте
    • Добавление данных в БД ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® средствами ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® при помощи Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for ClickHouse® с помощью Data Transfer
    • Асинхронная репликация данных из PostgreSQL в ClickHouse®
    • Обмен данными между Managed Service for ClickHouse® и Yandex Data Processing
    • Настройка Managed Service for ClickHouse® для Graphite
    • Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®
    • Получение данных из Managed Service for Apache Kafka® в ksqlDB
    • Получение данных из RabbitMQ в Managed Service for ClickHouse®
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse®
    • Асинхронная репликация данных из Яндекс Метрика в ClickHouse® с помощью Data Transfer
    • Использование гибридного хранилища в Managed Service for ClickHouse®
    • Шардирование таблиц Managed Service for ClickHouse®
    • Загрузка данных из Яндекс Директ в витрину Managed Service for ClickHouse® с использованием Cloud Functions, Object Storage и Data Transfer
    • Загрузка данных из Object Storage в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция данных со сменой хранилища из Managed Service for OpenSearch в Managed Service for ClickHouse® с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Managed Service for ClickHouse® с помощью Data Transfer
    • Интеграция Yandex Managed Service for ClickHouse® с Microsoft SQL Server через ClickHouse® JDBC Bridge
    • Миграция базы данных из Google BigQuery в Managed Service for ClickHouse®
    • Интеграция Yandex Managed Service for ClickHouse® с Oracle через ClickHouse® JDBC Bridge
    • Настройка Cloud DNS для доступа к кластеру Managed Service for ClickHouse® из других облачных сетей
    • Миграция кластера Yandex Data Processing с HDFS в другую зону доступности
    • Импорт данных из Managed Service for MySQL® в Yandex Data Processing с помощью Sqoop
    • Импорт данных из Managed Service for PostgreSQL в Yandex Data Processing с помощью Sqoop
    • Монтирование бакетов Object Storage к файловой системе хостов Yandex Data Processing
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
    • Автоматизация работы с Yandex Data Processing с помощью Managed Service for Apache Airflow™
    • Совместная работа с таблицами Yandex Data Processing с использованием Apache Hive™ Metastore
    • Перенос метаданных между кластерами Yandex Data Processing с помощью Apache Hive™ Metastore
    • Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
    • Миграция коллекций из стороннего кластера MongoDB в Yandex StoreDoc
    • Миграция данных в Yandex StoreDoc
    • Миграция кластера Yandex StoreDoc с версии 4.4 на 6.0
    • Шардирование коллекций Yandex StoreDoc
    • Анализ производительности и оптимизация Yandex StoreDoc
    • Миграция БД из стороннего кластера MySQL® в кластер Managed Service for MySQL®
    • Анализ производительности и оптимизация Managed Service for MySQL®
    • Синхронизация данных из стороннего кластера MySQL® в Managed Service for MySQL® с помощью Data Transfer
    • Миграция БД из Managed Service for MySQL® в сторонний кластер MySQL®
    • Миграция БД из Managed Service for MySQL® в Object Storage с помощью Data Transfer
    • Перенос данных из Object Storage в Managed Service for MySQL® с использованием Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Миграция БД из Managed Service for MySQL® в Managed Service for YDB с помощью Data Transfer
    • Захват изменений MySQL® и поставка в YDS
    • Миграция данных из Managed Service for MySQL® в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из AWS RDS for PostgreSQL в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из Managed Service for MySQL® в Managed Service for Greenplum® с помощью Data Transfer
    • Настройка политики индексов в Managed Service for OpenSearch
    • Миграция данных в Managed Service for OpenSearch из стороннего кластера OpenSearch с помощью Data Transfer
    • Загрузка данных из Managed Service for OpenSearch в Object Storage с помощью Data Transfer
    • Миграция данных из Managed Service for OpenSearch в Managed Service for YDB с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Аутентификация в OpenSearch Dashboards кластера Managed Service for OpenSearch с помощью Keycloak
    • Использование плагина yandex-lemmer в Managed Service for OpenSearch
    • Создание кластера PostgreSQL для «1С:Предприятия»
    • Поиск проблем с производительностью кластера Managed Service for PostgreSQL
    • Анализ производительности и оптимизация Managed Service for PostgreSQL
    • Логическая репликация PostgreSQL
    • Миграция БД из стороннего кластера PostgreSQL в Managed Service for PostgreSQL
    • Миграция БД из Managed Service for PostgreSQL
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for YDB с помощью Data Transfer
    • Миграция БД из Managed Service for PostgreSQL в Object Storage
    • Перенос данных из Object Storage в Managed Service for PostgreSQL с использованием Data Transfer
    • Захват изменений PostgreSQL и поставка в YDS
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for MySQL® с помощью Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Решение проблем с сортировкой строк в PostgreSQL после обновления glibc
    • Миграция БД из Greenplum® в ClickHouse®
    • Миграция БД из Greenplum® в PostgreSQL
    • Выгрузка данных Greenplum® в холодное хранилище Object Storage
    • Загрузка данных из Object Storage в Managed Service for Greenplum® с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Создание внешней таблицы на базе таблицы из бакета Object Storage с помощью конфигурационного файла
    • Получение данных из внешних источников с помощью именованных запросов в Greenplum®
    • Миграция БД из стороннего кластера Valkey™ в Yandex Managed Service for Valkey™
    • Использование кластера Yandex Managed Service for Valkey™ в качестве хранилища сессий PHP
    • Загрузка данных из Object Storage в Managed Service for YDB с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Object Storage с помощью Data Transfer
    • Обработка аудитных логов Audit Trails
    • Обработка логов Cloud Logging
    • Обработка потока изменений Debezium
    • Анализ данных с помощью Jupyter
    • Обработка файлов детализации в сервисе Yandex Cloud Billing
    • Ввод данных в системы хранения
    • Умная обработка логов
    • Передача данных в микросервисных архитектурах
    • Миграция данных в Object Storage с помощью Data Transfer
    • Миграция данных из стороннего кластера Greenplum® или PostgreSQL в Managed Service for Greenplum® с помощью Data Transfer
    • Миграция кластера Yandex StoreDoc
    • Миграция кластера MySQL®
    • Миграция на сторонний кластер MySQL®
    • Миграция кластера PostgreSQL
    • Создание реестра схем для поставки данных в формате Debezium CDC из Apache Kafka®
    • Автоматизация работы с помощью Yandex Managed Service for Apache Airflow™
    • Работа с таблицей в Object Storage из PySpark-задания
    • Интеграция Yandex Managed Service for Apache Spark™ и Apache Hive™ Metastore
    • Запуск PySpark-задания с помощью Yandex Managed Service for Apache Airflow™
    • Использование Yandex Object Storage в Yandex Managed Service for Apache Spark™

В этой статье:

  • Необходимые платные ресурсы
  • Подготовьте инфраструктуру
  • Подготовьте PySpark-задание
  • Подготовьте и запустите DAG-файл
  • Проверьте результат
  • Удалите созданные ресурсы
  1. Построение Data Platform
  2. Автоматизация работы с Yandex Data Processing с помощью Managed Service for Apache Airflow™

Автоматизация работы с Yandex Data Processing с помощью Yandex Managed Service for Apache Airflow™

Статья создана
Yandex Cloud
Обновлена 8 октября 2025 г.
  • Необходимые платные ресурсы
  • Подготовьте инфраструктуру
  • Подготовьте PySpark-задание
  • Подготовьте и запустите DAG-файл
  • Проверьте результат
  • Удалите созданные ресурсы

Важно

Руководство тестировалось на кластерах с версией Apache Airflow™ ниже 3.0.

В сервисе Yandex Managed Service for Apache Airflow™ можно создать DAG — направленный ациклический граф задач, который позволит автоматизировать работу с сервисом Yandex Data Processing. Ниже рассматривается DAG, который включает в себя несколько задач:

  1. Создать кластер Yandex Data Processing.
  2. Создать и запустить задание PySpark.
  3. Удалить кластер Yandex Data Processing.

При таком DAG кластер существует непродолжительное время. Так как стоимость ресурсов Yandex Data Processing зависит от времени их использования, в кластере можно задействовать ресурсы повышенной мощности и быстро обработать большее количество данных за те же деньги.

В этом DAG кластер Yandex Data Processing создается без сервиса Hive. Для хранения табличных метаданных в примере ниже используется кластер Apache Hive™ Metastore. Сохраненные метаданные затем может использовать другой кластер Yandex Data Processing.

Чтобы автоматизировать работу с Yandex Data Processing с помощью Managed Service for Apache Airflow™:

  1. Подготовьте инфраструктуру.
  2. Подготовьте PySpark-задание.
  3. Подготовьте и запустите DAG-файл.
  4. Проверьте результат.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Airflow™: вычислительные ресурсы компонентов кластера (см. тарифы Apache Airflow™).
  • Плата за вычислительные ресурсы кластера Apache Hive™ Metastore (см. тарифы Yandex MetaData Hub).
  • Плата за NAT-шлюз (см. тарифы Virtual Private Cloud).
  • Плата за бакеты Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).
  • Плата за кластер Yandex Data Processing: использование вычислительных ресурсов ВМ и сетевых дисков Compute Cloud, а также сервиса Cloud Logging для работы с логами (см. тарифы Yandex Data Processing).

Подготовьте инфраструктуруПодготовьте инфраструктуру

В примере ниже рассматривается два сценария. Выберите наиболее подходящий:

  • Высокий уровень безопасности. Такой сценарий рекомендуемый, так как в нем соблюдается принцип минимальных привилегий. Сценарий включает в себя следующие особенности:

    • Права доступа разделяются между сервисными аккаунтами. Для каждого кластера вы создаете отдельный сервисный аккаунт и назначаете ему роли, необходимые только для работы кластера этого аккаунта.
    • Используется несколько бакетов для разных задач, различные данные хранятся в разных бакетах. Например, результаты выполнения PySpark-задания записываются в один бакет, а логи — в другой.
    • Настроены группы безопасности. Вы ограничиваете трафик, и в результате доступ получают только разрешенные ресурсы.
  • Упрощенная настройка. Предусматривает более низкий уровень безопасности:

    • Используется один сервисный аккаунт, который обладает большими привилегиями, чем необходимо.
    • Все данные хранятся только в одном бакете, но в разных папках.
    • Группы безопасности не настраиваются.
Высокий уровень безопасности
Упрощенная настройка

Подготовьте инфраструктуру:

  1. Создайте сервисные аккаунты со следующими ролями:

    Сервисный аккаунт

    Его роли

    airflow-agent для кластера Apache Airflow™.

    • dataproc.editor — чтобы управлять кластером Yandex Data Processing из DAG;
    • vpc.user — чтобы в кластере Apache Airflow™ использовать подсеть Yandex Virtual Private Cloud;
    • managed-airflow.integrationProvider — чтобы кластер Apache Airflow™ мог взаимодействовать с другими ресурсами;
    • iam.serviceAccounts.user — чтобы указать сервисный аккаунт data-processing-agent при создании кластера Yandex Data Processing.

    metastore-agent для кластера Apache Hive™ Metastore.

    • managed-metastore.integrationProvider — чтобы кластер Apache Hive™ Metastore мог взаимодействовать с другими ресурсами.

    data-processing-agent для кластера Yandex Data Processing.

    • dataproc.agent — чтобы сервисный аккаунт мог получать информацию о состоянии хостов кластера, заданиях и лог-группах.
    • dataproc.provisioner — чтобы сервисный аккаунт мог взаимодействовать с автоматически масштабируемой группой ВМ. Тогда будет доступно автомасштабирование подкластеров.
  2. Создайте бакеты:

    • <бакет_для_Managed_Airflow>;
    • <бакет_для_исходного_кода_PySpark_задания>;
    • <бакет_для_выходных_данных_PySpark_задания>;
    • <бакет_для_сбора_логов_Spark>.

    Бакетов нужно несколько, так как на них назначаются различные права доступа.

  3. Предоставьте разрешения на следующие бакеты:

    • <бакет_для_Managed_Airflow> — разрешение READ для сервисного аккаунта airflow-agent;
    • <бакет_для_исходного_кода_PySpark_задания> — разрешение READ для сервисного аккаунта data-processing-agent;
    • <бакет_для_выходных_данных_PySpark_задания> — разрешение READ и WRITE для сервисных аккаунтов data-processing-agent и metastore-agent;
    • <бакет_для_сбора_логов_Spark> — разрешение READ и WRITE для сервисного аккаунта data-processing-agent.
  4. Создайте облачную сеть с именем data-processing-network.

    Вместе с ней автоматически создадутся три подсети в разных зонах доступности.

  5. Настройте NAT-шлюз для подсети data-processing-network-ru-central1-a.

  6. Для кластера Apache Hive™ Metastore создайте группу безопасности metastore-sg в сети data-processing-network. Добавьте в группу следующие правила:

    • Для входящего трафика от клиентов:

      • диапазон портов — 30000-32767;
      • протокол — Любой (Any);
      • источник — CIDR;
      • CIDR блоки — 0.0.0.0/0.
    • Для входящего трафика от балансировщика:

      • диапазон портов — 10256;
      • протокол — Любой (Any);
      • источник — Проверки состояния балансировщика.
  7. Для кластеров Managed Service for Apache Airflow™ и Yandex Data Processing создайте группу безопасности airflow-sg в сети data-processing-network. Добавьте в группу следующие правила:

    • Для входящего служебного трафика:

      • диапазон портов — 0-65535;
      • протокол — Любой (Any);
      • источник — Группа безопасности;
      • группа безопасности — Текущая (Self).
    • Для исходящего служебного трафика:

      • диапазон портов — 0-65535;
      • протокол — Любой (Any);
      • назначение — Группа безопасности;
      • группа безопасности — Текущая (Self).
    • Для исходящего HTTPS-трафика:

      • диапазон портов — 443;
      • протокол — TCP;
      • назначение — CIDR;
      • CIDR блоки — 0.0.0.0/0.
    • Для исходящего трафика, чтобы разрешить подключение кластера Yandex Data Processing к Apache Hive™ Metastore:

      • диапазон портов — 9083;
      • протокол — Любой (Any);
      • назначение — Группа безопасности;
      • группа безопасности — metastore-sg (Из списка).
  8. Создайте кластер Apache Hive™ Metastore с параметрами:

    • Сервисный аккаунт — metastore-agent.
    • Сеть — data-processing-network.
    • Подсеть — data-processing-network-ru-central1-a.
    • Группа безопасности — metastore-sg.
  9. Создайте кластер Managed Service for Apache Airflow™ с параметрами:

    • Сервисный аккаунт — airflow-agent.
    • Зона доступности — ru-central1-a.
    • Сеть — data-processing-network.
    • Подсеть — data-processing-network-ru-central1-a.
    • Группа безопасности — airflow-sg.
    • Имя бакета — <бакет_для_Managed_Airflow>.

Подготовьте инфраструктуру:

  1. Создайте сервисный аккаунт my-editor со следующими ролями:

    • dataproc.editor — для управления кластером Yandex Data Processing из DAG;
    • editor — для остальных необходимых операций.
  2. Создайте бакет <бакет_для_заданий_и_данных>.

    На него не нужно предоставлять разрешение сервисному аккаунту, так как роли editor достаточно.

  3. Создайте облачную сеть с именем data-processing-network.

    Вместе с ней автоматически создадутся три подсети в разных зонах доступности и группа безопасности.

  4. Настройте NAT-шлюз для подсети data-processing-network-ru-central1-a.

  5. Создайте кластер Apache Hive™ Metastore с параметрами:

    • Сервисный аккаунт — my-editor.
    • Сеть — data-processing-network.
    • Подсеть — data-processing-network-ru-central1-a.
    • Группа безопасности — группа по умолчанию в сети data-processing-network.
  6. Создайте кластер Managed Service for Apache Airflow™ с параметрами:

    • Сервисный аккаунт — my-editor.
    • Зона доступности — ru-central1-a.
    • Сеть — data-processing-network.
    • Подсеть — data-processing-network-ru-central1-a.
    • Группа безопасности — группа по умолчанию в сети data-processing-network.
    • Имя бакета — <бакет_для_заданий_и_данных>.

Подготовьте PySpark-заданиеПодготовьте PySpark-задание

Для PySpark-задания будет использован Python-скрипт, который создает таблицу и хранится в бакете Object Storage. Подготовьте файл скрипта:

Высокий уровень безопасности
Упрощенная настройка
  1. Создайте локально файл с именем create-table.py и скопируйте в него скрипт:

    create-table.py
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    
    # Создание Spark-сессии
    spark = SparkSession.builder \
        .appName("create-table") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Создание схемы данных
    schema = StructType([StructField('Name', StringType(), True),
    StructField('Capital', StringType(), True),
    StructField('Area', IntegerType(), True),
    StructField('Population', IntegerType(), True)])
    
    # Создание датафрейма
    df = spark.createDataFrame([('Австралия', 'Канберра', 7686850, 19731984), ('Австрия', 'Вена', 83855, 7700000)], schema)
    
    # Запись датафрейма в бакет в виде таблицы countries
    df.write.mode("overwrite").option("path","s3a://<бакет_для_выходных_данных_PySpark_задания>/countries").saveAsTable("countries")
    
  2. Создайте в бакете <бакет_для_исходного_кода_PySpark_задания> папку scripts и загрузите в нее файл create-table.py.

  1. Создайте локально файл с именем create-table.py и скопируйте в него скрипт:

    create-table.py
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    
    # Создание Spark-сессии
    spark = SparkSession.builder \
        .appName("create-table") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Создание схемы данных
    schema = StructType([StructField('Name', StringType(), True),
    StructField('Capital', StringType(), True),
    StructField('Area', IntegerType(), True),
    StructField('Population', IntegerType(), True)])
    
    # Создание датафрейма
    df = spark.createDataFrame([('Австралия', 'Канберра', 7686850, 19731984), ('Австрия', 'Вена', 83855, 7700000)], schema)
    
    # Запись датафрейма в бакет в виде таблицы countries
    df.write.mode("overwrite").option("path","s3a://<бакет_для_заданий_и_данных>/countries").saveAsTable("countries")
    
  2. Создайте в бакете <бакет_для_заданий_и_данных> папку scripts и загрузите в нее файл create-table.py.

Подготовьте и запустите DAG-файлПодготовьте и запустите DAG-файл

DAG будет состоять из нескольких вершин, которые формируют цепочку последовательных действий:

  1. Managed Service for Apache Airflow™ создает временный, легковесный кластер Yandex Data Processing с настройками, заданными в DAG. Этот кластер автоматически подключается к созданному ранее кластеру Apache Hive™ Metastore.
  2. Когда кластер Yandex Data Processing готов, запускается задание PySpark.
  3. После выполнения задания временный кластер Yandex Data Processing удаляется.

Чтобы подготовить DAG:

Высокий уровень безопасности
Упрощенная настройка
  1. Создайте SSH-ключ. Сохраните открытую часть ключа — она понадобится для создания кластера Yandex Data Processing.

  2. Создайте локально файл с именем Data-Processing-DAG.py, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:

    Data-Processing-DAG.py
    import uuid
    import datetime
    from airflow import DAG
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.providers.yandex.operators.yandexcloud_dataproc import (
        DataprocCreateClusterOperator,
        DataprocCreatePysparkJobOperator,
        DataprocDeleteClusterOperator,
    )
    
    # Данные вашей инфраструктуры
    YC_DP_AZ = 'ru-central1-a'
    YC_DP_SSH_PUBLIC_KEY = '<открытая_часть_SSH-ключа>'
    YC_DP_SUBNET_ID = '<идентификатор_подсети>'
    YC_DP_SA_ID = '<идентификатор_сервисного_аккаунта_data-processing-agent>'
    YC_DP_METASTORE_URI = '<IP-адрес>'
    YC_SOURCE_BUCKET = '<бакет_для_исходного_кода_PySpark_задания>'
    YC_DP_LOGS_BUCKET = '<бакет_для_сбора_логов_Spark>'
    
    # Настройки DAG
    with DAG(
            'DATA_INGEST',
            schedule='@hourly',
            tags=['data-processing-and-airflow'],
            start_date=datetime.datetime.now(),
            max_active_runs=1,
            catchup=False
    ) as ingest_dag:
        # 1 этап: создание кластера Yandex Data Proc
        create_spark_cluster = DataprocCreateClusterOperator(
            task_id='dp-cluster-create-task',
            cluster_name=f'tmp-dp-{uuid.uuid4()}',
            cluster_description='Временный кластер для выполнения PySpark-задания под оркестрацией Managed Service for Apache Airflow™',
            ssh_public_keys=YC_DP_SSH_PUBLIC_KEY,
            service_account_id=YC_DP_SA_ID,
            subnet_id=YC_DP_SUBNET_ID,
            s3_bucket=YC_DP_LOGS_BUCKET,
            zone=YC_DP_AZ,
            cluster_image_version='2.1',
            masternode_resource_preset='s2.small',
            masternode_disk_type='network-ssd',
            masternode_disk_size=200,
            computenode_resource_preset='m2.large',
            computenode_disk_type='network-ssd',
            computenode_disk_size=200,
            computenode_count=2,
            computenode_max_hosts_count=5,  # Количество подкластеров для обработки данных будет автоматически масштабироваться в случае большой нагрузки.
            services=['YARN', 'SPARK'],     # Создается легковесный кластер.
            datanode_count=0,               # Без подкластеров для хранения данных.
            properties={                    # С указанием на удаленный кластер Apache Hive™ Metastore.
                'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083',
            },
        )
    
        # 2 этап: запуск задания PySpark
        poke_spark_processing = DataprocCreatePysparkJobOperator(
            task_id='dp-cluster-pyspark-task',
            main_python_file_uri=f's3a://{YC_SOURCE_BUCKET}/scripts/create-table.py',
        )
    
        # 3 этап: удаление кластера Yandex Data Processing
        delete_spark_cluster = DataprocDeleteClusterOperator(
            task_id='dp-cluster-delete-task',
            trigger_rule=TriggerRule.ALL_DONE,
        )
    
        # Формирование DAG из указанных выше этапов
        create_spark_cluster >> poke_spark_processing >> delete_spark_cluster
    

    Где:

    • YC_DP_AZ — зона доступности для кластера Yandex Data Processing;
    • YC_DP_SSH_PUBLIC_KEY — открытая часть SSH-ключа для кластера Yandex Data Processing;
    • YC_DP_SUBNET_ID — идентификатор подсети;
    • YC_DP_SA_ID — идентификатор сервисного аккаунта для Yandex Data Processing;
    • YC_DP_METASTORE_URI — IP-адрес кластера Apache Hive™ Metastore;
    • YC_SOURCE_BUCKET — бакет с Python-скриптом для задания PySpark;
    • YC_DP_LOGS_BUCKET — бакет для логов.
  3. Загрузите DAG в кластер Managed Service for Apache Airflow™: создайте в бакете <бакет_для_Managed_Airflow> папку dags и загрузите в нее файл Data-Processing-DAG.py.

  4. Откройте веб-интерфейс Apache Airflow™.

  5. Убедитесь, что в разделе DAGs появился новый DAG DATA_INGEST с тегом data-processing-and-airflow.

    Загрузка DAG-файла из бакета может занять несколько минут.

  6. Чтобы запустить DAG, в строке с его именем нажмите кнопку image.

  1. Создайте SSH-ключ. Сохраните открытую часть ключа — она понадобится для создания кластера Yandex Data Processing.

  2. Создайте локально файл с именем Data-Processing-DAG.py, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:

    Data-Processing-DAG.py
    import uuid
    import datetime
    from airflow import DAG
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.providers.yandex.operators.yandexcloud_dataproc import (
        DataprocCreateClusterOperator,
        DataprocCreatePysparkJobOperator,
        DataprocDeleteClusterOperator,
    )
    
    # Данные вашей инфраструктуры
    YC_DP_AZ = 'ru-central1-a'
    YC_DP_SSH_PUBLIC_KEY = '<открытая_часть_SSH-ключа>'
    YC_DP_SUBNET_ID = '<идентификатор_подсети>'
    YC_DP_SA_ID = '<идентификатор_сервисного_аккаунта_my-editor>'
    YC_DP_METASTORE_URI = '<IP-адрес>'
    YC_BUCKET = '<бакет_для_заданий_и_данных>'
    
    # Настройки DAG
    with DAG(
            'DATA_INGEST',
            schedule='@hourly',
            tags=['data-processing-and-airflow'],
            start_date=datetime.datetime.now(),
            max_active_runs=1,
            catchup=False
    ) as ingest_dag:
        # 1 этап: создание кластера Yandex Data Proc
        create_spark_cluster = DataprocCreateClusterOperator(
            task_id='dp-cluster-create-task',
            cluster_name=f'tmp-dp-{uuid.uuid4()}',
            cluster_description='Временный кластер для выполнения PySpark-задания под оркестрацией Managed Service for Apache Airflow™',
            ssh_public_keys=YC_DP_SSH_PUBLIC_KEY,
            service_account_id=YC_DP_SA_ID,
            subnet_id=YC_DP_SUBNET_ID,
            s3_bucket=YC_BUCKET,
            zone=YC_DP_AZ,
            cluster_image_version='2.1',
            masternode_resource_preset='s2.small',
            masternode_disk_type='network-ssd',
            masternode_disk_size=200,
            computenode_resource_preset='m2.large',
            computenode_disk_type='network-ssd',
            computenode_disk_size=200,
            computenode_count=2,
            computenode_max_hosts_count=5,  # Количество подкластеров для обработки данных будет автоматически масштабироваться в случае большой нагрузки.
            services=['YARN', 'SPARK'],     # Создается легковесный кластер.
            datanode_count=0,               # Без подкластеров для хранения данных.
            properties={                    # С указанием на удаленный кластер Apache Hive™ Metastore.
                'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083',
            },
        )
    
        # 2 этап: запуск задания PySpark
        poke_spark_processing = DataprocCreatePysparkJobOperator(
            task_id='dp-cluster-pyspark-task',
            main_python_file_uri=f's3a://{YC_BUCKET}/scripts/create-table.py',
        )
    
        # 3 этап: удаление кластера Yandex Data Processing
        delete_spark_cluster = DataprocDeleteClusterOperator(
            task_id='dp-cluster-delete-task',
            trigger_rule=TriggerRule.ALL_DONE,
        )
    
        # Формирование DAG из указанных выше этапов
        create_spark_cluster >> poke_spark_processing >> delete_spark_cluster
    

    Где:

    • YC_DP_AZ — зона доступности для кластера Yandex Data Processing;
    • YC_DP_SSH_PUBLIC_KEY — открытая часть SSH-ключа для кластера Yandex Data Processing;
    • YC_DP_SUBNET_ID — идентификатор подсети;
    • YC_DP_SA_ID — идентификатор сервисного аккаунта my-editor;
    • YC_DP_METASTORE_URI — IP-адрес кластера Apache Hive™ Metastore;
    • YC_BUCKET — <бакет_для_заданий_и_данных>.
  3. Загрузите DAG в кластер Managed Service for Apache Airflow™: создайте в бакете <бакет_для_заданий_и_данных> папку dags и загрузите в нее файл Data-Processing-DAG.py.

  4. Откройте веб-интерфейс Apache Airflow™.

  5. Убедитесь, что в разделе DAGs появился новый DAG DATA_INGEST с тегом data-processing-and-airflow.

    Загрузка DAG-файла из бакета может занять несколько минут.

  6. Чтобы запустить DAG, в строке с его именем нажмите кнопку image.

Проверьте результатПроверьте результат

Высокий уровень безопасности
Упрощенная настройка
  1. Чтобы отслеживать результаты выполнения задач, нажмите на название DAG. Результаты отображаются во вкладке Grid.
  2. Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления создается кластер Yandex Data Processing, выполняется задание PySpark и удаляется тот же кластер.
  3. Убедитесь, что в бакете <бакет_для_выходных_данных_PySpark_задания> появилась папка countries, а в ней — файл part-00000-.... Теперь данные из созданной таблицы хранятся в бакете Object Storage, а метаинформация о ней — в кластере Apache Hive™ Metastore.
  4. Проверьте, что в бакете <бакет_для_сбора_логов_Spark> появились логи выполнения PySpark-задания.
  1. Чтобы отслеживать результаты выполнения задач, нажмите на название DAG. Результаты отображаются во вкладке Grid.
  2. Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления создается кластер Yandex Data Processing, выполняется задание PySpark и удаляется тот же кластер.
  3. Убедитесь, что в бакете <бакет_для_заданий_и_данных> появилась папка countries, а в ней — файл part-00000-.... Теперь данные из созданной таблицы хранятся в бакете Object Storage, а метаинформация о ней — в кластере Apache Hive™ Metastore.
  4. Проверьте, что в бакете <бакет_для_заданий_и_данных> появились логи выполнения PySpark-задания. Они записываются в папки dataproc, user и var.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:

Высокий уровень безопасности
Упрощенная настройка
  1. Сервисные аккаунты.
  2. Бакеты Object Storage.
  3. Кластер Apache Hive™ Metastore.
  4. Кластер Managed Service for Apache Airflow™.
  5. Таблицу маршрутизации.
  6. NAT-шлюз.
  7. Группы безопасности.
  8. Облачные подсети, созданные по умолчанию в сети data-processing-network.
  9. Облачную сеть.
  1. Сервисный аккаунт.
  2. Бакет Object Storage.
  3. Кластер Apache Hive™ Metastore.
  4. Кластер Managed Service for Apache Airflow™.
  5. Таблицу маршрутизации.
  6. NAT-шлюз.
  7. Группу безопасности, созданную по умолчанию в сети data-processing-network.
  8. Облачные подсети, созданные по умолчанию в сети data-processing-network.
  9. Облачную сеть.

Была ли статья полезна?

Предыдущая
Работа с топиками Apache Kafka® с помощью Yandex Data Processing
Следующая
Совместная работа с таблицами Yandex Data Processing с использованием Apache Hive™ Metastore
Проект Яндекса
© 2025 ООО «Яндекс.Облако»