Yandex Cloud
Search
Contact UsGet started
  • Pricing
  • Customer Stories
  • Documentation
  • Blog
  • All Services
  • System Status
    • Featured
    • Infrastructure & Network
    • Data Platform
    • Containers
    • Developer tools
    • Serverless
    • Security
    • Monitoring & Resources
    • AI for business
    • Business tools
  • All Solutions
    • By industry
    • By use case
    • Economics and Pricing
    • Security
    • Technical Support
    • Start testing with double trial credits
    • Cloud credits to scale your IT product
    • Gateway to Russia
    • Cloud for Startups
    • Center for Technologies and Society
    • Yandex Cloud Partner program
  • Pricing
  • Customer Stories
  • Documentation
  • Blog
© 2025 Direct Cursus Technology L.L.C.
Tutorials
    • All tutorials
    • Unassisted deployment of the Apache Kafka® web interface
    • Upgrading a Managed Service for Apache Kafka® cluster to migrate from ZooKeeper to KRaft
    • Migrating a database from a third-party Apache Kafka® cluster to Managed Service for Apache Kafka®
    • Moving data between Managed Service for Apache Kafka® clusters using Data Transfer
    • Delivering data from Managed Service for MySQL® to Managed Service for Apache Kafka® using Data Transfer
    • Delivering data from Managed Service for MySQL® to Managed Service for Apache Kafka® using Debezium
    • Delivering data from Managed Service for PostgreSQL to Managed Service for Apache Kafka® using Data Transfer
    • Delivering data from Managed Service for PostgreSQL to Managed Service for Apache Kafka® using Debezium
    • Delivering data from Managed Service for YDB to Managed Service for Apache Kafka® using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Managed Service for ClickHouse® using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Yandex MPP Analytics for PostgreSQL using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Yandex StoreDoc using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Managed Service for MySQL® using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Managed Service for OpenSearch using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Managed Service for PostgreSQL using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Managed Service for YDB using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Data Streams using Data Transfer
    • Delivering data from Data Streams to Managed Service for YDB using Data Transfer
    • Delivering data from Data Streams to Managed Service for Apache Kafka® using Data Transfer
    • YDB change data capture and delivery to YDS
    • Configuring Kafka Connect to work with a Managed Service for Apache Kafka® cluster
    • Synchronizing Apache Kafka® topics in Object Storage with no web access
    • Monitoring message loss in a Apache Kafka® topic
    • Automating Query tasks with Managed Service for Apache Airflow™
    • Sending requests to the Yandex Cloud API via the Yandex Cloud Python SDK
    • Configuring an SMTP server to send e-mail notifications
    • Adding data to a ClickHouse® DB
    • Migrating data to Managed Service for ClickHouse® using ClickHouse® tools
    • Migrating data to Managed Service for ClickHouse® using Data Transfer
    • Delivering data from Managed Service for MySQL® to Managed Service for ClickHouse® using Data Transfer
    • Asynchronously replicating data from PostgreSQL to ClickHouse®
    • Exchanging data between Managed Service for ClickHouse® and Yandex Data Processing
    • Configuring Managed Service for ClickHouse® for Graphite
    • Fetching data from Managed Service for Apache Kafka® to Managed Service for ClickHouse®
    • Fetching data from Managed Service for Apache Kafka® to ksqlDB
    • Fetching data from RabbitMQ to Managed Service for ClickHouse®
    • Saving a data stream from Data Streams to Managed Service for ClickHouse®
    • Asynchronous replication of data from Yandex Metrica to ClickHouse® using Data Transfer
    • Using hybrid storage in Managed Service for ClickHouse®
    • Sharding Managed Service for ClickHouse® tables
    • Loading data from Yandex Direct to a Managed Service for ClickHouse® data mart using Cloud Functions, Object Storage, and Data Transfer
    • Loading data from Object Storage to Managed Service for ClickHouse® using Data Transfer
    • Migrating data with change of storage from Managed Service for OpenSearch to Managed Service for ClickHouse® using Data Transfer
    • Loading data from Managed Service for YDB to Managed Service for ClickHouse® using Data Transfer
    • Yandex Managed Service for ClickHouse® integration with Microsoft SQL Server via ClickHouse® JDBC Bridge
    • Migrating databases from Google BigQuery to Managed Service for ClickHouse®
    • Yandex Managed Service for ClickHouse® integration with Oracle via ClickHouse® JDBC Bridge
    • Configuring Cloud DNS to access a Managed Service for ClickHouse® cluster from other cloud networks
    • Migrating a Yandex Data Processing HDFS cluster to a different availability zone
    • Importing data from Managed Service for MySQL® to Yandex Data Processing using Sqoop
    • Importing data from Managed Service for PostgreSQL to Yandex Data Processing using Sqoop
    • Mounting Object Storage buckets to the file system of Yandex Data Processing hosts
    • Working with Apache Kafka® topics using Yandex Data Processing
    • Automating operations with Yandex Data Processing using Managed Service for Apache Airflow™
    • Shared use of Yandex Data Processing tables through Apache Hive™ Metastore
    • Transferring metadata across Yandex Data Processing clusters using Apache Hive™ Metastore
    • Importing data from Object Storage, processing it, and exporting it to Managed Service for ClickHouse®
    • Migrating collections from a third-party MongoDB cluster to Yandex StoreDoc
    • Migrating data to Yandex StoreDoc
    • Migrating Yandex StoreDoc cluster from 4.4 to 6.0
    • Sharding Yandex StoreDoc collections
    • Yandex StoreDoc performance analysis and tuning
    • Migrating a database from a third-party MySQL® cluster to a Managed Service for MySQL® cluster
    • Managed Service for MySQL® performance analysis and tuning
    • Syncing data from a third-party MySQL® cluster to Managed Service for MySQL® using Data Transfer
    • Migrating a database from Managed Service for MySQL® to a third-party MySQL® cluster
    • Migrating a database from Managed Service for MySQL® to Object Storage using Data Transfer
    • Migrating data from Object Storage to Managed Service for MySQL® using Data Transfer
    • Delivering data from Managed Service for MySQL® to Managed Service for Apache Kafka® using Data Transfer
    • Delivering data from Managed Service for MySQL® to Managed Service for Apache Kafka® using Debezium
    • Migrating a database from Managed Service for MySQL® to Managed Service for YDB using Data Transfer
    • MySQL® change data capture and delivery to YDS
    • Migrating data from Managed Service for MySQL® to Managed Service for PostgreSQL using Data Transfer
    • Migrating data from AWS RDS for PostgreSQL to Managed Service for PostgreSQL using Data Transfer
    • Migrating data from Managed Service for MySQL® to Yandex MPP Analytics for PostgreSQL using Data Transfer
    • Configuring an index policy in Managed Service for OpenSearch
    • Migrating data from a third-party OpenSearch cluster to Managed Service for OpenSearch using Data Transfer
    • Loading data from Managed Service for OpenSearch to Object Storage using Data Transfer
    • Migrating data from Managed Service for OpenSearch to Managed Service for YDB using Data Transfer
    • Copying data from Managed Service for OpenSearch to Yandex MPP Analytics for PostgreSQL using Yandex Data Transfer
    • Migrating data from Managed Service for PostgreSQL to Managed Service for OpenSearch using Data Transfer
    • Authenticating a Managed Service for OpenSearch cluster in OpenSearch Dashboards using Keycloak
    • Using the yandex-lemmer plugin in Managed Service for OpenSearch
    • Creating a PostgreSQL cluster for 1C:Enterprise
    • Searching for the Managed Service for PostgreSQL cluster performance issues
    • Managed Service for PostgreSQL performance analysis and tuning
    • Logical replication in PostgreSQL
    • Migrating a database from a third-party PostgreSQL cluster to Managed Service for PostgreSQL
    • Migrating a database from Managed Service for PostgreSQL
    • Delivering data from Managed Service for PostgreSQL to Managed Service for Apache Kafka® using Data Transfer
    • Delivering data from Managed Service for PostgreSQL to Managed Service for Apache Kafka® using Debezium
    • Delivering data from Managed Service for PostgreSQL to Managed Service for YDB using Data Transfer
    • Migrating a database from Managed Service for PostgreSQL to Object Storage
    • Migrating data from Object Storage to Managed Service for PostgreSQL using Data Transfer
    • PostgreSQL change data capture and delivery to YDS
    • Migrating data from Managed Service for PostgreSQL to Managed Service for MySQL® using Data Transfer
    • Migrating data from Managed Service for PostgreSQL to Managed Service for OpenSearch using Data Transfer
    • Fixing string sorting issues in PostgreSQL after upgrading glibc
    • Migrating a database from Greenplum® to ClickHouse®
    • Migrating a database from Greenplum® to PostgreSQL
    • Exporting Greenplum® data to a cold storage in Object Storage
    • Loading data from Object Storage to Yandex MPP Analytics for PostgreSQL using Data Transfer
    • Copying data from Managed Service for OpenSearch to Yandex MPP Analytics for PostgreSQL using Yandex Data Transfer
    • Creating an external table from an Object Storage bucket table using a configuration file
    • Getting data from external sources using named queries in Greenplum®
    • Migrating a database from a third-party Valkey™ cluster to Yandex Managed Service for Valkey™
    • Using a Yandex Managed Service for Valkey™ cluster as a PHP session storage
    • Loading data from Object Storage to Managed Service for YDB using Data Transfer
    • Loading data from Managed Service for YDB to Object Storage using Data Transfer
    • Processing Audit Trails events
    • Processing Cloud Logging logs
    • Processing Debezium CDC streams
    • Analyzing data with Jupyter
    • Processing files with usage details in Yandex Cloud Billing
    • Ingesting data into storage systems
    • Smart log processing
    • Data transfer in microservice architectures
    • Migrating data to Object Storage using Data Transfer
    • Migrating data from a third-party Greenplum® or PostgreSQL cluster to Yandex MPP Analytics for PostgreSQL using Data Transfer
    • Migrating Yandex StoreDoc clusters
    • Migrating MySQL® clusters
    • Migrating to a third-party MySQL® cluster
    • Migrating PostgreSQL clusters
    • Creating a schema registry to deliver data in Debezium CDC format from Apache Kafka®
    • Automating operations using Yandex Managed Service for Apache Airflow™
    • Working with an Object Storage table from a PySpark job
    • Integrating Yandex Managed Service for Apache Spark™ with Apache Hive™ Metastore
    • Running a PySpark job using Yandex Managed Service for Apache Airflow™
    • Using Yandex Object Storage in Yandex Managed Service for Apache Spark™

In this article:

  • Required paid resources
  • Set up your infrastructure
  • Prepare a PySpark job
  • Prepare and run a DAG file
  • Check the result
  • Delete the resources you created
  1. Building a data platform
  2. Automating operations with Yandex Data Processing using Managed Service for Apache Airflow™

Automating operations with Yandex Data Processing using Yandex Managed Service for Apache Airflow™

Written by
Yandex Cloud
Updated at October 23, 2025
  • Required paid resources
  • Set up your infrastructure
  • Prepare a PySpark job
  • Prepare and run a DAG file
  • Check the result
  • Delete the resources you created

Warning

This tutorial was tested on clusters with the Apache Airflow™ version below 3.0.

In Yandex Managed Service for Apache Airflow™, you can create a directed acyclic graph (DAG) to automate your operations in Yandex Data Processing. Below is an example of a DAG that includes a number of tasks:

  1. Create a Yandex Data Processing cluster.
  2. Create and run a PySpark job.
  3. Delete the Yandex Data Processing cluster.

With a DAG like this, a cluster is short-lived. Since the cost of Yandex Data Processing resources depends on their usage time, you can use resources with higher capacity in the cluster and quickly process a larger amount of data at no additional cost.

In this DAG, a Yandex Data Processing cluster is created without using Hive. In the example below, a Apache Hive™ Metastore cluster is used for storing table metadata. The saved metadata can then be used by another Yandex Data Processing cluster.

