Integration with Yandex Managed Service for Apache Airflow™

Written by Yandex Cloud
Updated on March 10, 2025
In this article:

  • Running jobs regularly
  • Non-blocking job fork calls via DAG
  • Using the results of completed jobs

Yandex Managed Service for Apache Airflow™ allows you to describe and manage automated data processing workflows. A workflow is a Directed Acyclic Graph (DAG) implemented using a Python script. A file with this script is called a DAG file. It contains jobs, their run schedule, and dependencies between them.

You can use Managed Service for Apache Airflow™ to rerun DataSphere Jobs on a schedule.

Running jobs regularly

To run jobs regularly, specify the datasphere pip package in the Pip packages field under Dependencies when creating an Apache Airflow™ cluster. For the Apache Airflow™ cluster to work with DataSphere Jobs, add the cluster's service account to the DataSphere project with the Developer role or higher.

To manage a DAG file through the Apache Airflow™ web server interface, save it to a Yandex Object Storage bucket.
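
For example, you can upload the DAG file to the bucket with any S3-compatible client. Below is a minimal sketch using boto3 against the Yandex Object Storage endpoint; the file name, bucket name, and object key are placeholders, and static access keys are assumed to be available in the standard AWS_* environment variables:

import boto3

# Yandex Object Storage exposes an S3-compatible API at storage.yandexcloud.net.
# Static access keys are picked up from the AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY environment variables.
s3 = boto3.client('s3', endpoint_url='https://storage.yandexcloud.net')

# <bucket_name> and the object key are placeholders for your own values.
s3.upload_file('fork_job_sync.py', '<bucket_name>', 'dags/fork_job_sync.py')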

Warning

When writing job code, specify the latest published version of the yandexcloud package in its dependencies to avoid dependency conflicts.

Example

Contents of a DAG file configured to run daily using the schedule parameter:

from typing import Dict

from airflow.decorators import dag, task
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='fork_job_sync', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(source_job_id: str, args: Dict[str, str]):
        sdk = SDK()
        job = sdk.fork_job(source_job_id, args=args)
        job.wait()

    fork_job('<source_job_id>', {'RANGE': '1'})

run()

Job run logs are displayed in the DataSphere CLI and in the Apache Airflow™ web server interface. If there are any issues, the DAG run fails and the corresponding stack trace appears in the logs.

Note

Currently, when working with DataSphere Jobs via an Airflow DAG, you cannot redefine the input and output data files. Input data files are taken from the parent job. Save output data externally, for example, to S3-compatible storage.
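
For example, the job code itself can upload its results to S3-compatible storage. Below is a minimal sketch using boto3; the bucket name and object key are placeholders, and credentials are assumed to be available in the environment:

import boto3

# Inside the job code: upload the produced file to S3-compatible storage
# instead of relying on DataSphere output files.
s3 = boto3.client('s3', endpoint_url='https://storage.yandexcloud.net')
s3.upload_file('result.txt', '<bucket_name>', 'outputs/result.txt')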

Non-blocking job fork calls via DAG

A non-blocking call allows your Apache Airflow™ cluster to start new computations immediately after launching a job, without waiting for it to complete. The job status is checked with an Airflow Sensor, which frees up cluster computing resources for running other task graphs.

Example

Let's take a look at a DAG file with an Airflow Sensor that requests the job status every minute using the get_job function. Once the job is completed, the last operator, handle_job_result, runs:

from typing import Dict

from airflow.decorators import dag, task
from airflow.sensors.base import PokeReturnValue
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='fork_job_async', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(source_job_id: str, args: Dict[str, str]) -> str:
        sdk = SDK()
        job = sdk.fork_job(source_job_id, args=args)
        return job.id

    @task(task_id='print_job_info')
    def print_job_info(job_id: str):
        print(f'Do something useful, maybe with job {job_id}')

    # In 'reschedule' mode, the sensor frees its worker slot between status checks.
    @task.sensor(task_id='wait_for_job', poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_job(job_id: str) -> PokeReturnValue:
        sdk = SDK()
        job = sdk.get_job(job_id)
        return PokeReturnValue(is_done=job.done, xcom_value='xcom_value')

    @task(task_id='handle_job_result')
    def handle_job_result():
        print('Processing job results')

    job_id = fork_job('<parent_job_ID>', {'RANGE': '1'})
    print_job_info(job_id)
    wait_for_job(job_id) >> handle_job_result()

run()

You can add more operators before or after the job run, for example, to integrate it with other systems or external storage.
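
As an illustration only, a hypothetical notify task could be appended to the DAG body from the example above, replacing its last dependency line so that the extra operator runs after the job results are processed (the task name and its contents are placeholders):

    # Hypothetical extra operator inside run(): executes after handle_job_result,
    # e.g. to notify an external system or copy artifacts to external storage.
    @task(task_id='notify')
    def notify():
        print('Job finished, sending a notification')

    wait_for_job(job_id) >> handle_job_result() >> notify()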

Using the results of completed jobs

You can use a DAG operator to load the output file of one job (result.txt) and provide it as an input file (INPUT_DATA) for another:

from typing import Dict

from airflow.decorators import dag, task
import pendulum

from datasphere import SDK

now = pendulum.now()

@dag(dag_id='output_files_for_other_job', start_date=now, schedule="@daily", catchup=False)
def run():
    @task(task_id='fork_job')
    def fork_job(files_job_id: str, fork_source_job_id: str):
        sdk = SDK()
        sdk.download_job_files(files_job_id)
        job = sdk.fork_job(fork_source_job_id, vars={'INPUT_DATA': 'result.txt'})
        job.wait()

    fork_job('<ID_of_job_to_download>', '<ID_of_job_to_run>')

run()

See also

  • DataSphere Jobs
  • Rerunning jobs in DataSphere Jobs
  • Resource relationships in Managed Service for Apache Airflow™
