Integration with Yandex Managed Service for Apache Airflow™
Yandex Managed Service for Apache Airflow™ allows you to describe and manage automated data processing workflows. A workflow is a Directed Acyclic Graph (DAG) implemented using a Python script. A file with this script is called a DAG file. It contains jobs, their run schedule, and dependencies between them.
You can use Managed Service for Apache Airflow™ to rerun DataSphere Jobs on a schedule.
Running jobs regularly
To run jobs regularly, specify the datasphere pip package in the Pip packages field under the Dependencies section when creating an Apache Airflow™ cluster. For the Apache Airflow™ cluster to work with DataSphere jobs, add the cluster's service account to the DataSphere project with the Developer role or higher.
To manage a DAG file through the Apache Airflow™ web service interface, save it to the Yandex Object Storage bucket.
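For example, you can upload the DAG file to the bucket programmatically. Below is a minimal sketch using boto3 with the Object Storage endpoint; the bucket name, the dags/ prefix, and the static access key values are placeholders for illustration:

import boto3

# Illustrative: upload a local DAG file to the bucket used by the Apache Airflow™ cluster.
# <bucket_name>, the 'dags/' prefix, and the static access key are placeholders.
s3 = boto3.client(
    's3',
    endpoint_url='https://storage.yandexcloud.net',
    aws_access_key_id='<static_key_ID>',
    aws_secret_access_key='<static_key_secret>',
)
s3.upload_file('fork_job_sync.py', '<bucket_name>', 'dags/fork_job_sync.py')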
Warning
When writing job code, pin the yandexcloud package dependency to the latest published version to avoid dependency conflicts.
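For instance, the job's requirements file might pin it explicitly; the version below is a placeholder for the latest published release:

# requirements.txt of the job environment (illustrative)
yandexcloud==<latest_published_version>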
Example
Contents of a DAG file configured to run daily using the schedule parameter:
from typing import Dict

from airflow.decorators import dag, task
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='fork_job_sync', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(source_job_id: str, args: Dict[str, str]):
        # Fork the source DataSphere job with the given arguments and wait for it to finish.
        sdk = SDK()
        job = sdk.fork_job(source_job_id, args=args)
        job.wait()

    fork_job('<source_job_id>', {'RANGE': '1'})

run()
Job execution logs are displayed in the DataSphere CLI and in the Apache Airflow™ web server interface. If there are any issues, the DAG run will fail and the corresponding stack trace will appear in the logs.
Note
Currently, when working with DataSphere Jobs via an Airflow DAG, you cannot redefine the input and output data files. Input data files are taken from the parent job. Save output data externally, e.g., to S3-compatible storage.
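For example, the job code itself can upload its results to S3-compatible storage. The following is a minimal sketch using boto3; the bucket name, object key, and static access key are placeholders:

import boto3

def save_results(local_path: str):
    # Placeholders: substitute your own bucket, object key, and static access key.
    s3 = boto3.client(
        's3',
        endpoint_url='https://storage.yandexcloud.net',
        aws_access_key_id='<static_key_ID>',
        aws_secret_access_key='<static_key_secret>',
    )
    with open(local_path, 'rb') as f:
        s3.put_object(Bucket='<bucket_name>', Key='job-results/result.txt', Body=f)

save_results('result.txt')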
Non-blocking job fork calls via DAG
A non-blocking call allows you to start new computations on your Apache Airflow™ cluster immediately after a job is launched, without waiting for it to complete. Job status checks are performed using an Airflow Sensor.
Example
Let's take a look at a DAG file with an Airflow Sensor that polls the job status every minute using the get_job function. When the job is completed, the last operator, handle_job_result, is run:
from typing import Dict

from airflow.decorators import dag, task
from airflow.sensors.base import PokeReturnValue
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='fork_job_async', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(source_job_id: str, args: Dict[str, str]) -> str:
        # Fork the source DataSphere job and return its ID without waiting for completion.
        sdk = SDK()
        job = sdk.fork_job(source_job_id, args=args)
        return job.id

    @task(task_id='print_job_info')
    def print_job_info(job_id: str):
        print(f'Do something useful, maybe with job {job_id}')

    @task.sensor(task_id='wait_for_job', poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_job(job_id: str) -> PokeReturnValue:
        # Poll the job status once per minute until it is done.
        sdk = SDK()
        job = sdk.get_job(job_id)
        return PokeReturnValue(is_done=job.done, xcom_value='xcom_value')

    @task(task_id='handle_job_result')
    def handle_job_result():
        print('Processing job results')

    job_id = fork_job('<parent_job_ID>', {'RANGE': '1'})
    print_job_info(job_id)
    wait_for_job(job_id) >> handle_job_result()

run()
You can add other operators before or after the job run to integrate the DAG with other systems or external storage, as in the sketch below.
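For instance, a hypothetical notification task could be chained after handle_job_result in the non-blocking example above. The fragment below belongs inside that example's run() function and only illustrates the idea; notify_on_completion is not part of the DataSphere SDK:

# Illustrative fragment for the DAG above.
@task(task_id='notify_on_completion')
def notify_on_completion(job_id: str):
    print(f'Job {job_id} finished, sending a notification')

# Replace the final chain of the example with:
wait_for_job(job_id) >> handle_job_result() >> notify_on_completion(job_id)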
Using the results of completed jobs
You can use a DAG operator to download the output file of one job (result.txt) and provide it as an input file (INPUT_DATA) for another:
from airflow.decorators import dag, task
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='output_files_for_other_job', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(files_job_id: str, fork_source_job_id: str):
        sdk = SDK()
        # Download the output files (including result.txt) of the completed job.
        sdk.download_job_files(files_job_id)
        # Fork another job, passing the downloaded file through the INPUT_DATA variable.
        job = sdk.fork_job(fork_source_job_id, vars={'INPUT_DATA': 'result.txt'})
        job.wait()

    fork_job('<ID_of_job_to_download>', '<ID_of_job_to_run>')

run()