To automate operations with Yandex Data Processing using Managed Service for Apache Airflow™:

  1. Set up your infrastructure.
  2. Prepare a PySpark job.
  3. Prepare and run a DAG file.
  4. Check the result.

If you no longer need the resources you created, delete them.

Required paid resourcesRequired paid resources

The support cost includes:

  • Managed Service for Apache Airflow™ cluster fee: computing resources of the cluster components (see Apache Airflow™ pricing).
  • Fee for the Apache Hive™ Metastore cluster computing resources (see Yandex MetaData Hub pricing).
  • Fee for a NAT gateway (see Virtual Private Cloud pricing).
  • Object Storage bucket fee: storing data and performing operations with it (see Object Storage pricing).
  • Yandex Data Processing cluster fee: using VM computing resources and Compute Cloud network disks, and Cloud Logging for log management (see Yandex Data Processing pricing).

Set up your infrastructureSet up your infrastructure

The example below illustrates two scenarios. Select the one you find most relevant:

  • High security level. This is a recommended scenario, as it respects the principle of least privilege. This scenario entails the following:

    • Splitting access permissions across different service accounts. You have to create a separate service account for each cluster and assign to it only the roles required for this account's cluster to operate.
    • Using multiple buckets for different tasks and storing different data in separate buckets. For example, the results of running a PySpark job get written to one bucket, and logs, to another.
    • Setting up security groups. This way, you can restrict traffic and grant access only to authorized resources.
  • Simplified setup. This scenario implies a lower security level:

    • Using a single service account with more privileges than required.
    • Storing all data in a single bucket but in different folders.
    • No security groups are set up.
High security level
Simplified setup

Set up the infrastructure:

  1. Create service accounts with the following roles:

    Service account

    Roles

    airflow-agent for an Apache Airflow™ cluster

    • dataproc.editor to manage a Yandex Data Processing cluster from a DAG.
    • vpc.user to use the Yandex Virtual Private Cloud subnet in the Apache Airflow™ cluster.
    • managed-airflow.integrationProvider to enable the Apache Airflow™ cluster to interact with other resources.
    • iam.serviceAccounts.user to specify the data-processing-agent service account when creating a Yandex Data Processing cluster.

    metastore-agent for a Apache Hive™ Metastore cluster

    • managed-metastore.integrationProvider to enable the Apache Hive™ Metastore cluster to interact with other resources.

    data-processing-agent for a Yandex Data Processing cluster

    • dataproc.agent: To enable the service account to get info on cluster host states, jobs, and log groups.
    • dataproc.provisioner: To enable the service account to work with an autoscaling instance group. This will enable subcluster autoscaling.
  2. Create buckets:

    • <bucket_for_Managed_Airflow>
    • <bucket_for_PySpark_job_source_code>
    • <bucket_for_PySpark_job_output_data>
    • <bucket_for_collecting_Spark_logs>

    You need multiple buckets with different access permissions.

  3. Grant permissions for the buckets as follows:

    • <bucket_for_Managed_Airflow>: READ permission to the airflow-agent service account.
    • <bucket_for_PySpark_job_source_code>: READ permission to the data-processing-agent service account.
    • <bucket_for_PySpark_job_output_data>: READ and WRITE permissions to the data-processing-agent and metastore-agent service accounts.
    • <bucket_for_collecting_Spark_logs>: READ and WRITE permissions to the data-processing-agent service account.
  4. Create a cloud network named data-processing-network.

    This will automatically create three subnets in different availability zones.

  5. Set up a NAT gateway for the data-processing-network-ru-central1-a subnet.

  6. For the Apache Hive™ Metastore cluster, create a security group named metastore-sg in data-processing-network. Add the following rules to the group:

    • For incoming client traffic:

      • Port range: 30000-32767
      • Protocol: Any
      • Source: CIDR
      • CIDR blocks: 0.0.0.0/0
    • For incoming load balancer traffic:

      • Port range: 10256
      • Protocol: Any
      • Source: Load balancer health checks
  7. For the Managed Service for Apache Airflow™ and Yandex Data Processing clusters, create a security group named airflow-sg in data-processing-network. Add the following rules to it:

    • For incoming service traffic:

      • Port range: 0-65535
      • Protocol: Any
      • Source: Security group
      • Security group: Self
    • For outgoing service traffic:

      • Port range: 0-65535
      • Protocol: Any
      • Destination: Security group
      • Security group: Self
    • For outgoing HTTPS traffic:

      • Port range: 443
      • Protocol: TCP
      • Destination: CIDR
      • CIDR blocks: 0.0.0.0/0
    • For outgoing traffic, to allow Yandex Data Processing cluster connections to Apache Hive™ Metastore:

      • Port range: 9083
      • Protocol: Any
      • Destination: Security group
      • Security group: metastore-sg (From list)
  8. Create a Apache Hive™ Metastore cluster with the following parameters:

    • Service account: metastore-agent
    • Network: data-processing-network
    • Subnet: data-processing-network-ru-central1-a
    • Security group: metastore-sg
  9. Create a Managed Service for Apache Airflow™ cluster with the following parameters:

    • Service account: airflow-agent
    • Availability zone: ru-central1-a
    • Network: data-processing-network
    • Subnet: data-processing-network-ru-central1-a
    • Security group: airflow-sg
    • Bucket name: <bucket_for_Managed_Airflow>

