Интеграция с Yandex Managed Service for Apache Airflow™
Сервис Yandex Managed Service for Apache Airflow™ позволяет описывать и управлять автоматизированными процессами обработки данных — воркфлоу. Воркфлоу представляет собой направленный ациклический граф (Directed Acyclic Graph, DAG), который реализуется с помощью скрипта на Python. Файл с таким скриптом называется DAG-файлом. В нем содержатся задачи, расписание их запуска и зависимости между ними.
Вы можете использовать Managed Service for Apache Airflow™, чтобы организовать повторный запуск заданий DataSphere Jobs по расписанию.
Регулярный запуск заданий
Чтобы реализовать регулярный запуск заданий, при создании кластера Apache Airflow™ в разделе Зависимости укажите Pip-пакеты — DataSphere Jobs: datasphere. Чтобы кластер Apache Airflow™ мог работать с заданиями DataSphere, добавьте сервисный аккаунт кластера в проект DataSphere с ролью не ниже Developer
.
Чтобы управлять DAG-файлом через интерфейс веб-сервиса Apache Airflow™, сохраните его в бакет Yandex Object Storage.
Важно
При написании кода задания указывайте зависимость от пакета yandexcloud
последней опубликованной версии, чтобы избежать конфликтов зависимостей.
Пример
Содержимое DAG-файла, настроенного на ежедневный запуск с помощью параметра schedule
:
from typing import Dict
from airflow.decorators import dag, task
import pendulum
from datasphere import SDK
now = pendulum.now()
@dag(dag_id='fork_job_sync', start_date=now, schedule="@daily", catchup=False)
def run():
@task(task_id='fork_job')
def fork_job(source_job_id: str, args: Dict[str, str]):
sdk = SDK()
job = sdk.fork_job(source_job_id, args=args)
job.wait()
fork_job('<source_job_id>', {'RANGE': '1'})
run()
Логи выполнения задания отображаются в DataSphere CLI и в интерфейсе веб-сервера. В случае возникновения проблем, DAG-операция завершится ошибкой, а в логах появится соответствующая трассировка стека.
Примечание
В данный момент при работе с DataSphere Jobs через Airflow DAG нет возможности переопределения файлов с входными и выходными данными. Файлы с входными данными будут взяты у родительского задания. Выходные данные сохраняйте во внешние хранилища, например, в S3.
Неблокирующие вызовы клонов задания через DAG
Неблокирующий вызов позволяет сразу после запуска задания, не дожидаясь его завершения, проводить новые вычисления на кластере Apache Airflow™. Для проверки статуса задания используется Airflow Sensor
Пример
Рассмотрим DAG-файл с Airflow Sensor, который каждую минуту запрашивает статус задания с помощью функции get_job
. Когда задание завершено, запускается последний оператор — handle_job_result
:
from typing import Optional
from airflow.decorators import dag, task
from airflow.sensors.base import PokeReturnValue
import pendulum
from datasphere import SDK
now = pendulum.now()
@dag(dag_id='fork_job_async', start_date=now, schedule="@daily", catchup=False)
def run():
@task(task_id='fork_job')
def fork_job(source_job_id: str, args: Dict[str, str]) -> str:
sdk = SDK()
job = sdk.fork_job(source_job_id, args=args)
return job.id
@task(task_id='print_job_info')
def print_job_info(job_id: str):
print(f'Do something useful, may be with job {job_id}')
@task.sensor(task_id='wait_for_job', poke_interval=60, timeout=3600, mode='reschedule')
def wait_for_job(job_id: str) -> PokeReturnValue:
sdk = SDK()
job = sdk.get_job(job_id)
return PokeReturnValue(is_done=job.done, xcom_value='xcom_value')
@task(task_id='handle_job_result')
def handle_job_result():
print('Processing job results')
job_id = fork_job('<идентификатор_родительского_задания>', {'RANGE': '1'})
print_job_info(job_id)
wait_for_job(job_id) >> handle_job_result()
run()
Добавьте дополнительные операторы до или после запуска задания, интегрировав его с другими системами или внешними хранилищами.
Использование результатов работы выполненных заданий
С помощью оператора DAG вы можете загрузить выходной файл одного задания (result.txt
) и сделать его входным файлом (INPUT_DATA
) другого:
from typing import Dict
from airflow.decorators import dag, task
import pendulum
from datasphere import SDK
now = pendulum.now()
@dag(dag_id='output_files_for_other_job', start_date=now, schedule="@daily", catchup=False)
def run():
@task(task_id='fork_job')
def fork_job(files_job_id: str, fork_source_job_id: str):
sdk = SDK()
sdk.download_job_files(files_job_id)
job = sdk.fork_job(fork_source_job_id, vars={'INPUT_DATA': 'result.txt'})
job.wait()
fork_job('<идентификатор_задания_для_загрузки>', '<идентификатор_запускаемого_задания>')