Sending requests to the Yandex Cloud API via the Yandex Cloud Python SDK
Warning
This tutorial was tested on clusters with Apache Airflow™ versions below 3.0.
When working with Managed Service for Apache Airflow™, you can use the Yandex Cloud Python SDK to send requests to the Yandex Cloud API.
As an example, we use a directed acyclic graph (DAG) to send a request to the Yandex Cloud API. The request returns a list of virtual machines in the folder where the Apache Airflow™ cluster was created.
To use the Yandex Cloud Python SDK to send requests to the Yandex Cloud API:
- Set up your infrastructure.
- Prepare the DAG file and run the graph.
- Check the result.
If you no longer need the resources you created, delete them.
Required paid resources
The support cost for this solution includes:
- Managed Service for Apache Airflow™ cluster fee: computing resources of the cluster components and outgoing traffic (see Apache Airflow™ pricing).
- Fee for public IP addresses if public access is enabled for the cluster hosts (see Virtual Private Cloud pricing).
- Object Storage bucket fee: data storage and bucket operations (see Object Storage pricing).
- VM fee: computing resources, storage, OS (for certain operating systems), and, optionally, a public IP address (see Compute Cloud pricing).
Set up your infrastructure
- Create a service account named airflow-sa with the following roles:
  - compute.viewer
  - managed-airflow.integrationProvider
- Create a Yandex Object Storage bucket in any configuration.
- Edit the ACL of the new bucket to grant the READ permission to the airflow-sa service account.
- Create a Managed Service for Apache Airflow™ cluster with the following parameters:
  - Service account: airflow-sa
  - Bucket name: name of the new bucket
- Create a VM in any configuration.
Prepare the DAG file and run the graph
- Create a local file named test_python_sdk.py and paste the following script into it:

```python
from airflow import DAG
from airflow.models import Connection
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

import yandexcloud
from yandex.cloud.compute.v1.instance_service_pb2 import (
    ListInstancesRequest,
)
from yandex.cloud.compute.v1.instance_service_pb2_grpc import InstanceServiceStub


def list_instances():
    # Read the folder ID from the yandexcloud_default connection settings
    conn = Connection.get_connection_from_secrets("yandexcloud_default")
    folder_id = conn.extra_dejson.get('folder_id')

    # An SDK object with default parameters uses the cluster's IAM token
    sdk = yandexcloud.SDK()

    # Request the list of VMs in the folder via the Compute Cloud API
    instance_service = sdk.client(InstanceServiceStub)
    response = instance_service.List(ListInstancesRequest(folder_id=folder_id))
    print("instances: ", response.instances)


with DAG(
    dag_id='test_python_sdk',
    start_date=datetime(2024, 5, 24),
    schedule="@once",
    catchup=False,
) as dag:
    PythonOperator(
        task_id='list_instances',
        python_callable=list_instances,
    )
```

You get authenticated in the cloud using the IAM token of the service account attached to the Apache Airflow™ cluster. The yandexcloud.SDK() object with default parameters is automatically populated with the data required to authenticate with that IAM token. A sketch of authenticating explicitly outside the cluster follows this list.
- Upload the test_python_sdk.py DAG file to the bucket you created earlier. This will automatically create a graph with the same name in the Apache Airflow™ web interface. A sketch of uploading the file programmatically also follows this list.
- Make sure a new graph named test_python_sdk has appeared in the DAGs section. It may take a few minutes to load a DAG file from the bucket.
- To run the graph, click the trigger (play) button in the line with its name.
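The DAG above relies on the IAM token the cluster provides to its service account. If you want to try the same request locally, outside the Apache Airflow™ cluster, there is no metadata service to supply credentials, so you pass them to the SDK yourself. Below is a minimal sketch, assuming you obtain an IAM token on your own (for example, with the yc iam create-token command) and export it, along with the folder ID, in the hypothetical IAM_TOKEN and FOLDER_ID environment variables:

```python
import os

import yandexcloud
from yandex.cloud.compute.v1.instance_service_pb2 import ListInstancesRequest
from yandex.cloud.compute.v1.instance_service_pb2_grpc import InstanceServiceStub

# IAM_TOKEN and FOLDER_ID are hypothetical environment variables for this sketch
sdk = yandexcloud.SDK(iam_token=os.environ["IAM_TOKEN"])

# Same Compute Cloud request as in the DAG, but with explicit credentials
instance_service = sdk.client(InstanceServiceStub)
response = instance_service.List(ListInstancesRequest(folder_id=os.environ["FOLDER_ID"]))
for instance in response.instances:
    print(instance.id, instance.name)
```

Inside the cluster, keep yandexcloud.SDK() without arguments so the attached service account's IAM token is used.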
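If you deploy DAG files from a script rather than through the console, the upload step can also be done over the Object Storage S3-compatible API. Below is a minimal sketch, assuming boto3 is installed and you have a static access key with write access to the bucket; the key values and bucket name are placeholders:

```python
import boto3

# Placeholders: substitute your static access key and bucket name
s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",
    aws_access_key_id="<static key ID>",
    aws_secret_access_key="<secret key>",
)

# Upload the DAG file to the bucket the cluster reads DAGs from
s3.upload_file("test_python_sdk.py", "<bucket name>", "test_python_sdk.py")
```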
Check the result
To check the result in the Apache Airflow™ web interface:
- In the DAGs section, open the test_python_sdk graph.
- Go to the Grid section.
- Select list_instances.
- Go to Logs.
- Make sure the logs list the virtual machines from the folder where the Apache Airflow™ cluster was created. This means the query was successful.
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them: