Automating operations with Yandex Data Processing using Yandex Managed Service for Apache Airflow™
Yandex Managed Service for Apache Airflow™ enables you to create Directed Acyclic Graphs (DAGs) of tasks to automate your operations with Yandex Data Processing. Below is an example of a DAG that includes multiple tasks:
- Create a Yandex Data Processing cluster.
- Create and run a PySpark job.
- Delete a Yandex Data Processing cluster.
With a DAG like this, a cluster's lifetime is short. Since the cost of Yandex Data Processing resources depends on their usage time, you can use resources with higher capacity in the cluster and quickly handle a larger amount of data at no additional cost.
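The cost argument above can be checked with simple arithmetic. The sketch below uses hypothetical hourly prices and run times (not actual Yandex Cloud pricing) and assumes that billing is strictly proportional to usage time and that the job speeds up linearly with cluster capacity.

```python
def cluster_cost(price_per_hour: float, hours: float) -> float:
    """Cost of a cluster billed in proportion to its usage time."""
    return price_per_hour * hours


# Hypothetical numbers for illustration only, not real pricing.
small = cluster_cost(price_per_hour=10.0, hours=4.0)  # small cluster, long run
large = cluster_cost(price_per_hour=40.0, hours=1.0)  # 4x capacity, assumed 4x faster

print(small, large)
```

Under these assumptions both runs cost the same, but the larger cluster finishes sooner. Real jobs rarely scale perfectly linearly, so treat this as an upper bound on the benefit.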
In this DAG, the Yandex Data Processing cluster is created without the Hive service. In the example below, a separate Hive Metastore cluster stores the table metadata, so the saved metadata can later be used by another Yandex Data Processing cluster.
To automate operations with Yandex Data Processing using Managed Service for Apache Airflow™:
- Prepare your infrastructure.
- Prepare a PySpark job.
- Create Apache Airflow™ connections.
- Prepare and run a DAG file.
- Check the result.
If you no longer need the resources you created, delete them.
Prepare the infrastructure
- Create a service account named airflow-sa with the following roles:
  - storage.editor
  - dataproc.agent
  - dataproc.editor
- Create the following keys for the service account:
  - Static access key. Save its ID and the secret key.
  - Authorized key. Save the key's public part and download a file with the public and the private parts.
- Create the following Object Storage buckets:
  - airflow-bucket for Managed Service for Apache Airflow™
  - pyspark-bucket for a PySpark job
  - output-bucket for output data
  - log-bucket to store collected logs

  You need multiple buckets with different access permissions.
- Grant the airflow-sa service account the following bucket permissions:
  - READ for airflow-bucket
  - READ for pyspark-bucket
  - READ and WRITE for output-bucket
  - READ and WRITE for log-bucket
- Create a cloud network named dataproc-network.

  This automatically creates three subnets in different availability zones and a security group.
- Set up a NAT gateway in the dataproc-network-ru-central1-a subnet.
- Add these rules to the security group in dataproc-network:

  Security group rules (for each rule: the service that requires it, why it is required, and its settings)

  - Yandex Data Processing, for incoming service traffic:
    - Port range: 0-65535
    - Protocol: Any
    - Source: Security group
    - Security group: Self
  - Metastore, for incoming client traffic:
    - Port range: 30000-32767
    - Protocol: Any
    - Source: CIDR
    - CIDR blocks: 0.0.0.0/0
  - Metastore, for incoming load balancer traffic:
    - Port range: 10256
    - Protocol: Any
    - Source: Load balancer health checks
  - Yandex Data Processing, for outgoing service traffic:
    - Port range: 0-65535
    - Protocol: Any
    - Destination type: Security group
    - Security group: Self
  - Yandex Data Processing, for outgoing HTTPS traffic:
    - Port range: 443
    - Protocol: TCP
    - Destination type: CIDR
    - CIDR blocks: 0.0.0.0/0
  - Yandex Data Processing, for outgoing traffic to allow Yandex Data Processing cluster connections to Metastore:
    - Port range: 9083
    - Protocol: Any
    - Destination type: CIDR
    - CIDR blocks: 0.0.0.0/0
- Create a Metastore cluster with the following parameters:
  - Network: dataproc-network
  - Subnet: dataproc-network-ru-central1-a
  - Security group: Default group in dataproc-network
  - Key ID and secret key: Those of the static access key you created earlier
- Create a Managed Service for Apache Airflow™ cluster with the following parameters:
  - Availability zone: ru-central1-a
  - Network: dataproc-network
  - Subnet: dataproc-network-ru-central1-a
  - Security group: Default group in dataproc-network
  - Bucket name: airflow-bucket
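To double-check the security group rules before creating the clusters, it can help to write them down as data. The sketch below is only a simplified model: the `Rule` class, the peer labels (`self`, `lb-healthchecks`), and the `is_allowed` helper are hypothetical names introduced for illustration, and the matching logic is not how Yandex Cloud actually evaluates rules.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rule:
    direction: str  # "ingress" or "egress"
    port_from: int
    port_to: int
    protocol: str   # "ANY" or a specific protocol such as "TCP"
    peer: str       # "self", a CIDR block, or "lb-healthchecks" (illustrative labels)


# The six rules from the list above.
RULES = [
    Rule("ingress", 0, 65535, "ANY", "self"),
    Rule("ingress", 30000, 32767, "ANY", "0.0.0.0/0"),
    Rule("ingress", 10256, 10256, "ANY", "lb-healthchecks"),
    Rule("egress", 0, 65535, "ANY", "self"),
    Rule("egress", 443, 443, "TCP", "0.0.0.0/0"),
    Rule("egress", 9083, 9083, "ANY", "0.0.0.0/0"),
]


def is_allowed(direction: str, port: int, protocol: str, peer: str) -> bool:
    """Return True if at least one rule matches the described traffic."""
    return any(
        rule.direction == direction
        and rule.port_from <= port <= rule.port_to
        and rule.protocol in ("ANY", protocol)
        and rule.peer == peer
        for rule in RULES
    )


# Outgoing connection from the cluster to the Metastore Thrift port.
print(is_allowed("egress", 9083, "TCP", "0.0.0.0/0"))
```

In this model, outgoing traffic to port 9083 is allowed, while outgoing plain HTTP on port 80 to an arbitrary address is not.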
Prepare a PySpark job
For a PySpark job, we will use a Python script that creates a table and is stored in the Object Storage bucket. Prepare a script file:
- Create a local file named create-table.py and copy the following script to it:

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    from pyspark.sql import SparkSession

    # Creating a Spark session
    spark = SparkSession.builder \
        .appName("create-table") \
        .enableHiveSupport() \
        .getOrCreate()

    # Creating a data schema
    schema = StructType([StructField('Name', StringType(), True),
                         StructField('Capital', StringType(), True),
                         StructField('Area', IntegerType(), True),
                         StructField('Population', IntegerType(), True)])

    # Creating a dataframe
    df = spark.createDataFrame([('Australia', 'Canberra', 7686850, 19731984),
                                ('Austria', 'Vienna', 83855, 7700000)], schema)

    # Writing the dataframe to a bucket as a table named countries
    df.write.mode("overwrite").option("path", "s3a://output-bucket/countries").saveAsTable("countries")

- Create a scripts folder in pyspark-bucket and upload the create-table.py file to it.
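The script declares Area and Population as IntegerType, which in Spark is a 4-byte signed integer. If you extend the sample data, a quick check without Spark can confirm that the values still fit. This is a minimal sketch; the `out_of_range` helper is a hypothetical name, and the rows simply repeat the sample data from the script.

```python
# Range of a 4-byte signed integer, which is what Spark's IntegerType holds.
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1

ROWS = [
    ("Australia", "Canberra", 7686850, 19731984),
    ("Austria", "Vienna", 83855, 7700000),
]


def out_of_range(rows):
    """Return (name, value) pairs whose integer columns do not fit into 32 bits."""
    return [
        (row[0], value)
        for row in rows
        for value in row[2:]
        if not INT32_MIN <= value <= INT32_MAX
    ]


print(out_of_range(ROWS))
```

If the helper reports any values, declaring the corresponding column as LongType in the schema would be one way to accommodate them.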
Create Apache Airflow™ connections
For DAG tasks, we will use two connections: one for Object Storage and one for the service account. They will be specified in the DAG file. To create connections:
- Go to Admin → Connections.
- Click the + button to add a new connection.
- Create a connection for Object Storage. Specify its parameters:
  - Connection Id: yc-s3
  - Connection Type: Amazon Elastic MapReduce
  - Run Job Flow Configuration: JSON in the following format:

      {
        "aws_access_key_id": "<static_key_ID>",
        "aws_secret_access_key": "<secret_key>",
        "host": "https://storage.yandexcloud.net/"
      }

    In the JSON, specify the data of the static access key that you created earlier.
- Click Save.
- Click the + button once again to create a connection for the service account.
- Specify the connection parameters:
  - Connection Id: yc-airflow-sa
  - Connection Type: Yandex Cloud
  - Service account auth JSON: Contents of the JSON file with the authorized key.
  - Public SSH key: Public part of the previously created authorized key. Write the key as a single string without any line break characters.

  Leave the other fields empty.
- Click Save.
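Both connection values can be prepared in advance with a few lines of Python. This is a minimal sketch: the helper names `s3_connection_json` and `single_line_key` are hypothetical, and the second helper only removes line break characters, on the assumption that nothing else in the key needs to change.

```python
import json


def s3_connection_json(key_id: str, secret_key: str) -> str:
    """Build the JSON for the Run Job Flow Configuration field."""
    return json.dumps(
        {
            "aws_access_key_id": key_id,
            "aws_secret_access_key": secret_key,
            "host": "https://storage.yandexcloud.net/",
        },
        indent=2,
    )


def single_line_key(public_key: str) -> str:
    """Join a multi-line key into one string without line break characters."""
    return "".join(public_key.splitlines())


# Placeholders stand in for the real static access key values.
print(s3_connection_json("<static_key_ID>", "<secret_key>"))
```

Avoid committing the resulting JSON to version control, since it contains the secret key.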
Prepare and run a DAG file
The DAG will consist of several vertices that form a chain of sequential actions:
- Managed Service for Apache Airflow™ creates a temporary lightweight Yandex Data Processing cluster whose settings are set in the DAG. This cluster automatically connects to the previously created Metastore cluster.
- When the Yandex Data Processing cluster is ready, a PySpark job is run.
- Once the job is executed, the temporary Yandex Data Processing cluster is deleted.
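The deletion step in the DAG file uses the `ALL_DONE` trigger rule, so the temporary cluster is removed even if the job fails. The plain-Python simulation below illustrates that behavior for a linear chain; it does not use Airflow, and `run_chain` and `failing_job` are hypothetical names that only approximate how Airflow assigns task states.

```python
from typing import Callable, Dict, List, Tuple

Task = Tuple[str, Callable[[], None], str]  # (task name, action, trigger rule)


def run_chain(tasks: List[Task]) -> Dict[str, str]:
    """Run tasks one after another and return the final state of each task."""
    states: Dict[str, str] = {}
    previous = "success"  # the first task has no upstream task
    for name, action, rule in tasks:
        # "all_done" runs regardless of how the upstream task finished.
        if previous == "success" or rule == "all_done":
            try:
                action()
                states[name] = "success"
            except Exception:
                states[name] = "failed"
        else:
            states[name] = "upstream_failed"
        previous = states[name]
    return states


def failing_job() -> None:
    raise RuntimeError("PySpark job failed")


states = run_chain([
    ("create_cluster", lambda: None, "all_success"),
    ("pyspark_job", failing_job, "all_success"),
    ("delete_cluster", lambda: None, "all_done"),
])
print(states)
```

With the default rule on the deletion step, a failed job would leave the cluster running and accruing charges, which is why the rule matters for short-lived clusters.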
To prepare a DAG:
- Create an SSH key. Save the public part of the key: you will need it to create a Yandex Data Processing cluster.
- Create a local file named Data-Proc-DAG.py, copy the following script to it, and insert your infrastructure data in the variables:

    import uuid
    import datetime
    from airflow import DAG, settings
    from airflow.models import Connection, Variable
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
    from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator
    from airflow.providers.yandex.operators.yandexcloud_dataproc import (
        DataprocCreateClusterOperator,
        DataprocCreatePysparkJobOperator,
        DataprocDeleteClusterOperator,
    )

    # Your infrastructure data
    YC_DP_FOLDER_ID = '<folder_ID>'
    YC_DP_SSH_PUBLIC_KEY = '<public_part_of_SSH_key>'
    YC_DP_SUBNET_ID = '<subnet_ID>'
    YC_DP_GROUP_ID = '<security_group_ID>'
    YC_DP_SA_ID = '<service_account_ID>'
    YC_DP_METASTORE_URI = '<IP_address>'
    YC_DP_AZ = 'ru-central1-a'
    YC_SOURCE_BUCKET = 'pyspark-bucket'
    YC_DP_LOGS_BUCKET = 'log-bucket'

    # Creating a connection for Object Storage
    session = settings.Session()
    ycS3_connection = Connection(
        conn_id='yc-s3'
    )
    if not session.query(Connection).filter(Connection.conn_id == ycS3_connection.conn_id).first():
        session.add(ycS3_connection)
        session.commit()

    # Creating a connection for the service account
    ycSA_connection = Connection(
        conn_id='yc-airflow-sa'
    )
    if not session.query(Connection).filter(Connection.conn_id == ycSA_connection.conn_id).first():
        session.add(ycSA_connection)
        session.commit()

    # DAG settings
    with DAG(
            'DATA_INGEST',
            schedule_interval='@hourly',
            tags=['data-proc-and-airflow'],
            start_date=datetime.datetime.now(),
            max_active_runs=1,
            catchup=False
    ) as ingest_dag:
        # Step 1: Creating a Yandex Data Processing cluster
        create_spark_cluster = DataprocCreateClusterOperator(
            task_id='dp-cluster-create-task',
            folder_id=YC_DP_FOLDER_ID,
            cluster_name=f'tmp-dp-{uuid.uuid4()}',
            cluster_description='A temporary cluster for running a PySpark job orchestrated by Managed Service for Apache Airflow™',
            ssh_public_keys=YC_DP_SSH_PUBLIC_KEY,
            subnet_id=YC_DP_SUBNET_ID,
            s3_bucket=YC_DP_LOGS_BUCKET,
            service_account_id=YC_DP_SA_ID,
            zone=YC_DP_AZ,
            cluster_image_version='2.1.7',
            enable_ui_proxy=False,
            masternode_resource_preset='s2.small',
            masternode_disk_type='network-ssd',
            masternode_disk_size=200,
            computenode_resource_preset='m2.large',
            computenode_disk_type='network-ssd',
            computenode_disk_size=200,
            computenode_count=2,
            computenode_max_hosts_count=5,  # The number of data handling subclusters will be automatically scaled up if the load is high.
            services=['YARN', 'SPARK'],     # A lightweight cluster is being created.
            datanode_count=0,               # Without data storage subclusters.
            properties={                    # With a reference to a remote Metastore cluster.
                'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083',
            },
            security_group_ids=[YC_DP_GROUP_ID],
            connection_id=ycSA_connection.conn_id,
            dag=ingest_dag
        )

        # Step 2: Running a PySpark job
        poke_spark_processing = DataprocCreatePysparkJobOperator(
            task_id='dp-cluster-pyspark-task',
            main_python_file_uri=f's3a://{YC_SOURCE_BUCKET}/scripts/create-table.py',
            connection_id=ycSA_connection.conn_id,
            dag=ingest_dag
        )

        # Step 3: Deleting a Yandex Data Processing cluster
        delete_spark_cluster = DataprocDeleteClusterOperator(
            task_id='dp-cluster-delete-task',
            trigger_rule=TriggerRule.ALL_DONE,
            dag=ingest_dag
        )

        # Building a DAG based on the above steps
        create_spark_cluster >> poke_spark_processing >> delete_spark_cluster
  Where:
  - YC_DP_FOLDER_ID: ID of the cloud folder.
  - YC_DP_SSH_PUBLIC_KEY: Public part of the SSH key for the Yandex Data Processing cluster.
  - YC_DP_SUBNET_ID: Subnet ID.
  - YC_DP_GROUP_ID: Security group ID.
  - YC_DP_SA_ID: Service account ID.
  - YC_DP_METASTORE_URI: IP address of the Metastore cluster.
  - YC_DP_AZ: Availability zone for the Yandex Data Processing cluster, e.g., ru-central1-a.
  - YC_SOURCE_BUCKET: Bucket with the Python script for the PySpark job, e.g., pyspark-bucket.
  - YC_DP_LOGS_BUCKET: Bucket for storing logs, e.g., log-bucket.
- Upload the DAG to the Managed Service for Apache Airflow™ cluster: in your airflow-bucket, create a folder named dags and upload the Data-Proc-DAG.py file to it.
- Make sure the DAGs section now contains a new DAG named DATA_INGEST and tagged as data-proc-and-airflow.

  It may take a few minutes for the DAG file to load from the bucket.
- To run the DAG, click the trigger (play) button in the line with its name.
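A placeholder left unreplaced in the DAG variables, such as `<folder_ID>`, would be passed to the operators as is. The sketch below shows one way to catch this before uploading the file; `unresolved` and `metastore_uri` are hypothetical helper names, and the sample values are made up for illustration.

```python
import re

# Matches values that are still a template placeholder such as "<folder_ID>".
_PLACEHOLDER = re.compile(r"<[^<>]+>")


def unresolved(variables: dict) -> list:
    """Return the names of variables that still hold a placeholder."""
    return sorted(
        name for name, value in variables.items() if _PLACEHOLDER.fullmatch(value)
    )


def metastore_uri(ip_address: str, port: int = 9083) -> str:
    """Build the Thrift URI the DAG passes to spark.hive.metastore.uris."""
    return f"thrift://{ip_address}:{port}"


variables = {
    "YC_DP_FOLDER_ID": "<folder_ID>",   # not filled in yet
    "YC_DP_METASTORE_URI": "10.0.0.5",  # made-up address for illustration
    "YC_DP_AZ": "ru-central1-a",
}
print(unresolved(variables))
print(metastore_uri(variables["YC_DP_METASTORE_URI"]))
```

The URI helper mirrors the f-string used in the DAG file, with port 9083 matching the outgoing security group rule for Metastore.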
Check the result
- To monitor task execution results, click the DAG name. The results are displayed in the Grid tab.
- Wait until the status of all three tasks in the DAG changes to Success. Meanwhile, you can check in the management console that a Yandex Data Processing cluster is being created, the PySpark job is running, and the same cluster is being deleted.
- Make sure your output-bucket now contains a folder named countries with the part-00000-... file. The data from the created table is now stored in the Object Storage bucket and the table metadata is stored in the Metastore cluster.
- Make sure you have the PySpark job logs in your log-bucket.
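If you list the contents of output-bucket with any S3-compatible tool, the check for the table data comes down to looking for a part file under the countries prefix. This is a minimal sketch over a plain list of object keys; `has_table_data` is a hypothetical helper, and the sample keys are invented for illustration.

```python
def has_table_data(keys, table="countries"):
    """Return True if any object key looks like a part file of the table."""
    prefix = f"{table}/part-"
    return any(key.startswith(prefix) for key in keys)


# Invented listing for illustration; real file names will differ.
sample_keys = [
    "countries/_SUCCESS",
    "countries/part-00000-example",
]
print(has_table_data(sample_keys))
```

The helper only inspects key names, so it confirms that the job wrote output but not that the data itself is correct.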
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them:
- Service account
- Object Storage buckets
- Metastore cluster
- Managed Service for Apache Airflow™ cluster
- Route table
- NAT gateway
- Security group
- Cloud subnets created in dataproc-network by default
- Cloud network