Automating operations using Yandex Managed Service for Apache Airflow™
In Yandex Managed Service for Apache Airflow™, you can create a directed acyclic graph (DAG) to automate your operations in Yandex Data Processing. Below is an example of a DAG that includes the following tasks:
- Create a Yandex Data Processing cluster.
- Create and run a PySpark job.
- Delete the Yandex Data Processing cluster.
With a DAG like this, the cluster is short-lived. Since the cost of Yandex Data Processing resources depends on their usage time, you can use higher-capacity resources in the cluster and quickly process a larger amount of data without paying extra.
In this DAG, the Yandex Data Processing cluster is created without Hive. Instead, the example below uses an Apache Hive™ Metastore cluster to store table metadata. The saved metadata can then be used by another Yandex Data Processing cluster.
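Because the table metadata lives in the Apache Hive™ Metastore cluster rather than inside the short-lived Yandex Data Processing cluster, any other cluster pointed at the same Metastore endpoint can query the table later. Below is a minimal PySpark sketch of such a follow-up job; it assumes that cluster's spark:spark.hive.metastore.uris property points to the same Metastore host and that the countries table from the example further below already exists.

```python
from pyspark.sql import SparkSession

# Assumption: this job runs on another Yandex Data Processing cluster whose
# spark:spark.hive.metastore.uris property points to the same Metastore host
# (thrift://<IP_address>:9083), so the table registered earlier is visible.
spark = (
    SparkSession.builder
    .appName("read-countries")
    .enableHiveSupport()   # Take table metadata from the external Hive Metastore
    .getOrCreate()
)

# The countries table is created by the create-table.py job further below;
# its data files live in Object Storage, only the metadata comes from Metastore.
spark.table("countries").show()
```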
To automate operations with Yandex Data Processing using Managed Service for Apache Airflow™:
- Set up the infrastructure.
- Prepare a PySpark job.
- Prepare and run a DAG file.
- Check the result.
If you no longer need the resources you created, delete them.
Required paid resources
The support cost for this solution includes:
- Fee for a Managed Service for Apache Airflow™ cluster: Computing resources of cluster components (see Apache Airflow™ pricing).
- Fee for the Apache Hive™ Metastore cluster computing resources (see Yandex MetaData Hub pricing).
- Fee for a NAT gateway (see Virtual Private Cloud pricing).
- Object Storage bucket fee: Data storage and operations (see Object Storage pricing).
- Fee for a Yandex Data Processing cluster: Using VM computing resources, Compute Cloud network disks, and Cloud Logging for log management (see Yandex Data Processing pricing).
Set up the infrastructure
The example below illustrates two scenarios. Select the one you find most relevant:
- High security level. This is the recommended scenario, as it follows the principle of least privilege. It entails the following:
  - Splitting access permissions across different service accounts. You create a separate service account for each cluster and assign it only the roles required for that cluster to operate.
  - Using separate buckets for different tasks and storing different data in separate buckets. For example, the results of the PySpark job are written to one bucket, and logs to another.
  - Setting up security groups to restrict traffic and grant access only to authorized resources.
- Simplified setup. This scenario implies a lower security level:
  - Using a single service account with more privileges than strictly required.
  - Storing all data in a single bucket, in different folders.
  - Not setting up security groups.
Set up the infrastructure (high security level):
- Create service accounts with the following roles:
  - airflow-agent for the Apache Airflow™ cluster:
    - dataproc.editor to manage a Yandex Data Processing cluster from a DAG.
    - vpc.user to use the Yandex Virtual Private Cloud subnet in the Apache Airflow™ cluster.
    - managed-airflow.integrationProvider to enable the Apache Airflow™ cluster to interact with other resources.
    - iam.serviceAccounts.user to specify the data-processing-agent service account when creating a Yandex Data Processing cluster.
  - metastore-agent for the Apache Hive™ Metastore cluster:
    - managed-metastore.integrationProvider to enable the Apache Hive™ Metastore cluster to interact with other resources.
  - data-processing-agent for the Yandex Data Processing cluster:
    - dataproc.agent to enable the service account to get info on cluster host states, jobs, and log groups.
    - dataproc.provisioner to enable the service account to work with an autoscaling instance group. This enables subcluster autoscaling.
- Create the following buckets:
  - <bucket_for_Managed_Airflow>
  - <bucket_for_PySpark_job_source_code>
  - <bucket_for_PySpark_job_output_data>
  - <bucket_for_collecting_Spark_logs>

  You need multiple buckets because they require different access permissions. You can also create the buckets programmatically; see the sketch after the setup steps.
- Grant permissions for the buckets as follows:
  - <bucket_for_Managed_Airflow>: READ permission to the airflow-agent service account.
  - <bucket_for_PySpark_job_source_code>: READ permission to the data-processing-agent service account.
  - <bucket_for_PySpark_job_output_data>: READ and WRITE permissions to the data-processing-agent and metastore-agent service accounts.
  - <bucket_for_collecting_Spark_logs>: READ and WRITE permissions to the data-processing-agent service account.
- Create a cloud network named data-processing-network.

  This will automatically create three subnets in different availability zones.
- Set up a NAT gateway for the data-processing-network-ru-central1-a subnet.
- For the Apache Hive™ Metastore cluster, create a security group named metastore-sg in data-processing-network. Add the following rules to the group:
  - For incoming client traffic:
    - Port range: 30000-32767
    - Protocol: Any
    - Source: CIDR
    - CIDR blocks: 0.0.0.0/0
  - For incoming load balancer traffic:
    - Port range: 10256
    - Protocol: Any
    - Source: Load balancer health checks
- For the Managed Service for Apache Airflow™ and Yandex Data Processing clusters, create a security group named airflow-sg in data-processing-network. Add the following rules to it:
  - For incoming service traffic:
    - Port range: 0-65535
    - Protocol: Any
    - Source: Security group
    - Security group: Self
  - For outgoing service traffic:
    - Port range: 0-65535
    - Protocol: Any
    - Destination: Security group
    - Security group: Self
  - For outgoing HTTPS traffic:
    - Port range: 443
    - Protocol: TCP
    - Destination: CIDR
    - CIDR blocks: 0.0.0.0/0
  - For outgoing traffic that allows Yandex Data Processing cluster connections to the Apache Hive™ Metastore cluster:
    - Port range: 9083
    - Protocol: Any
    - Destination: Security group
    - Security group: metastore-sg (select From list)
- Create an Apache Hive™ Metastore cluster with the following parameters:
  - Service account: metastore-agent
  - Network: data-processing-network
  - Subnet: data-processing-network-ru-central1-a
  - Security group: metastore-sg
- Create a Managed Service for Apache Airflow™ cluster with the following parameters:
  - Service account: airflow-agent
  - Availability zone: ru-central1-a
  - Network: data-processing-network
  - Subnet: data-processing-network-ru-central1-a
  - Security group: airflow-sg
  - Bucket name: <bucket_for_Managed_Airflow>
Set up the infrastructure (simplified setup):
- Create a service account named my-editor with the following roles:
  - dataproc.editor to manage a Yandex Data Processing cluster from a DAG.
  - editor to perform other operations.
- Create a bucket named <bucket_for_jobs_and_data>.

  You do not need to grant the service account separate permissions for this bucket: the editor role is enough.
- Create a cloud network named data-processing-network.

  This will automatically create three subnets in different availability zones and a security group.
- Set up a NAT gateway for the data-processing-network-ru-central1-a subnet.
- Create an Apache Hive™ Metastore cluster with the following parameters:
  - Service account: my-editor
  - Network: data-processing-network
  - Subnet: data-processing-network-ru-central1-a
  - Security group: the default group in data-processing-network
- Create a Managed Service for Apache Airflow™ cluster with the following parameters:
  - Service account: my-editor
  - Availability zone: ru-central1-a
  - Network: data-processing-network
  - Subnet: data-processing-network-ru-central1-a
  - Security group: the default group in data-processing-network
  - Bucket name: <bucket_for_jobs_and_data>
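The buckets from the setup steps above can also be created from code: Object Storage provides an S3-compatible API, so the standard boto3 client works against it. Below is a minimal sketch, assuming you have issued a static access key for an account that is allowed to create buckets; the key values are placeholders, and the bucket permissions from the setup steps still need to be granted separately.

```python
import boto3

# Assumption: a static access key for an account that may create buckets;
# replace the placeholders with your own values.
s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",  # Object Storage S3-compatible endpoint
    aws_access_key_id="<static_key_ID>",
    aws_secret_access_key="<static_key_secret>",
)

# Bucket names from the high security level scenario; for the simplified
# setup, create a single <bucket_for_jobs_and_data> bucket instead.
for bucket in (
    "<bucket_for_Managed_Airflow>",
    "<bucket_for_PySpark_job_source_code>",
    "<bucket_for_PySpark_job_output_data>",
    "<bucket_for_collecting_Spark_logs>",
):
    s3.create_bucket(Bucket=bucket)
    print(f"Created bucket: {bucket}")
```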
Prepare a PySpark job
For the PySpark job, we will use a Python script that creates a table. The script is stored in an Object Storage bucket. Prepare the script file:
High security level:

- Create a local file named create-table.py and paste the following script into it:

create-table.py

```python
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Creating a Spark session
spark = SparkSession.builder \
    .appName("create-table") \
    .enableHiveSupport() \
    .getOrCreate()

# Creating a data schema
schema = StructType([StructField('Name', StringType(), True),
                     StructField('Capital', StringType(), True),
                     StructField('Area', IntegerType(), True),
                     StructField('Population', IntegerType(), True)])

# Creating a dataframe
df = spark.createDataFrame([('Australia', 'Canberra', 7686850, 19731984),
                            ('Austria', 'Vienna', 83855, 7700000)], schema)

# Writing the dataframe to a bucket as a countries table
df.write.mode("overwrite").option("path", "s3a://<bucket_for_PySpark_job_output_data>/countries").saveAsTable("countries")
```
- In <bucket_for_PySpark_job_source_code>, create a folder named scripts and upload the create-table.py file to it.
Simplified setup:

- Create a local file named create-table.py and paste the following script into it:

create-table.py

```python
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Creating a Spark session
spark = SparkSession.builder \
    .appName("create-table") \
    .enableHiveSupport() \
    .getOrCreate()

# Creating a data schema
schema = StructType([StructField('Name', StringType(), True),
                     StructField('Capital', StringType(), True),
                     StructField('Area', IntegerType(), True),
                     StructField('Population', IntegerType(), True)])

# Creating a dataframe
df = spark.createDataFrame([('Australia', 'Canberra', 7686850, 19731984),
                            ('Austria', 'Vienna', 83855, 7700000)], schema)

# Writing the dataframe to a bucket as a countries table
df.write.mode("overwrite").option("path", "s3a://<bucket_for_jobs_and_data>/countries").saveAsTable("countries")
```
- In <bucket_for_jobs_and_data>, create a folder named scripts and upload the create-table.py file to it. (For a way to do this programmatically, see the sketch below.)
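You can also create the scripts folder and upload the script over the S3-compatible API instead of using the management console. Below is a minimal boto3 sketch; the static access key values are placeholders, and the bucket name should be <bucket_for_PySpark_job_source_code> or <bucket_for_jobs_and_data>, depending on your scenario.

```python
import boto3

# Assumption: a static access key with write access to the target bucket.
s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",
    aws_access_key_id="<static_key_ID>",
    aws_secret_access_key="<static_key_secret>",
)

# Object Storage has no real folders: uploading an object under the scripts/
# prefix is what "creating a folder named scripts" amounts to.
s3.upload_file(
    Filename="create-table.py",  # The local script prepared above
    Bucket="<bucket_for_PySpark_job_source_code>",  # Or <bucket_for_jobs_and_data> for the simplified setup
    Key="scripts/create-table.py",
)
```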
Prepare and run a DAG file
The DAG will include several vertices that form a sequence of actions:
- Managed Service for Apache Airflow™ creates a temporary lightweight Yandex Data Processing cluster whose settings are specified in the DAG. This cluster automatically connects to the previously created Apache Hive™ Metastore cluster.
- When the Yandex Data Processing cluster is ready, a PySpark job is run.
- Once the job is complete, the temporary Yandex Data Processing cluster is deleted.
To prepare a DAG:
High security level:

- Create an SSH key. Save the public part of the key: you will need it to create a Yandex Data Processing cluster.
- Create a local file named Data-Processing-DAG.py, paste the following script into it, and substitute the variables with your infrastructure data:

Data-Processing-DAG.py

```python
import uuid
import datetime

from airflow import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.providers.yandex.operators.yandexcloud_dataproc import (
    DataprocCreateClusterOperator,
    DataprocCreatePysparkJobOperator,
    DataprocDeleteClusterOperator,
)

# Your infrastructure data
YC_DP_AZ = 'ru-central1-a'
YC_DP_SSH_PUBLIC_KEY = '<public_part_of_SSH_key>'
YC_DP_SUBNET_ID = '<subnet_ID>'
YC_DP_SA_ID = '<ID_of_data_processing_agent_service_account>'
YC_DP_METASTORE_URI = '<IP_address>'
YC_SOURCE_BUCKET = '<bucket_for_PySpark_job_source_code>'
YC_DP_LOGS_BUCKET = '<bucket_for_collecting_Spark_logs>'

# DAG settings
with DAG(
        'DATA_INGEST',
        schedule_interval='@hourly',
        tags=['data-processing-and-airflow'],
        start_date=datetime.datetime.now(),
        max_active_runs=1,
        catchup=False
) as ingest_dag:

    # Step 1: Creating a Yandex Data Processing cluster
    create_spark_cluster = DataprocCreateClusterOperator(
        task_id='dp-cluster-create-task',
        cluster_name=f'tmp-dp-{uuid.uuid4()}',
        cluster_description='Temporary cluster for running a PySpark job orchestrated by Managed Service for Apache Airflow™',
        ssh_public_keys=YC_DP_SSH_PUBLIC_KEY,
        service_account_id=YC_DP_SA_ID,
        subnet_id=YC_DP_SUBNET_ID,
        s3_bucket=YC_DP_LOGS_BUCKET,
        zone=YC_DP_AZ,
        cluster_image_version='2.1',
        masternode_resource_preset='s2.small',
        masternode_disk_type='network-ssd',
        masternode_disk_size=200,
        computenode_resource_preset='m2.large',
        computenode_disk_type='network-ssd',
        computenode_disk_size=200,
        computenode_count=2,
        computenode_max_hosts_count=5,  # The number of data-handling subclusters will automatically scale up if the load is high.
        services=['YARN', 'SPARK'],     # A lightweight cluster is being created.
        datanode_count=0,               # Without data storage subclusters.
        properties={
            # With a reference to a remote Apache Hive™ Metastore cluster.
            'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083',
        },
    )

    # Step 2: Running a PySpark job
    poke_spark_processing = DataprocCreatePysparkJobOperator(
        task_id='dp-cluster-pyspark-task',
        main_python_file_uri=f's3a://{YC_SOURCE_BUCKET}/scripts/create-table.py',
    )

    # Step 3: Deleting the Yandex Data Processing cluster
    delete_spark_cluster = DataprocDeleteClusterOperator(
        task_id='dp-cluster-delete-task',
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Building a DAG based on the above steps
    create_spark_cluster >> poke_spark_processing >> delete_spark_cluster
```
Where:
- YC_DP_AZ: Availability zone for the Yandex Data Processing cluster.
- YC_DP_SSH_PUBLIC_KEY: Public part of the SSH key for the Yandex Data Processing cluster.
- YC_DP_SUBNET_ID: Subnet ID.
- YC_DP_SA_ID: ID of the service account for Yandex Data Processing.
- YC_DP_METASTORE_URI: IP address of the Apache Hive™ Metastore cluster.
- YC_SOURCE_BUCKET: Bucket with the Python script for the PySpark job.
- YC_DP_LOGS_BUCKET: Bucket for logs.
- Upload the DAG to the Managed Service for Apache Airflow™ cluster: in <bucket_for_Managed_Airflow>, create a folder named dags and upload the Data-Processing-DAG.py file to it.
- Open the Apache Airflow™ web interface.
Make sure the DAGs section now contains the new DAG named
DATA_INGEST
and tagged asdata-processing-and-airflow
.It may take a few minutes to upload a DAG file from the bucket.
- To run the DAG, click the trigger (play) button in the line with its name.
Simplified setup:

- Create an SSH key. Save the public part of the key: you will need it to create a Yandex Data Processing cluster.
- Create a local file named Data-Processing-DAG.py, paste the following script into it, and substitute the variables with your infrastructure data:

Data-Processing-DAG.py

```python
import uuid
import datetime

from airflow import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.providers.yandex.operators.yandexcloud_dataproc import (
    DataprocCreateClusterOperator,
    DataprocCreatePysparkJobOperator,
    DataprocDeleteClusterOperator,
)

# Your infrastructure data
YC_DP_AZ = 'ru-central1-a'
YC_DP_SSH_PUBLIC_KEY = '<public_part_of_SSH_key>'
YC_DP_SUBNET_ID = '<subnet_ID>'
YC_DP_SA_ID = '<ID_of_my-editor_service_account>'
YC_DP_METASTORE_URI = '<IP_address>'
YC_BUCKET = '<bucket_for_jobs_and_data>'

# DAG settings
with DAG(
        'DATA_INGEST',
        schedule_interval='@hourly',
        tags=['data-processing-and-airflow'],
        start_date=datetime.datetime.now(),
        max_active_runs=1,
        catchup=False
) as ingest_dag:

    # Step 1: Creating a Yandex Data Processing cluster
    create_spark_cluster = DataprocCreateClusterOperator(
        task_id='dp-cluster-create-task',
        cluster_name=f'tmp-dp-{uuid.uuid4()}',
        cluster_description='Temporary cluster for running a PySpark job orchestrated by Managed Service for Apache Airflow™',
        ssh_public_keys=YC_DP_SSH_PUBLIC_KEY,
        service_account_id=YC_DP_SA_ID,
        subnet_id=YC_DP_SUBNET_ID,
        s3_bucket=YC_BUCKET,
        zone=YC_DP_AZ,
        cluster_image_version='2.1',
        masternode_resource_preset='s2.small',
        masternode_disk_type='network-ssd',
        masternode_disk_size=200,
        computenode_resource_preset='m2.large',
        computenode_disk_type='network-ssd',
        computenode_disk_size=200,
        computenode_count=2,
        computenode_max_hosts_count=5,  # The number of data-handling subclusters will automatically scale up if the load is high.
        services=['YARN', 'SPARK'],     # A lightweight cluster is being created.
        datanode_count=0,               # Without data storage subclusters.
        properties={
            # With a reference to a remote Apache Hive™ Metastore cluster.
            'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083',
        },
    )

    # Step 2: Running a PySpark job
    poke_spark_processing = DataprocCreatePysparkJobOperator(
        task_id='dp-cluster-pyspark-task',
        main_python_file_uri=f's3a://{YC_BUCKET}/scripts/create-table.py',
    )

    # Step 3: Deleting the Yandex Data Processing cluster
    delete_spark_cluster = DataprocDeleteClusterOperator(
        task_id='dp-cluster-delete-task',
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Building a DAG based on the above steps
    create_spark_cluster >> poke_spark_processing >> delete_spark_cluster
```
Where:
- YC_DP_AZ: Availability zone for the Yandex Data Processing cluster.
- YC_DP_SSH_PUBLIC_KEY: Public part of the SSH key for the Yandex Data Processing cluster.
- YC_DP_SUBNET_ID: Subnet ID.
- YC_DP_SA_ID: my-editor service account ID.
- YC_DP_METASTORE_URI: IP address of the Apache Hive™ Metastore cluster.
- YC_BUCKET: <bucket_for_jobs_and_data>.
- Upload the DAG to the Managed Service for Apache Airflow™ cluster: in <bucket_for_jobs_and_data>, create a folder named dags and upload the Data-Processing-DAG.py file to it.
- Open the Apache Airflow™ web interface.
- Make sure the DAGs section now contains the new DAG named DATA_INGEST and tagged as data-processing-and-airflow.

  It may take a few minutes to upload a DAG file from the bucket.
- To run the DAG, click the trigger (play) button in the line with its name.
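As an alternative to the trigger button, a DAG run can be started programmatically through the standard Apache Airflow™ REST API. Below is a minimal sketch; the host, user name, and password are placeholders, and it assumes the API is reachable from your machine and basic authentication is enabled for your Airflow™ user.

```python
import requests

# Assumptions: <airflow_web_UI_host> is the host of the cluster's Airflow™ web
# interface, and the user below is allowed to trigger DAGs through the REST API.
AIRFLOW_URL = "https://<airflow_web_UI_host>"
DAG_ID = "DATA_INGEST"  # DAG name from Data-Processing-DAG.py

response = requests.post(
    f"{AIRFLOW_URL}/api/v1/dags/{DAG_ID}/dagRuns",
    auth=("<user_name>", "<password>"),
    json={"conf": {}},  # An empty run configuration is enough for this DAG
    timeout=30,
)
response.raise_for_status()
print(response.json()["dag_run_id"])
```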
Check the result
High security level:

- To monitor task execution results, click the DAG name. The results are displayed in the Grid tab.
- Wait until the status of all three tasks in the DAG changes to Success. In parallel, you can check in the management console that a Yandex Data Processing cluster is created, the PySpark job runs, and the same cluster is then deleted.
- Make sure <bucket_for_PySpark_job_output_data> now contains a countries folder with the part-00000-... file. The data from the created table is now stored in the Object Storage bucket, and the table metadata is stored in the Apache Hive™ Metastore cluster. (You can also check the bucket contents programmatically; see the sketch at the end of this section.)
- Make sure the PySpark job logs are in <bucket_for_collecting_Spark_logs>.
Simplified setup:

- To monitor task execution results, click the DAG name. The results are displayed in the Grid tab.
- Wait until the status of all three tasks in the DAG changes to Success. In parallel, you can check in the management console that a Yandex Data Processing cluster is created, the PySpark job runs, and the same cluster is then deleted.
- Make sure <bucket_for_jobs_and_data> now contains a countries folder with the part-00000-... file. The data from the created table is now stored in the Object Storage bucket, and the table metadata is stored in the Apache Hive™ Metastore cluster.
- Make sure the PySpark job logs are in <bucket_for_jobs_and_data>. These logs are written to the dataproc, user, and var folders.
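If you prefer to verify the output without the management console, you can list the bucket contents over the S3-compatible API. Below is a minimal sketch for either scenario; the static access key values are placeholders.

```python
import boto3

# Assumption: a static access key with read access to the output bucket.
s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",
    aws_access_key_id="<static_key_ID>",
    aws_secret_access_key="<static_key_secret>",
)

# The PySpark job saves the countries table under the countries/ prefix,
# so part-00000-... files should appear there after a successful run.
listing = s3.list_objects_v2(
    Bucket="<bucket_for_PySpark_job_output_data>",  # Or <bucket_for_jobs_and_data>
    Prefix="countries/",
)
for obj in listing.get("Contents", []):
    print(obj["Key"], obj["Size"])
```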
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them:
High security level:

- Service accounts.
- Object Storage buckets.
- Apache Hive™ Metastore cluster.
- Managed Service for Apache Airflow™ cluster.
- Route table.
- NAT gateway.
- Security groups.
- Cloud subnets created in data-processing-network by default.
- Cloud network.
Simplified setup:

- Service account.
- Object Storage bucket.
- Apache Hive™ Metastore cluster.
- Managed Service for Apache Airflow™ cluster.
- Route table.
- NAT gateway.
- Security group created in data-processing-network by default.
- Cloud subnets created in data-processing-network by default.
- Cloud network.