Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex DataSphere
  • Начало работы
    • О сервисе DataSphere
    • Взаимосвязь ресурсов в DataSphere
    • Сообщества
    • Управление расходами
    • Проект
    • Конфигурации вычислительных ресурсов
      • Задания
      • DataSphere CLI
      • Docker-образы в заданиях
      • Среда исполнения заданий
      • Повторный запуск заданий
      • Интеграция с Managed Service for Apache Airflow™
      • Работа с коннекторами Spark
    • Фундаментальные модели
    • Квоты и лимиты
    • Специальные условия для образования
  • Справочник Terraform
  • Аудитные логи Audit Trails
  • Управление доступом
  • Правила тарификации
  • Публичные материалы
  • История изменений

В этой статье:

  • Регулярный запуск заданий
  • Неблокирующие вызовы клонов задания через DAG
  • Использование результатов работы выполненных заданий
  1. Концепции
  2. DataSphere Jobs
  3. Интеграция с Managed Service for Apache Airflow™

Интеграция с Yandex Managed Service for Apache Airflow™

Статья создана
Yandex Cloud
Обновлена 17 февраля 2025 г.
  • Регулярный запуск заданий
  • Неблокирующие вызовы клонов задания через DAG
  • Использование результатов работы выполненных заданий

Сервис Yandex Managed Service for Apache Airflow™ позволяет описывать и управлять автоматизированными процессами обработки данных — воркфлоу. Воркфлоу представляет собой направленный ациклический граф (Directed Acyclic Graph, DAG), который реализуется с помощью скрипта на Python. Файл с таким скриптом называется DAG-файлом. В нем содержатся задачи, расписание их запуска и зависимости между ними.

Вы можете использовать Managed Service for Apache Airflow™, чтобы организовать повторный запуск заданий DataSphere Jobs по расписанию.

Регулярный запуск заданийРегулярный запуск заданий

Чтобы реализовать регулярный запуск заданий, при создании кластера Apache Airflow™ в разделе Зависимости укажите Pip-пакеты — DataSphere Jobs: datasphere. Чтобы кластер Apache Airflow™ мог работать с заданиями DataSphere, добавьте сервисный аккаунт кластера в проект DataSphere с ролью не ниже Developer.

Чтобы управлять DAG-файлом через интерфейс веб-сервиса Apache Airflow™, сохраните его в бакет Yandex Object Storage.

Важно

При написании кода задания указывайте зависимость от пакета yandexcloud последней опубликованной версии, чтобы избежать конфликтов зависимостей.

ПримерПример

Содержимое DAG-файла, настроенного на ежедневный запуск с помощью параметра schedule:

from typing import Dict

from airflow.decorators import dag, task
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='fork_job_sync', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(source_job_id: str, args: Dict[str, str]):
        sdk = SDK()
        job = sdk.fork_job(source_job_id, args=args)
        job.wait()

    fork_job('<source_job_id>', {'RANGE': '1'})

run()

Логи выполнения задания отображаются в DataSphere CLI и в интерфейсе веб-сервера. В случае возникновения проблем, DAG-операция завершится ошибкой, а в логах появится соответствующая трассировка стека.

Примечание

В данный момент при работе с DataSphere Jobs через Airflow DAG нет возможности переопределения файлов с входными и выходными данными. Файлы с входными данными будут взяты у родительского задания. Выходные данные сохраняйте во внешние хранилища, например, в S3.

Неблокирующие вызовы клонов задания через DAGНеблокирующие вызовы клонов задания через DAG

Неблокирующий вызов позволяет сразу после запуска задания, не дожидаясь его завершения, проводить новые вычисления на кластере Apache Airflow™. Для проверки статуса задания используется Airflow Sensor, который экономит вычислительные ресурсы кластера для запуска других графов заданий.

ПримерПример

Рассмотрим DAG-файл с Airflow Sensor, который каждую минуту запрашивает статус задания с помощью функции get_job. Когда задание завершено, запускается последний оператор — handle_job_result:

from typing import Optional

from airflow.decorators import dag, task
from airflow.sensors.base import PokeReturnValue
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='fork_job_async', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(source_job_id: str, args: Dict[str, str]) -> str:
        sdk = SDK()
        job = sdk.fork_job(source_job_id, args=args)
        return job.id

    @task(task_id='print_job_info')
    def print_job_info(job_id: str):
        print(f'Do something useful, may be with job {job_id}')

    @task.sensor(task_id='wait_for_job', poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_job(job_id: str) -> PokeReturnValue:
        sdk = SDK()
        job = sdk.get_job(job_id)
        return PokeReturnValue(is_done=job.done, xcom_value='xcom_value')

    @task(task_id='handle_job_result')
    def handle_job_result():
        print('Processing job results')

    job_id = fork_job('<идентификатор_родительского_задания>', {'RANGE': '1'})
    print_job_info(job_id)
    wait_for_job(job_id) >> handle_job_result()

run()

Добавьте дополнительные операторы до или после запуска задания, интегрировав его с другими системами или внешними хранилищами.

Использование результатов работы выполненных заданийИспользование результатов работы выполненных заданий

С помощью оператора DAG вы можете загрузить выходной файл одного задания (result.txt) и сделать его входным файлом (INPUT_DATA) другого:

Python
from typing import Dict

from airflow.decorators import dag, task
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='output_files_for_other_job', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(files_job_id: str, fork_source_job_id: str):
        sdk = SDK()
        sdk.download_job_files(files_job_id)
        job = sdk.fork_job(fork_source_job_id, vars={'INPUT_DATA': 'result.txt'})
        job.wait()

    fork_job('<идентификатор_задания_для_загрузки>', '<идентификатор_запускаемого_задания>')

См. такжеСм. также

  • DataSphere Jobs
  • Повторный запуск заданий DataSphere Jobs
  • Взаимосвязь ресурсов в Managed Service for Apache Airflow™

Была ли статья полезна?

Предыдущая
Повторный запуск заданий
Следующая
Работа с коннекторами Spark
Проект Яндекса
© 2025 ООО «Яндекс.Облако»