Set up the infrastructure:

  1. Create a service account named my-editor with the following roles:

    • dataproc.editor to manage a Yandex Data Processing cluster from a DAG.
    • editor to perform other operations.
  2. Create a bucket named <bucket_for_jobs_and_data>.

    You do not need to grant a permission for this bucket to the service account as the editor role is enough.

  3. Create a cloud network named data-processing-network.

    This will automatically create three subnets in different availability zones and a security group.

  4. Set up a NAT gateway for the data-processing-network-ru-central1-a subnet.

  5. Create a Apache Hive™ Metastore cluster with the following parameters:

    • Service account: my-editor
    • Network: data-processing-network
    • Subnet: data-processing-network-ru-central1-a
    • Security group: Default group in data-processing-network
  6. Create a Managed Service for Apache Airflow™ cluster with the following parameters:

    • Service account: my-editor
    • Availability zone: ru-central1-a
    • Network: data-processing-network
    • Subnet: data-processing-network-ru-central1-a
    • Security group: Default group in data-processing-network
    • Bucket name: <bucket_for_jobs_and_data>

Prepare a PySpark jobPrepare a PySpark job

For a PySpark job, we will use a Python script that creates a table and is stored in the Object Storage bucket. Prepare a script file:

High security level
Simplified setup
  1. Create a local file named create-table.py and paste the following script to it:

    create-table.py
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    
    # Creating a Spark session
    spark = SparkSession.builder \
        .appName("create-table") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Creating a data schema
    schema = StructType([StructField('Name', StringType(), True),
    StructField('Capital', StringType(), True),
    StructField('Area', IntegerType(), True),
    StructField('Population', IntegerType(), True)])
    
    # Creating a dataframe
    df = spark.createDataFrame([('Australia', 'Canberra', 7686850, 19731984), ('Austria', 'Vienna', 83855, 7700000)], schema)
    
    # Writing the dataframe to a bucket as a countries table
    df.write.mode("overwrite").option("path","s3a://<bucket_for_PySpark_job_output_data>/countries").saveAsTable("countries")
    
  2. In <bucket_for_PySpark_job_source_code>, create a folder named scripts and upload the create-table.py file to it.

  1. Create a local file named create-table.py and paste the following script to it:

    create-table.py
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    
    # Creating a Spark session
    spark = SparkSession.builder \
        .appName("create-table") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Creating a data schema
    schema = StructType([StructField('Name', StringType(), True),
    StructField('Capital', StringType(), True),
    StructField('Area', IntegerType(), True),
    StructField('Population', IntegerType(), True)])
    
    # Creating a dataframe
    df = spark.createDataFrame([('Australia', 'Canberra', 7686850, 19731984), ('Austria', 'Vienna', 83855, 7700000)], schema)
    
    # Writing the dataframe to a bucket as a countries table
    df.write.mode("overwrite").option("path","s3a://<bucket_for_jobs_and_data>/countries").saveAsTable("countries")
    
  2. In <bucket_for_jobs_and_data>, create a folder named scripts and upload the create-table.py file to it.

