# Sending requests to the Yandex Cloud API via the Yandex Cloud Python SDK
When working with Managed Service for Apache Airflow™, you can use the Yandex Cloud Python SDK to send requests to the Yandex Cloud API.
As an example, we use a directed acyclic graph (DAG) to send a request to the Yandex Cloud API. The request returns a list of virtual machines in the folder where the Apache Airflow™ cluster was created.
To use the Yandex Cloud Python SDK to send requests to the Yandex Cloud API:

1. Set up your infrastructure.
2. Prepare the DAG file and run the graph.
3. Check the result.

If you no longer need the resources you created, delete them.
## Required paid resources
The support cost includes:
- Managed Service for Apache Airflow™ cluster fee: computing resources of the cluster components and the amount of outgoing traffic (see Apache Airflow™ pricing).
- Fee for using public IP addresses if public access is enabled for cluster hosts (see Virtual Private Cloud pricing).
- Fee for an Object Storage bucket: data storage and operations with it (see Object Storage pricing).
- VM fee: using computing resources, storage, OS (for specific operating systems), and, optionally, public IP address (see Compute Cloud pricing).
## Set up your infrastructure
- Create a service account named `airflow-sa` with the following roles:

  - `compute.viewer`
  - `managed-airflow.integrationProvider`

  To make sure the service account was created, you can use the verification sketch after this list.
- Create a Yandex Object Storage bucket in any configuration.
- Edit the ACL of the new bucket to give the `READ` permission to the `airflow-sa` service account.
- Create a Managed Service for Apache Airflow™ cluster with the following parameters:

  - Service account: `airflow-sa`
  - Bucket name: name of the new bucket
- Create a VM in any configuration.
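If you want to double-check the first step, the sketch below lists the service accounts in your folder and checks that `airflow-sa` is among them. It is a minimal sketch, not part of the tutorial: it assumes the IAM proto modules bundled with the `yandexcloud` package, and the folder ID and token are placeholders you substitute yourself (for example, an OAuth token of your user account).

```python
# A verification sketch. Assumptions: yandexcloud is installed locally;
# <folder_ID> and <OAuth_or_IAM_token> are placeholders you replace.
import yandexcloud
from yandex.cloud.iam.v1.service_account_service_pb2 import ListServiceAccountsRequest
from yandex.cloud.iam.v1.service_account_service_pb2_grpc import ServiceAccountServiceStub

FOLDER_ID = "<folder_ID>"

# Outside the cluster there is no attached service account, so pass a token explicitly
sdk = yandexcloud.SDK(token="<OAuth_or_IAM_token>")
service = sdk.client(ServiceAccountServiceStub)

# List the folder's service accounts and check that airflow-sa is among them
response = service.List(ListServiceAccountsRequest(folder_id=FOLDER_ID))
names = [sa.name for sa in response.service_accounts]
print("airflow-sa found:", "airflow-sa" in names)
```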
## Prepare the DAG file and run the graph
- Create a local file named `test_python_sdk.py` and paste the following script into it:

  ```python
  from airflow import DAG
  from airflow.models import Connection
  from airflow.operators.python_operator import PythonOperator
  from datetime import datetime
  import yandexcloud
  from yandex.cloud.compute.v1.instance_service_pb2 import (
      ListInstancesRequest,
  )
  from yandex.cloud.compute.v1.instance_service_pb2_grpc import InstanceServiceStub


  def list_instances():
      # Get the folder ID from the yandexcloud_default connection
      conn = Connection.get_connection_from_secrets("yandexcloud_default")
      folder_id = conn.extra_dejson.get('folder_id')

      # The SDK with default parameters authenticates with the IAM token
      # of the service account attached to the cluster
      sdk = yandexcloud.SDK()

      instance_service = sdk.client(InstanceServiceStub)
      response = instance_service.List(ListInstancesRequest(folder_id=folder_id))
      print("instances: ", response.instances)


  with DAG(
      dag_id='test_python_sdk',
      start_date=datetime(2024, 5, 24),
      schedule="@once",
      catchup=False,
  ) as dag:
      PythonOperator(
          task_id='list_instances',
          python_callable=list_instances,
      )
  ```
  You get authenticated in the cloud using the IAM token of the service account attached to the Apache Airflow™ cluster. The `yandexcloud.SDK()` object created with default parameters is automatically populated with the data required for authentication with that IAM token. To try the same request outside the cluster, see the local run sketch after this list.
- Upload the `test_python_sdk.py` DAG file to the bucket you created earlier (an upload sketch using the S3-compatible API follows this list). This will automatically create a graph with the same name in the Apache Airflow™ web interface.
- Make sure a new graph named `test_python_sdk` has appeared in the DAGs section.

  It may take a few minutes to load a DAG file from the bucket.
- To run the graph, click the run button in the line with its name.
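The DAG above relies on the cluster's attached service account for authentication. To try the same request on your own machine, where no service account is attached, you have to pass credentials explicitly. A minimal sketch, assuming the `yandexcloud` package is installed locally and you substitute your own folder ID and token:

```python
# A local sketch of the same List request. Assumptions: yandexcloud is installed;
# <folder_ID> and <OAuth_or_IAM_token> are placeholders you replace.
import yandexcloud
from yandex.cloud.compute.v1.instance_service_pb2 import ListInstancesRequest
from yandex.cloud.compute.v1.instance_service_pb2_grpc import InstanceServiceStub

FOLDER_ID = "<folder_ID>"

# Explicit credentials instead of the cluster-provided IAM token
sdk = yandexcloud.SDK(token="<OAuth_or_IAM_token>")

instance_service = sdk.client(InstanceServiceStub)
response = instance_service.List(ListInstancesRequest(folder_id=FOLDER_ID))
for instance in response.instances:
    print(instance.id, instance.name)
```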
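You can also upload the DAG file to the bucket programmatically through the S3-compatible API, for example with `boto3`. This is only a sketch: it assumes `boto3` is installed, that you have a static access key with write access to the bucket (such a key is not created in this tutorial), and that the cluster reads DAG files from the bucket root; adjust the destination key if your cluster expects a different path.

```python
# A sketch of uploading the DAG file via the S3-compatible API.
# Assumptions: boto3 is installed; <static_key_ID>/<secret_key> belong to an
# account with write access to the bucket; <bucket_name> is your bucket.
import boto3

session = boto3.session.Session(
    aws_access_key_id="<static_key_ID>",
    aws_secret_access_key="<secret_key>",
)
s3 = session.client(
    service_name="s3",
    endpoint_url="https://storage.yandexcloud.net",
    region_name="ru-central1",
)

# Upload the local DAG file; the destination key must match where the cluster
# expects DAG files in the bucket
s3.upload_file("test_python_sdk.py", "<bucket_name>", "test_python_sdk.py")
```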
## Check the result
To check the result in the Apache Airflow™ web interface:
- In the DAGs section, open the `test_python_sdk` graph.
- Go to the Grid section.
- Select `list_instances`.
- Go to Logs.
- Make sure the logs list the virtual machines from the folder where the Apache Airflow™ cluster was created. This means the request was successful.
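You can also check the result without the web interface by querying the Apache Airflow™ stable REST API. The sketch below is only an illustration: the webserver URL, user name, and password are placeholders, and it assumes the API is reachable from your machine and basic authentication is enabled for your cluster.

```python
# A sketch of reading the task logs via the Airflow stable REST API.
# Assumptions: <webserver_host>, <user>, and <password> are placeholders;
# basic authentication is enabled and the API is reachable.
import requests

AIRFLOW_URL = "https://<webserver_host>/api/v1"
AUTH = ("<user>", "<password>")

# The graph in this tutorial runs once, so there is a single DAG run
runs = requests.get(f"{AIRFLOW_URL}/dags/test_python_sdk/dagRuns", auth=AUTH).json()
run_id = runs["dag_runs"][-1]["dag_run_id"]

# Fetch the logs of the list_instances task (first try) as plain text
logs = requests.get(
    f"{AIRFLOW_URL}/dags/test_python_sdk/dagRuns/{run_id}"
    "/taskInstances/list_instances/logs/1",
    auth=AUTH,
    headers={"Accept": "text/plain"},
)
print(logs.text)  # should contain the "instances:" output with the folder's VMs
```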
## Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them: