Running a PySpark job using Yandex Managed Service for Apache Airflow™
Warning
This tutorial was tested on clusters running Apache Airflow™ versions below 3.0.
You can use a Yandex Managed Service for Apache Airflow™ cluster to automate your work with Yandex Managed Service for Apache Spark™, including the following operations:
- Creating Apache Spark™ clusters.
- Waiting for clusters to start.
- Running Apache Spark™ jobs.
- Checking the results.
- Shutting down resources and deleting clusters.
To do this, create a directed acyclic graph (DAG) of tasks. The Apache Airflow™ cluster will use this DAG to automatically perform all the Apache Spark™-related actions.
Using Apache Spark™ via Apache Airflow™ allows you to:
- Run jobs on a schedule to create reports and data snapshots, perform maintenance, metric updates, etc.
- Build pipelines comprising data analysis, experiments, model fine-tuning, and other tasks.
- Quickly process massive volumes of data on the fly without having to pay for a permanent infrastructure with high-power resources.
This tutorial describes the minimum steps for Apache Airflow™ integration with Apache Spark™ and does not use S3 storage in Object Storage or an Apache Hive™ Metastore cluster. In this configuration, the Apache Spark™ cluster can only work with in-memory data: create temporary dataframes, apply standard transformations and functions, cache results, and use temporary views for SQL queries.
Note
To work with permanent databases and tables and for long-term storage of results, you need an external Object Storage bucket and, if necessary, an Apache Hive™ Metastore cluster for data management.
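To give an idea of what such in-memory work looks like, here is a minimal sketch of a script that could run in this configuration; the dataframe, view, and column names are illustrative and are not part of this tutorial's job:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("in_memory_demo").getOrCreate()

# Temporary dataframe created entirely in memory
df = spark.range(100).withColumn("square", F.col("id") * F.col("id"))

# Cache the intermediate result for reuse within this session
df.cache()

# Temporary view for SQL queries; it disappears when the session ends
df.createOrReplaceTempView("numbers")
spark.sql("SELECT COUNT(*) AS cnt FROM numbers WHERE square > 50").show()

spark.stop()
```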
The Apache Airflow™ to Apache Spark™ integration is illustrated with a DAG that performs the following operations:
- Creating an Apache Spark™ cluster.
- Running a PySpark job.
- Deleting the Apache Spark™ cluster.
To implement the above example:
- Set up the infrastructure.
- Prepare a PySpark job.
- Prepare and run a DAG file.
- Check the result.
If you no longer need the resources you created, delete them.
Required paid resources
The support cost for this solution includes:
- Fee for the computing resources of Managed Service for Apache Airflow™ cluster components (see Yandex Managed Service for Apache Airflow™ pricing).
- Object Storage bucket fee for storage and data operations (see Object Storage pricing).
- Cloud Logging fee for the amount of data written and the time of its retention (see Cloud Logging pricing).
- Fee for the computing resources of Yandex Managed Service for Apache Spark™ cluster components (see Yandex Managed Service for Apache Spark™ pricing).
Set up the infrastructure
This tutorial uses a simplified infrastructure setup:
- A single service account with advanced privileges.
- A single bucket for all the data.
- The default security group.
This setup is good for testing but does not ensure a sufficient security level for real-world scenarios. To make the solution more secure, adhere to the principle of least privilege.
Set up the infrastructure:
- Create a service account named `integration-agent` with the following roles:
  - managed-airflow.integrationProvider: To enable the Apache Airflow™ cluster to interact with other resources.
  - managed-spark.editor: To manage your Apache Spark™ cluster from a DAG.
  - iam.serviceAccounts.user: To select the `integration-agent` service account when creating the Apache Spark™ cluster.
  - vpc.user: To use the Yandex Virtual Private Cloud subnet in the Apache Airflow™ cluster.
  - logging.editor: To work with log groups.
  - logging.reader: To read logs.
  - mdb.viewer: To get operation statuses.
- Create an Object Storage bucket.
- Grant the READ and WRITE permissions for the bucket to the `integration-agent` service account (you can verify the access with the optional check after this list).
- Create a cloud network named `datalake-network`. This will automatically create three subnets in different availability zones and a security group.
- Create a Managed Service for Apache Airflow™ cluster with the following parameters:
  - Service account: `integration-agent`.
  - Availability zone: `ru-central1-a`.
  - Network: `datalake-network`.
  - Subnet: `datalake-network-ru-central1-a`.
  - Security group: Default group in `datalake-network`.
  - Bucket name: The name of the previously created bucket.
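Optionally, before moving on, you can verify that the bucket permissions work. The snippet below is a minimal sketch, assuming you have issued a static access key for the `integration-agent` service account and exported it as `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`; `<bucket_name>` is a placeholder for your bucket name.

```python
import boto3

# Object Storage is S3-compatible; boto3 reads the static key from the environment
s3 = boto3.client("s3", endpoint_url="https://storage.yandexcloud.net")

BUCKET = "<bucket_name>"  # placeholder: name of the bucket created above

# WRITE check: upload a small marker object
s3.put_object(Bucket=BUCKET, Key="access-check.txt", Body=b"ok")

# READ check: the marker object should be listed back
keys = [o["Key"] for o in s3.list_objects_v2(Bucket=BUCKET).get("Contents", [])]
print("Write and read OK:", "access-check.txt" in keys)

# Remove the marker object
s3.delete_object(Bucket=BUCKET, Key="access-check.txt")
```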
Prepare a PySpark job
For a PySpark job, we will use a Python script that:
- Creates a dataframe with numbers from 0 to 9.
- Outputs the number of rows in the new dataframe.
- Outputs the first five rows in table format.
The script will be stored in the Object Storage bucket.
Prepare a script file:
- Create a local file named `job_minimal.py` and paste the following script into it:

  ```python
  from pyspark.sql import SparkSession

  # Create a Spark session
  spark = SparkSession.builder.appName("hello_spark").getOrCreate()

  # Create a dataframe with numbers from 0 to 9
  df = spark.range(10).toDF("id")

  # Output the number of rows and the first five rows in table format
  print("Row count:", df.count())
  df.show(5, truncate=False)

  spark.stop()
  ```

- In the bucket, create a folder named `scripts` and upload the `job_minimal.py` file to it.
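You can upload the script through the management console or with any S3-compatible tool. Below is a minimal boto3 sketch, assuming you have a static access key for the `integration-agent` service account exported as `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`, with `<bucket_name>` as a placeholder; the same approach works later for uploading `dag.py` to the `dags` folder.

```python
import boto3

# S3-compatible client for Yandex Object Storage
s3 = boto3.client("s3", endpoint_url="https://storage.yandexcloud.net")

# Upload the job script into the scripts/ folder of the bucket
s3.upload_file("job_minimal.py", "<bucket_name>", "scripts/job_minimal.py")
```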
Prepare and run a DAG file
A DAG will have multiple vertices that form a sequence of actions:
- Managed Service for Apache Airflow™ creates a temporary Apache Spark™ cluster with settings specified in the DAG.
- When the Apache Spark™ cluster is ready, a PySpark job is run.
- Once the job is complete, the temporary Apache Spark™ cluster is deleted.
To prepare a DAG:
- Create a local file named `dag.py`, paste the following script into it, and substitute the variables with your infrastructure data:

  ```python
  import logging
  import pendulum
  from airflow.models.dag import DAG
  from airflow.decorators import task
  from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
  from airflow.exceptions import AirflowSkipException
  from yandexcloud.operations import OperationError

  # Your infrastructure data
  FOLDER_ID = '<folder_ID>'
  SERVICE_ACCOUNT_ID = '<integration-agent_service_account_ID>'
  SUBNET_IDS = ['<subnet_ID>']
  SECURITY_GROUP_IDS = ['<security_group_ID>']
  JOB_NAME = 'job_minimal'
  JOB_SCRIPT = 's3a://<bucket_name>/scripts/job_minimal.py'
  JOB_ARGS = []
  JOB_PROPERTIES = {
      'spark.executor.instances': '1',
  }


  # Step 1: Creating an Apache Spark™ cluster
  @task
  def create_cluster(yc_hook, cluster_spec):
      spark_client = yc_hook.sdk.wrappers.Spark()
      spark_client.create_cluster(cluster_spec)
      return spark_client.cluster_id


  # Step 2: Running a PySpark job
  @task
  def run_spark_job(yc_hook, cluster_id, job_spec):
      spark_client = yc_hook.sdk.wrappers.Spark()
      try:
          job_operation = spark_client.create_pyspark_job(cluster_id=cluster_id, spec=job_spec)
          job_id = job_operation.response.id
          job_info = job_operation.response
      except OperationError as job_error:
          job_id = job_error.operation_result.meta.job_id
          job_info, _ = spark_client.get_job(cluster_id=cluster_id, job_id=job_id)
          raise
      finally:
          job_log = spark_client.get_job_log(cluster_id=cluster_id, job_id=job_id)
          for line in job_log:
              logging.info(line)
          logging.info("Job info: %s", job_info)


  # Step 3: Deleting the Apache Spark™ cluster
  @task(trigger_rule="all_done")
  def delete_cluster(yc_hook, cluster_id):
      if cluster_id:
          spark_client = yc_hook.sdk.wrappers.Spark()
          spark_client.delete_cluster(cluster_id=cluster_id)
      else:
          raise AirflowSkipException("cluster_id is empty; nothing to delete")


  # DAG settings
  with DAG(
      dag_id="example_spark",
      start_date=pendulum.datetime(2025, 1, 1),
      schedule=None,
  ):
      yc_hook = YandexCloudBaseHook()

      cluster_spec = yc_hook.sdk.wrappers.SparkClusterParameters(
          folder_id=FOLDER_ID,
          service_account_id=SERVICE_ACCOUNT_ID,
          subnet_ids=SUBNET_IDS,
          security_group_ids=SECURITY_GROUP_IDS,
          driver_pool_resource_preset="c2-m8",
          driver_pool_size=1,
          executor_pool_resource_preset="c4-m16",
          executor_pool_min_size=1,
          executor_pool_max_size=2,
      )
      cluster_id = create_cluster(yc_hook, cluster_spec)

      job_spec = yc_hook.sdk.wrappers.PysparkJobParameters(
          name=JOB_NAME,
          main_python_file_uri=JOB_SCRIPT,
          args=JOB_ARGS,
          properties=JOB_PROPERTIES,
      )
      task_job = run_spark_job(yc_hook, cluster_id, job_spec)
      task_delete = delete_cluster(yc_hook, cluster_id)

      task_job >> task_delete
  ```

  Where:

  - `FOLDER_ID`: ID of the folder to create the Apache Spark™ cluster in.
  - `SERVICE_ACCOUNT_ID`: ID of the service account to use for creating the Apache Spark™ cluster.
  - `SUBNET_IDS`: `datalake-network-ru-central1-a` subnet ID.
  - `SECURITY_GROUP_IDS`: ID of the security group for the Apache Spark™ cluster.
  - `JOB_NAME`: PySpark job name.
  - `JOB_SCRIPT`: Path to the PySpark job file in the bucket.
  - `JOB_ARGS`: PySpark job arguments.
  - `JOB_PROPERTIES`: PySpark job properties.
- Upload the DAG to the Apache Airflow™ cluster by creating a folder named `dags` in the bucket and uploading the `dag.py` file to it.
- Open the Apache Airflow™ web interface.
- Make sure the new `example_spark` DAG has appeared in the DAGs section. It may take a few minutes to load a DAG file from the bucket.
- Run the DAG by clicking the launch button in the line with its name. You can also check and trigger the DAG through the Apache Airflow™ REST API, as shown in the sketch after this list.
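If you prefer to script this step, the sketch below is a hypothetical example based on the Apache Airflow™ REST API: `<airflow_web_url>`, `<user>`, and `<password>` are placeholders, and it assumes your cluster allows REST API access with basic authentication.

```python
import requests

BASE_URL = "<airflow_web_url>/api/v1"   # placeholder: webserver address
AUTH = ("<user>", "<password>")         # placeholder: Airflow account with API access

# Check that the example_spark DAG has been imported from the bucket
dag = requests.get(f"{BASE_URL}/dags/example_spark", auth=AUTH)
dag.raise_for_status()
print("DAG found, paused:", dag.json()["is_paused"])

# Unpause the DAG if needed and trigger a run
requests.patch(f"{BASE_URL}/dags/example_spark", auth=AUTH,
               json={"is_paused": False}).raise_for_status()
run = requests.post(f"{BASE_URL}/dags/example_spark/dagRuns", auth=AUTH, json={"conf": {}})
run.raise_for_status()
print("Triggered run:", run.json()["dag_run_id"])
```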
Check the result
- In the Apache Airflow™ web interface, click the `example_spark` DAG name and monitor the progress of the jobs.
- Wait until all three jobs in the DAG get the Success status.
- Navigate to the Graph tab.
- In the graph that opens, click the job named `run_spark_job` and go to the Logs tab.
- Make sure the PySpark job has produced the correct output by finding these lines in the log (a scripted way to fetch the same log follows this list):

  ```text
  Row count: 10
  +---+
  |id |
  +---+
  |0  |
  |1  |
  |2  |
  |3  |
  |4  |
  +---+
  ```
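As a scripted alternative to the web interface, the same log can be pulled through the Apache Airflow™ REST API. This is a hypothetical sketch under the same assumptions as above (placeholder URL and credentials, REST API access enabled); it takes the latest DAG run and checks the `run_spark_job` log for the expected row count.

```python
import requests

BASE_URL = "<airflow_web_url>/api/v1"   # placeholder: webserver address
AUTH = ("<user>", "<password>")         # placeholder: Airflow account with API access
DAG_ID = "example_spark"

# Take the latest DAG run
runs = requests.get(f"{BASE_URL}/dags/{DAG_ID}/dagRuns", auth=AUTH)
runs.raise_for_status()
run_id = runs.json()["dag_runs"][-1]["dag_run_id"]

# Fetch the run_spark_job task log (first try) as plain text
logs = requests.get(
    f"{BASE_URL}/dags/{DAG_ID}/dagRuns/{run_id}/taskInstances/run_spark_job/logs/1",
    auth=AUTH, headers={"Accept": "text/plain"})
logs.raise_for_status()

# The PySpark job output should contain the expected row count
print("Row count found:", "Row count: 10" in logs.text)
```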
Note
In the management console, you can also monitor the creation and deletion of the Apache Spark™ cluster and track the job status.
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them:
- Service account.
- Object Storage bucket.
- Apache Airflow™ cluster.
- Security group created in `datalake-network` by default.
- Cloud subnets created in `datalake-network` by default.
- Cloud network named `datalake-network`.