Prepare and run a DAG filePrepare and run a DAG file

A DAG will have multiple vertices that form a sequence of actions:

  1. Managed Service for Apache Airflow™ creates a temporary lightweight Yandex Data Processing cluster whose settings are set in the DAG. This cluster automatically connects to the previously created Apache Hive™ Metastore cluster.
  2. When the Yandex Data Processing cluster is ready, a PySpark job is run.
  3. Once the job is complete, the temporary Yandex Data Processing cluster is deleted.

To prepare a DAG:

High security level
Simplified setup
  1. Create an SSH key. Save the public part of the key: you will need it to create a Yandex Data Processing cluster.

  2. Create a local file named Data-Processing-DAG.py, paste the following script to it and substitute the variables with your infrastructure data:

    Data-Processing-DAG.py
    import uuid
    import datetime
    from airflow import DAG
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.providers.yandex.operators.yandexcloud_dataproc import (
        DataprocCreateClusterOperator,
        DataprocCreatePysparkJobOperator,
        DataprocDeleteClusterOperator,
    )
    
    # Your infrastructure data
    YC_DP_AZ = 'ru-central1-a'
    YC_DP_SSH_PUBLIC_KEY = '<public_part_of_SSH_key>'
    YC_DP_SUBNET_ID = '<subnet_ID>'
    YC_DP_SA_ID = '<ID_of_data_processing_agent_service_account>'
    YC_DP_METASTORE_URI = '<IP_address>'
    YC_SOURCE_BUCKET = '<bucket_for_PySpark_job_source_code>'
    YC_DP_LOGS_BUCKET = '<bucket_for_collecting_Spark_logs>'
    
    # DAG settings
    with DAG(
            'DATA_INGEST',
            schedule='@hourly',
            tags=['data-processing-and-airflow'],
            start_date=datetime.datetime.now(),
            max_active_runs=1,
            catchup=False
    ) as ingest_dag:
        # Step 1: Creating a Yandex Data Proc cluster
        create_spark_cluster = DataprocCreateClusterOperator(
            task_id='dp-cluster-create-task',
            cluster_name=f'tmp-dp-{uuid.uuid4()}',
            cluster_description='Temporary cluster for running a PySpark job orchestrated by Managed Service for Apache Airflow™',
            ssh_public_keys=YC_DP_SSH_PUBLIC_KEY,
            service_account_id=YC_DP_SA_ID,
            subnet_id=YC_DP_SUBNET_ID,
            s3_bucket=YC_DP_LOGS_BUCKET,
            zone=YC_DP_AZ,
            cluster_image_version='2.1',
            masternode_resource_preset='s2.small',
            masternode_disk_type='network-ssd',
            masternode_disk_size=200,
            computenode_resource_preset='m2.large',
            computenode_disk_type='network-ssd',
            computenode_disk_size=200,
            computenode_count=2,
            computenode_max_hosts_count=5,  # The number of data-handling subclusters will automatically scale up if the load is high.
            services=['YARN', 'SPARK'],     # A lightweight cluster is being created.
            datanode_count=0,               # Without data storage subclusters.
            properties={                    # With a reference to a remote Apache Hive™ Metastore cluster.
                'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083',
            },
        )
    
        # Step 2: Running a PySpark job
        poke_spark_processing = DataprocCreatePysparkJobOperator(
            task_id='dp-cluster-pyspark-task',
            main_python_file_uri=f's3a://{YC_SOURCE_BUCKET}/scripts/create-table.py',
        )
    
        # Step 3: Deleting the Yandex Data Processing cluster
        delete_spark_cluster = DataprocDeleteClusterOperator(
            task_id='dp-cluster-delete-task',
            trigger_rule=TriggerRule.ALL_DONE,
        )
    
        # Building a DAG based on the above steps
        create_spark_cluster >> poke_spark_processing >> delete_spark_cluster
    

    Where:

    • YC_DP_AZ: Availability zone for the Yandex Data Processing cluster.
    • YC_DP_SSH_PUBLIC_KEY: Public part of the SSH key for the Yandex Data Processing cluster.
    • YC_DP_SUBNET_ID: Subnet ID.
    • YC_DP_SA_ID: ID of the service account for Yandex Data Processing.
    • YC_DP_METASTORE_URI: IP address of the Apache Hive™ Metastore cluster.
    • YC_SOURCE_BUCKET: Bucket with the Python script for the PySpark job.
    • YC_DP_LOGS_BUCKET: Bucket for logs.
  3. Upload the DAG to the Managed Service for Apache Airflow™ cluster: in <bucket_for_Managed_Airflow>, create a folder named dags and upload the Data-Processing-DAG.py file to it.

  4. Open the Apache Airflow™ web interface.

  5. Make sure the DAGs section now contains the new DAG named DATA_INGEST and tagged as data-processing-and-airflow.

    It may take a few minutes to upload a DAG file from the bucket.

  6. To run the DAG, click image in the line with its name.

  1. Create an SSH key. Save the public part of the key: you will need it to create a Yandex Data Processing cluster.

  2. Create a local file named Data-Processing-DAG.py, paste the following script to it and substitute the variables with your infrastructure data:

    Data-Processing-DAG.py
    import uuid
    import datetime
    from airflow import DAG
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.providers.yandex.operators.yandexcloud_dataproc import (
        DataprocCreateClusterOperator,
        DataprocCreatePysparkJobOperator,
        DataprocDeleteClusterOperator,
    )
    
    # Your infrastructure data
    YC_DP_AZ = 'ru-central1-a'
    YC_DP_SSH_PUBLIC_KEY = '<public_part_of_SSH_key>'
    YC_DP_SUBNET_ID = '<subnet_ID>'
    YC_DP_SA_ID = '<ID_of_my-editor_service_account>'
    YC_DP_METASTORE_URI = '<IP_address>'
    YC_BUCKET = '<bucket_for_jobs_and_data>'
    
    # DAG settings
    with DAG(
            'DATA_INGEST',
            schedule='@hourly',
            tags=['data-processing-and-airflow'],
            start_date=datetime.datetime.now(),
            max_active_runs=1,
            catchup=False
    ) as ingest_dag:
        # Step 1: Creating a Yandex Data Proc cluster
        create_spark_cluster = DataprocCreateClusterOperator(
            task_id='dp-cluster-create-task',
            cluster_name=f'tmp-dp-{uuid.uuid4()}',
            cluster_description='Temporary cluster for running a PySpark job orchestrated by Managed Service for Apache Airflow™',
            ssh_public_keys=YC_DP_SSH_PUBLIC_KEY,
            service_account_id=YC_DP_SA_ID,
            subnet_id=YC_DP_SUBNET_ID,
            s3_bucket=YC_BUCKET,
            zone=YC_DP_AZ,
            cluster_image_version='2.1',
            masternode_resource_preset='s2.small',
            masternode_disk_type='network-ssd',
            masternode_disk_size=200,
            computenode_resource_preset='m2.large',
            computenode_disk_type='network-ssd',
            computenode_disk_size=200,
            computenode_count=2,
            computenode_max_hosts_count=5,  # The number of data-handling subclusters will automatically scale up if the load is high.
            services=['YARN', 'SPARK'],     # A lightweight cluster is being created.
            datanode_count=0,               # Without data storage subclusters.
            properties={                    # With a reference to a remote Apache Hive™ Metastore cluster.
                'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083',
            },
        )
    
        # Step 2: Running a PySpark job
        poke_spark_processing = DataprocCreatePysparkJobOperator(
            task_id='dp-cluster-pyspark-task',
            main_python_file_uri=f's3a://{YC_BUCKET}/scripts/create-table.py',
        )
    
        # Step 3: Deleting the Yandex Data Processing cluster
        delete_spark_cluster = DataprocDeleteClusterOperator(
            task_id='dp-cluster-delete-task',
            trigger_rule=TriggerRule.ALL_DONE,
        )
    
        # Building a DAG based on the above steps
        create_spark_cluster >> poke_spark_processing >> delete_spark_cluster
    

    Where:

    • YC_DP_AZ: Availability zone for the Yandex Data Processing cluster.
    • YC_DP_SSH_PUBLIC_KEY: Public part of the SSH key for the Yandex Data Processing cluster.
    • YC_DP_SUBNET_ID: Subnet ID.
    • YC_DP_SA_ID: my-editor service account ID.
    • YC_DP_METASTORE_URI: IP address of the Apache Hive™ Metastore cluster.
    • YC_BUCKET — <bucket_for_jobs_and_data>.
  3. Upload the DAG to the Managed Service for Apache Airflow™ cluster: in <bucket_for_jobs_and_data>, create a folder named dags and upload the Data-Processing-DAG.py file to it.

  4. Open the Apache Airflow™ web interface.

  5. Make sure the DAGs section now contains the new DAG named DATA_INGEST and tagged as data-processing-and-airflow.

    It may take a few minutes to upload a DAG file from the bucket.

  6. To run a DAG, click image in the line with its name.

Check the resultCheck the result

High security level
Simplified setup
  1. To monitor task execution results, click the DAG name. You can find the results in the Grid tab.
  2. Wait until the status of all the three tasks in the DAG changes to Success. Simultaneously, you can check that a Yandex Data Processing cluster is being created, the PySpark job is running, and the same cluster is being deleted in the management console.
  3. Make sure your <bucket_for_PySpark_job_output_data> now contains the countries folder with the part-00000-... file. The data from the created table is now stored in the Object Storage bucket and the table metadata is stored in the Apache Hive™ Metastore cluster.
  4. Make sure there are PySpark job logs in <bucket_for_collecting_Spark_logs>.
  1. To monitor task execution results, click the DAG name. You can find the results in the Grid tab.
  2. Wait until the status of all the three tasks in the DAG changes to Success. Simultaneously, you can check that an Yandex Data Processing cluster is being created, the PySpark job is running, and the same cluster is being deleted in the management console.
  3. Make sure <bucket_for_jobs_and_data> now contains the countries folder with the part-00000-... file. The data from the created table is now stored in the Object Storage bucket and the table metadata is stored in the Apache Hive™ Metastore cluster.
  4. Make sure there are PySpark job logs in <bucket_for_jobs_and_data>. These logs are written to the dataproc, user, and var folders.

Delete the resources you createdDelete the resources you created

Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them:

High security level
Simplified setup
  1. Service accounts.
  2. Object Storage buckets.
  3. Apache Hive™ Metastore cluster.
  4. Managed Service for Apache Airflow™ cluster.
  5. Route table.
  6. NAT gateway.
  7. Security groups.
  8. Cloud subnets created in data-processing-network by default.
  9. Cloud network.
  1. Service account.
  2. Object Storage bucket.
  3. Apache Hive™ Metastore cluster.
  4. Managed Service for Apache Airflow™ cluster.
  5. Route table.
  6. NAT gateway.
  7. Security group created in data-processing-network by default.
  8. Cloud subnets created in data-processing-network by default.
  9. Cloud network.

Was the article helpful?

Previous
Working with Apache Kafka® topics using Yandex Data Processing
Next
Shared use of Yandex Data Processing tables through Apache Hive™ Metastore
© 2025 Direct Cursus Technology L.L.C.