Sending requests to the Yandex Cloud API via the Yandex Cloud Python SDK
When working with Managed Service for Apache Airflow™, you can use the Yandex Cloud Python SDK to send requests to the Yandex Cloud API.
As an example, we use a directed acyclic graph (DAG) that submits a request to the Yandex Cloud API. The request returns a list of virtual machines in the folder where the Apache Airflow™ cluster was created.
To use the Yandex Cloud Python SDK to send requests to the Yandex Cloud API:
- Set up your infrastructure.
- Prepare the DAG file and run the graph.
- Check the result.
If you no longer need the resources you created, delete them.
Required paid resources
The cost of supporting this infrastructure includes:
- Managed Service for Apache Airflow™ cluster fee: Computing resources of the cluster components and the amount of outgoing traffic (see Apache Airflow™ pricing).
- Fee for using public IP addresses if public access is enabled for cluster hosts (see Virtual Private Cloud pricing).
- Object Storage bucket fee: Storing data and performing operations with it (see Object Storage pricing).
- VM fee: Using computing resources, storage, the OS (for specific operating systems), and, optionally, a public IP address (see Compute Cloud pricing).
Set up your infrastructure
- Create a service account named airflow-sa with the following roles:
  - compute.viewer
  - managed-airflow.integrationProvider
- Create a Yandex Object Storage bucket in any configuration.
- Edit the ACL of the new bucket to give the READ permission to the airflow-sa service account.
- Create a Managed Service for Apache Airflow™ cluster with the following parameters:
  - Service account: airflow-sa
  - Bucket name: Name of the new bucket
- Create a VM in any configuration.
Prepare the DAG file and run the graph
- Create a local file named test_python_sdk.py and copy the following script to it:

      from airflow import DAG
      from airflow.models import Connection
      from airflow.operators.python_operator import PythonOperator
      from datetime import datetime
      import yandexcloud
      from yandex.cloud.compute.v1.instance_service_pb2 import (
          ListInstancesRequest,
      )
      from yandex.cloud.compute.v1.instance_service_pb2_grpc import InstanceServiceStub


      def list_instances():
          conn = Connection.get_connection_from_secrets("yandexcloud_default")
          folder_id = conn.extra_dejson.get('folder_id')

          sdk = yandexcloud.SDK()
          instance_service = sdk.client(InstanceServiceStub)
          response = instance_service.List(ListInstancesRequest(folder_id=folder_id))
          print("instances: ", response.instances)


      with DAG(
          dag_id='test_python_sdk',
          start_date=datetime(2024, 5, 24),
          schedule="@once",
          catchup=False,
      ) as dag:
          PythonOperator(
              task_id='list_instances',
              python_callable=list_instances,
          )

  To authenticate in the cloud, the IAM token of the service account attached to the Apache Airflow™ cluster is used. The yandexcloud.SDK() object is created with default parameters and automatically populated with the data required to authenticate with the IAM token.
- Upload the test_python_sdk.py DAG file to the bucket you created earlier. This will automatically create a graph with the same name in the Apache Airflow™ web interface.
- Make sure a new graph named test_python_sdk has appeared in the DAGs section.

  It may take a few minutes for the DAG file to be loaded from the bucket.
- To run the graph, click the trigger (play) icon in the line with its name.
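The script above sends a single List request, so it prints only the first page of results. If the folder holds more VMs than fit on one page, the remaining ones can be collected by following the continuation token. The sketch below shows that loop on its own. Page, fake_fetch, and the sample data are hypothetical stand-ins so the example runs without the SDK. It assumes the response exposes instances and next_page_token and that the request accepts page_token, which should be checked against the SDK version installed in your cluster.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List


@dataclass
class Page:
    """Hypothetical stand-in for a list response: items plus a continuation token."""
    instances: List[str]
    next_page_token: str = ""


def iter_instances(fetch_page: Callable[[str], Page]) -> Iterator[str]:
    """Yield items from every page until the service returns an empty token."""
    token = ""
    while True:
        page = fetch_page(token)
        yield from page.instances
        token = page.next_page_token
        if not token:
            break


# In-memory pages used only to exercise the loop (not real API data).
_FAKE_PAGES = {
    "": Page(["vm-1", "vm-2"], next_page_token="p2"),
    "p2": Page(["vm-3"], next_page_token=""),
}


def fake_fetch(token: str) -> Page:
    """Return the fake page that corresponds to the given token."""
    return _FAKE_PAGES[token]


all_instances = list(iter_instances(fake_fetch))
print(all_instances)
```

In the DAG, fetch_page would presumably wrap the real call, for example a function that takes token and returns instance_service.List(ListInstancesRequest(folder_id=folder_id, page_token=token)).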
Check the result
To check the result in the Apache Airflow™ web interface:
- In the DAGs section, open the test_python_sdk graph.
- Go to the Grid section.
- Select the list_instances task.
- Go to Logs.
- Make sure the logs list the virtual machines from the folder where the Apache Airflow™ cluster was created. This means the request was successful.
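Printing response.instances as is produces a verbose dump of every field, which can make the log hard to scan. One option is to log a short line per VM instead. The helper below is a minimal sketch of that idea. summarize_instances and the sample objects are hypothetical, and the sketch assumes each instance exposes id and name attributes.

```python
from types import SimpleNamespace
from typing import Iterable, List


def summarize_instances(instances: Iterable) -> List[str]:
    """Return one 'id<TAB>name' line per instance (any object with .id and .name)."""
    return [f"{inst.id}\t{inst.name}" for inst in instances]


# Stand-in objects; in the DAG these would be the items of response.instances.
sample = [
    SimpleNamespace(id="example-id-1", name="vm-a"),
    SimpleNamespace(id="example-id-2", name="vm-b"),
]

for line in summarize_instances(sample):
    print(line)
```

With output in this form, checking the task log comes down to comparing the listed names against the VMs you expect to see in the folder.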
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them: