Automating operations using Yandex Managed Service for Apache Airflow™
Warning
This tutorial was tested on clusters running Apache Airflow™ versions below 3.0.
In Yandex Managed Service for Apache Airflow™, you can create a directed acyclic graph (DAG) to automate your operations in Yandex Managed Service for Apache Spark™. Below is an example of a DAG that includes the following tasks:
- Create an Apache Spark™ cluster.
- Run a PySpark job.
- Delete the Apache Spark™ cluster.
With a DAG like this, the cluster is short-lived: it exists only while the DAG is running. This lets you provision higher-capacity resources for the cluster and process more data quickly.
In the example below, the DAG creates an Apache Spark™ cluster and uses an Apache Hive™ Metastore cluster to store table metadata. The saved metadata can then be used by another Apache Spark™ cluster.
To use Yandex Managed Service for Apache Spark™ in Yandex Managed Service for Apache Airflow™:
- Set up your infrastructure.
- Prepare a PySpark job.
- Prepare and run a DAG file.
- Check the result.
If you no longer need the resources you created, delete them.
Required paid resources
The support cost for this solution includes:
- Apache Airflow™ cluster fee: Covers the computing resources of the cluster components (see Yandex Managed Service for Apache Airflow™ pricing).
- Fee for the computing resources of the Apache Hive™ Metastore cluster (see Yandex MetaData Hub pricing).
- Object Storage bucket fee: Covers data storage and bucket operations (see Object Storage pricing).
- Fee for collecting and storing logs (see Cloud Logging pricing).
Set up your infrastructure
The example below illustrates two scenarios. Select the one you find most relevant:
-
High security level. This is the recommended scenario, as it follows the principle of least privilege. It entails the following:
- Splitting access permissions across different service accounts. You create a separate service account for each cluster and assign it only the roles required for that cluster to operate.
- Using multiple buckets for different tasks and storing different data in separate buckets. For example, a DAG is loaded to one bucket, while the results of running a PySpark job are written to another bucket.
- Setting up security groups. This way, you can restrict traffic and grant access only to authorized resources.
-
Simplified setup. This scenario implies a lower security level:
- Using a single service account with more privileges than required.
- Storing all data in a single bucket but in different folders.
- Not setting up security groups.
Set up the infrastructure for the high security level scenario:
-
Create service accounts with the following roles:
- airflow-agent (for the Apache Airflow™ cluster):
  - managed-airflow.integrationProvider: To enable the Apache Airflow™ cluster to interact with other resources.
  - managed-spark.editor: To manage an Apache Spark™ cluster from a DAG.
  - iam.serviceAccounts.user: To specify the spark-agent service account when creating an Apache Spark™ cluster.
  - vpc.user: To use the Yandex Virtual Private Cloud subnet in the Apache Airflow™ cluster.
  - logging.editor: To work with log groups.
  - logging.reader: To read logs.
  - mdb.viewer: To get operation statuses.
  - managed-metastore.viewer: To view information about Apache Hive™ Metastore clusters.
- metastore-agent (for the Apache Hive™ Metastore cluster):
  - managed-metastore.integrationProvider: To enable the Apache Hive™ Metastore cluster to interact with other resources.
- spark-agent (for the Apache Spark™ cluster):
  - managed-spark.integrationProvider: To enable the Apache Spark™ cluster to interact with other resources.
-
Create the following buckets:
  - <bucket_for_Airflow_DAG_source_code>
  - <bucket_for_PySpark_job_source_code>
  - <bucket_for_PySpark_job_output_data>

  You need multiple buckets with different access permissions.
-
Grant permissions for the buckets as follows:
  - <bucket_for_Airflow_DAG_source_code>: READ permission to the airflow-agent service account.
  - <bucket_for_PySpark_job_source_code>: READ permission to the spark-agent service account.
  - <bucket_for_PySpark_job_output_data>: READ and WRITE permissions to the spark-agent and metastore-agent service accounts.
-
Create a cloud network named datalake-network.

  This will automatically create three subnets in different availability zones.
-
For the Apache Hive™ Metastore cluster, create a security group named metastore-sg in datalake-network. Add the following rules to it:
  - For incoming client traffic:
    - Port range: 30000-32767
    - Protocol: Any
    - Source: CIDR
    - CIDR blocks: 0.0.0.0/0
  - For incoming load balancer traffic:
    - Port range: 10256
    - Protocol: Any
    - Source: Load balancer health checks
-
For the Apache Airflow™ cluster, create a security group named airflow-sg in datalake-network. Add the following rule to it:
  - For outgoing HTTPS traffic:
    - Port range: 443
    - Protocol: TCP
    - Destination: CIDR
    - CIDR blocks: 0.0.0.0/0
-
For the Apache Spark™ cluster, create a security group named spark-sg in datalake-network. Add the following rule to it:
  - For outgoing traffic, to allow Apache Spark™ cluster connections to Apache Hive™ Metastore:
    - Port range: 9083
    - Protocol: Any
    - Destination: CIDR
    - CIDR blocks: 0.0.0.0/0
-
Create an Apache Hive™ Metastore cluster with the following parameters:
  - Service account: metastore-agent
  - Network: datalake-network
  - Subnet: datalake-network-ru-central1-a
  - Security group: metastore-sg

  Note

  Wait for the operation to complete.
-
Create a Managed Service for Apache Airflow™ cluster with the following parameters:
  - Service account: airflow-agent
  - Availability zone: ru-central1-a
  - Network: datalake-network
  - Subnet: datalake-network-ru-central1-a
  - Security group: airflow-sg
  - Bucket name: <bucket_for_Airflow_DAG_source_code>
Set up the infrastructure for the simplified setup scenario:
-
Create a service account named integration-agent with the following roles:
  - managed-airflow.integrationProvider: To enable the Apache Airflow™ cluster to interact with other resources.
  - managed-spark.editor: To manage an Apache Spark™ cluster from a DAG.
  - iam.serviceAccounts.user: To specify the integration-agent service account when creating an Apache Spark™ cluster.
  - vpc.user: To use the Yandex Virtual Private Cloud subnet in the Apache Airflow™ cluster.
  - logging.editor: To work with log groups.
  - logging.reader: To read logs.
  - mdb.viewer: To get operation statuses.
  - managed-metastore.viewer: To view information about Apache Hive™ Metastore clusters.
  - managed-metastore.integrationProvider: To enable the Apache Hive™ Metastore cluster to interact with other resources.
  - managed-spark.integrationProvider: To enable the Apache Spark™ cluster to interact with other resources.
-
Create a bucket named <bucket_for_jobs_and_data> and grant READ and WRITE permissions to the integration-agent service account.
-
Create a cloud network named datalake-network.

  This will automatically create three subnets in different availability zones and a security group.
-
Create an Apache Hive™ Metastore cluster with the following parameters:
  - Service account: integration-agent
  - Network: datalake-network
  - Subnet: datalake-network-ru-central1-a
  - Security group: Default group in datalake-network

  Note

  Wait for the operation to complete.
-
Create a Managed Service for Apache Airflow™ cluster with the following parameters:
  - Service account: integration-agent
  - Availability zone: ru-central1-a
  - Network: datalake-network
  - Subnet: datalake-network-ru-central1-a
  - Security group: Default group in datalake-network
  - Bucket name: <bucket_for_jobs_and_data>
Prepare a PySpark job
The PySpark job will run a Python script that creates a table. The script is stored in an Object Storage bucket. Prepare the script file:
-
Create a local file named job_with_table.py and paste the following script to it:

```python
import random

from pyspark.sql import SparkSession


def prepare_table(spark, database, table):
    create_database_sql = "create database if not exists {database}"
    create_table_sql = """
        create table if not exists {database}.{table} (
            id int,
            value double
        ) using iceberg
    """
    truncate_table_sql = "truncate table {database}.{table}"

    spark.sql(create_database_sql.format(database=database))
    spark.sql(create_table_sql.format(database=database, table=table))
    spark.sql(truncate_table_sql.format(database=database, table=table))


def write_data(spark, database, table):
    data = [(i, random.random()) for i in range(100_000)]

    # Creating a dataframe
    df = spark.createDataFrame(data, schema=['id', 'value'])

    table_full_name = "{database}.{table}".format(database=database, table=table)
    df.writeTo(table_full_name).append()


def main():
    # Creating a Spark session
    spark = (
        SparkSession
        .builder
        .appName('job_with_table')
        .enableHiveSupport()
        .getOrCreate()
    )

    database, table = 'database_1', 'table_1'
    prepare_table(spark, database, table)
    write_data(spark, database, table)


if __name__ == '__main__':
    main()
```
-
In <bucket_for_PySpark_job_source_code>, create a folder named scripts and upload the job_with_table.py file to it.
-
Create a local file named job_with_table.py and paste the following script to it:

```python
import random

from pyspark.sql import SparkSession


def prepare_table(spark, database, table):
    create_database_sql = "create database if not exists {database}"
    create_table_sql = """
        create table if not exists {database}.{table} (
            id int,
            value double
        ) using iceberg
    """
    truncate_table_sql = "truncate table {database}.{table}"

    spark.sql(create_database_sql.format(database=database))
    spark.sql(create_table_sql.format(database=database, table=table))
    spark.sql(truncate_table_sql.format(database=database, table=table))


def write_data(spark, database, table):
    data = [(i, random.random()) for i in range(100_000)]

    # Creating a dataframe
    df = spark.createDataFrame(data, schema=['id', 'value'])

    table_full_name = "{database}.{table}".format(database=database, table=table)
    df.writeTo(table_full_name).append()


def main():
    # Creating a Spark session
    spark = (
        SparkSession
        .builder
        .appName('job_with_table')
        .enableHiveSupport()
        .getOrCreate()
    )

    database, table = 'database_1', 'table_1'
    prepare_table(spark, database, table)
    write_data(spark, database, table)


if __name__ == '__main__':
    main()
```
-
In <bucket_for_jobs_and_data>, create a folder named scripts and upload the job_with_table.py file to it.
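The steps above assume you upload the file through the management console. If you prefer to upload it programmatically, below is a minimal sketch using boto3 against the S3-compatible Object Storage API. It assumes you have issued a static access key for an account with write access to the target bucket; the key placeholders are not part of this tutorial.

```python
import boto3

# A minimal upload sketch (assumption: a static access key with write access to the bucket)
session = boto3.session.Session(
    aws_access_key_id='<static_key_ID>',
    aws_secret_access_key='<static_key_secret>',
)
s3 = session.client(
    service_name='s3',
    endpoint_url='https://storage.yandexcloud.net',  # S3-compatible Object Storage endpoint
)

# Upload the PySpark job script into the scripts/ folder of the bucket
s3.upload_file(
    'job_with_table.py',                      # local file
    '<bucket_for_PySpark_job_source_code>',   # or <bucket_for_jobs_and_data> in the simplified setup
    'scripts/job_with_table.py',              # object key in the bucket
)
```

You can upload the DAG file later in the same way, using the dags/ folder as the object key prefix.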
Prepare and run a DAG file
The DAG will include several tasks that form a sequence of actions:
- Yandex Managed Service for Apache Airflow™ creates a temporary Apache Spark™ cluster with settings specified in the DAG. This cluster automatically connects to the previously created Apache Hive™ Metastore cluster.
- When the Apache Spark™ cluster is ready, a PySpark job is run.
- Once the job is complete, the temporary Apache Spark™ cluster is deleted.
To prepare a DAG:
-
Create a local file named dag.py, paste the following script to it, and substitute the variables with your infrastructure data:

```python
import logging

import pendulum

from airflow.models.dag import DAG
from airflow.decorators import task
from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook

from yandexcloud.operations import OperationError

YANDEX_CONN_ID = '<connection_ID>'

# Your infrastructure data
FOLDER_ID = '<folder_ID>'
SERVICE_ACCOUNT_ID = '<spark-agent_service_account_ID>'
SUBNET_IDS = [<subnet_ID>]
SECURITY_GROUP_IDS = [<security_group_ID>]
METASTORE_CLUSTER_ID = '<Apache_Hive™_Metastore_cluster_ID>'

JOB_NAME = 'job_with_table'
JOB_SCRIPT = 's3a://<bucket_for_PySpark_job_source_code>/scripts/job_with_table.py'
JOB_ARGS = []
JOB_PROPERTIES = {
    'spark.executor.instances': '1',
    'spark.sql.warehouse.dir': 's3a://<bucket_for_PySpark_job_output_data>/warehouse',
}


@task  # Step 1: Creating an Apache Spark™ cluster
def create_cluster(yc_hook, cluster_spec):
    spark_client = yc_hook.sdk.wrappers.Spark()
    try:
        spark_client.create_cluster(cluster_spec)
    except OperationError as job_error:
        cluster_id = job_error.operation_result.meta.cluster_id
        if cluster_id:
            spark_client.delete_cluster(cluster_id=cluster_id)
        raise
    return spark_client.cluster_id


@task  # Step 2: Running a PySpark job
def run_spark_job(yc_hook, cluster_id, job_spec):
    spark_client = yc_hook.sdk.wrappers.Spark()
    try:
        job_operation = spark_client.create_pyspark_job(cluster_id=cluster_id, spec=job_spec)
        job_id = job_operation.response.id
        job_info = job_operation.response
    except OperationError as job_error:
        job_id = job_error.operation_result.meta.job_id
        job_info, _ = spark_client.get_job(cluster_id=cluster_id, job_id=job_id)
        raise
    finally:
        job_log = spark_client.get_job_log(cluster_id=cluster_id, job_id=job_id)
        for line in job_log:
            logging.info(line)
        logging.info("Job info: %s", job_info)


@task(trigger_rule="all_done")  # Step 3: Deleting the Apache Spark™ cluster
def delete_cluster(yc_hook, cluster_id):
    if cluster_id:
        spark_client = yc_hook.sdk.wrappers.Spark()
        spark_client.delete_cluster(cluster_id=cluster_id)


# DAG settings
with DAG(
    dag_id="example_spark",
    start_date=pendulum.datetime(2025, 1, 1),
    schedule=None,
):
    yc_hook = YandexCloudBaseHook(yandex_conn_id=YANDEX_CONN_ID)

    cluster_spec = yc_hook.sdk.wrappers.SparkClusterParameters(
        folder_id=FOLDER_ID,
        service_account_id=SERVICE_ACCOUNT_ID,
        subnet_ids=SUBNET_IDS,
        security_group_ids=SECURITY_GROUP_IDS,
        driver_pool_resource_preset="c2-m8",
        driver_pool_size=1,
        executor_pool_resource_preset="c4-m16",
        executor_pool_min_size=1,
        executor_pool_max_size=2,
        metastore_cluster_id=METASTORE_CLUSTER_ID,
    )
    cluster_id = create_cluster(yc_hook, cluster_spec)

    job_spec = yc_hook.sdk.wrappers.PysparkJobParameters(
        name=JOB_NAME,
        main_python_file_uri=JOB_SCRIPT,
        args=JOB_ARGS,
        properties=JOB_PROPERTIES,
    )
    task_job = run_spark_job(yc_hook, cluster_id, job_spec)
    task_delete = delete_cluster(yc_hook, cluster_id)

    task_job >> task_delete
```

Where:
- YANDEX_CONN_ID: Connection ID.
- FOLDER_ID: ID of the folder you will create the Apache Spark™ cluster in.
- SERVICE_ACCOUNT_ID: ID of the service account you will use to create the Apache Spark™ cluster.
- SUBNET_IDS: Subnet ID.

  Note

  Apache Spark™ and Apache Hive™ Metastore must use the same subnet.

- SECURITY_GROUP_IDS: ID of the security group for the Apache Spark™ cluster.
- METASTORE_CLUSTER_ID: Apache Hive™ Metastore cluster ID.
- JOB_NAME: PySpark job name.
- JOB_SCRIPT: Path to the PySpark job file.
- JOB_ARGS: PySpark job arguments.
- JOB_PROPERTIES: PySpark job properties.
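For reference, here is what the infrastructure block of dag.py might look like once the placeholders are filled in. All values below are hypothetical and only illustrate the expected types: SUBNET_IDS and SECURITY_GROUP_IDS are lists of string IDs, while the other variables are plain strings.

```python
# Hypothetical example values; replace them with the IDs of your own resources
YANDEX_CONN_ID = 'yc-connection'                 # name of the Yandex Cloud connection in Apache Airflow™
FOLDER_ID = 'b1gexample1234567890ab'             # folder where the Apache Spark™ cluster will be created
SERVICE_ACCOUNT_ID = 'ajeexample1234567890ab'    # spark-agent (or integration-agent) service account ID
SUBNET_IDS = ['e9bexample1234567890ab']          # a list, even with a single subnet
SECURITY_GROUP_IDS = ['enpexample1234567890ab']  # a list as well
METASTORE_CLUSTER_ID = 'c9qexample1234567890ab'
```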
-
Upload the DAG to the Apache Airflow™ cluster: in <bucket_for_Airflow_DAG_source_code>, create a folder named dags and upload the dag.py file to it.
-
Open the Apache Airflow™ web interface.
-
Make sure the new example_spark DAG has appeared in the DAGs section.

  It may take a few minutes to load a DAG file from the bucket.
-
To run the DAG, click the run button in the row with its name.
-
Create a local file named dag.py, paste the following script to it, and substitute the variables with your infrastructure data:

```python
import logging

import pendulum

from airflow.models.dag import DAG
from airflow.decorators import task
from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook

from yandexcloud.operations import OperationError

YANDEX_CONN_ID = '<connection_ID>'

# Your infrastructure data
FOLDER_ID = '<folder_ID>'
SERVICE_ACCOUNT_ID = '<integration-agent_service_account_ID>'
SUBNET_IDS = [<subnet_ID>]
SECURITY_GROUP_IDS = [<security_group_ID>]
METASTORE_CLUSTER_ID = '<Apache_Hive™_Metastore_cluster_ID>'

JOB_NAME = 'job_with_table'
JOB_SCRIPT = 's3a://<bucket_for_jobs_and_data>/scripts/job_with_table.py'
JOB_ARGS = []
JOB_PROPERTIES = {
    'spark.executor.instances': '1',
    'spark.sql.warehouse.dir': 's3a://<bucket_for_jobs_and_data>/warehouse',
}


@task  # Step 1: Creating an Apache Spark™ cluster
def create_cluster(yc_hook, cluster_spec):
    spark_client = yc_hook.sdk.wrappers.Spark()
    try:
        spark_client.create_cluster(cluster_spec)
    except OperationError as job_error:
        cluster_id = job_error.operation_result.meta.cluster_id
        if cluster_id:
            spark_client.delete_cluster(cluster_id=cluster_id)
        raise
    return spark_client.cluster_id


@task  # Step 2: Running a PySpark job
def run_spark_job(yc_hook, cluster_id, job_spec):
    spark_client = yc_hook.sdk.wrappers.Spark()
    try:
        job_operation = spark_client.create_pyspark_job(cluster_id=cluster_id, spec=job_spec)
        job_id = job_operation.response.id
        job_info = job_operation.response
    except OperationError as job_error:
        job_id = job_error.operation_result.meta.job_id
        job_info, _ = spark_client.get_job(cluster_id=cluster_id, job_id=job_id)
        raise
    finally:
        job_log = spark_client.get_job_log(cluster_id=cluster_id, job_id=job_id)
        for line in job_log:
            logging.info(line)
        logging.info("Job info: %s", job_info)


@task(trigger_rule="all_done")  # Step 3: Deleting the Apache Spark™ cluster
def delete_cluster(yc_hook, cluster_id):
    if cluster_id:
        spark_client = yc_hook.sdk.wrappers.Spark()
        spark_client.delete_cluster(cluster_id=cluster_id)


# DAG settings
with DAG(
    dag_id="example_spark",
    start_date=pendulum.datetime(2025, 1, 1),
    schedule=None,
):
    yc_hook = YandexCloudBaseHook(yandex_conn_id=YANDEX_CONN_ID)

    cluster_spec = yc_hook.sdk.wrappers.SparkClusterParameters(
        folder_id=FOLDER_ID,
        service_account_id=SERVICE_ACCOUNT_ID,
        subnet_ids=SUBNET_IDS,
        security_group_ids=SECURITY_GROUP_IDS,
        driver_pool_resource_preset="c2-m8",
        driver_pool_size=1,
        executor_pool_resource_preset="c4-m16",
        executor_pool_min_size=1,
        executor_pool_max_size=2,
        metastore_cluster_id=METASTORE_CLUSTER_ID,
    )
    cluster_id = create_cluster(yc_hook, cluster_spec)

    job_spec = yc_hook.sdk.wrappers.PysparkJobParameters(
        name=JOB_NAME,
        main_python_file_uri=JOB_SCRIPT,
        args=JOB_ARGS,
        properties=JOB_PROPERTIES,
    )
    task_job = run_spark_job(yc_hook, cluster_id, job_spec)
    task_delete = delete_cluster(yc_hook, cluster_id)

    task_job >> task_delete
```

Where:
- YANDEX_CONN_ID: Connection ID.
- FOLDER_ID: ID of the folder you will create the Apache Spark™ cluster in.
- SERVICE_ACCOUNT_ID: ID of the service account you will use to create the Apache Spark™ cluster.
- SUBNET_IDS: Subnet ID.

  Note

  Apache Spark™ and Apache Hive™ Metastore must use the same subnet.

- SECURITY_GROUP_IDS: ID of the security group for the Apache Spark™ cluster.
- METASTORE_CLUSTER_ID: Apache Hive™ Metastore cluster ID.
- JOB_NAME: PySpark job name.
- JOB_SCRIPT: Path to the PySpark job file.
- JOB_ARGS: PySpark job arguments.
- JOB_PROPERTIES: PySpark job properties.
-
Upload the DAG to the Apache Airflow™ cluster: in <bucket_for_jobs_and_data>, create a folder named dags and upload the dag.py file to it.
-
Open the Apache Airflow™ web interface.
-
Make sure the new example_spark DAG has appeared in the DAGs section.

  It may take a few minutes to load a DAG file from the bucket.
-
To run the DAG, click the run button in the row with its name.
Check the result
- To monitor task execution results, click the DAG name.
- Wait until the status of all three tasks in the DAG changes to Success. In parallel, you can use the management console to check that the Apache Spark™ cluster is created, the PySpark job runs, and the same cluster is then deleted.
- Make sure <bucket_for_PySpark_job_output_data> now contains database_1. The data from the new database is now stored in the Object Storage bucket, and its metadata is stored in the Apache Hive™ Metastore cluster.
- To monitor task execution results, click the DAG name.
- Wait until the status of all three tasks in the DAG changes to Success. In parallel, you can use the management console to check that the Apache Spark™ cluster is created, the PySpark job runs, and the same cluster is then deleted.
- Make sure <bucket_for_jobs_and_data> now contains database_1. The data from the new database is now stored in the Object Storage bucket, and its metadata is stored in the Apache Hive™ Metastore cluster.
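Optionally, you can verify the written data from another Apache Spark™ cluster connected to the same Apache Hive™ Metastore cluster. Below is a minimal sketch of such a check; run it as a separate PySpark job, the same way as job_with_table.py (the script name check_table.py is an assumption).

```python
from pyspark.sql import SparkSession


def main():
    # The Metastore connection comes from the Apache Spark™ cluster settings,
    # so enabling Hive support is enough to see the saved table metadata
    spark = (
        SparkSession
        .builder
        .appName('check_table')
        .enableHiveSupport()
        .getOrCreate()
    )

    # job_with_table.py writes 100,000 rows into database_1.table_1
    spark.sql("SELECT count(*) AS row_count FROM database_1.table_1").show()


if __name__ == '__main__':
    main()
```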
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them:
- Service accounts.
- Object Storage buckets.
- Apache Hive™ Metastore cluster.
- Apache Airflow™ cluster.
- Security groups.
- Cloud subnets created in datalake-network by default.
- Cloud network.
- Service account.
- Object Storage bucket.
- Apache Hive™ Metastore cluster.
- Apache Airflow™ cluster.
- Security group created in datalake-network by default.
- Cloud subnets created in datalake-network by default.
- Cloud network.