Sending requests to the Yandex Cloud API via the Yandex Cloud Python SDK
When working with Managed Service for Apache Airflow™, you can use the Yandex Cloud Python SDK to send requests to the Yandex Cloud API.
As an example, we use a directed acyclic graph (DAG) that submits a request to the Yandex Cloud API. The request returns a list of virtual machines in the folder where the Apache Airflow™ cluster was created.
To use the Yandex Cloud Python SDK to send requests to the Yandex Cloud API:
- Prepare the infrastructure.
- Prepare the DAG file and run the graph.
- Check the result.
If you no longer need the resources you created, delete them.
Prepare the infrastructure
- Create a service account named airflow-sa with the following roles:
  - compute.viewer
  - managed-airflow.integrationProvider
- Create a Yandex Object Storage bucket in any configuration.
- Create a Managed Service for Apache Airflow™ cluster with the following parameters:
  - Service account: airflow-sa
  - Bucket name: Name of the new bucket
- Create a VM in any configuration.
Prepare the DAG file and run the graph
- Create a local file named test_python_sdk.py and copy the following script to it:

    from airflow import DAG
    from airflow.models import Connection
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime

    import yandexcloud
    from yandex.cloud.compute.v1.instance_service_pb2 import (
        ListInstancesRequest,
    )
    from yandex.cloud.compute.v1.instance_service_pb2_grpc import InstanceServiceStub


    def list_instances():
        # Read the folder ID from the extras of the default Yandex Cloud connection.
        conn = Connection.get_connection_from_secrets("yandexcloud_default")
        folder_id = conn.extra_dejson.get('folder_id')

        # The SDK is created with default parameters; credentials are picked up automatically.
        sdk = yandexcloud.SDK()
        instance_service = sdk.client(InstanceServiceStub)

        # Request the list of VMs in the folder and print it to the task log.
        response = instance_service.List(ListInstancesRequest(folder_id=folder_id))
        print("instances: ", response.instances)


    with DAG(
        dag_id='test_python_sdk',
        start_date=datetime(2024, 5, 24),
        schedule="@once",
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id='list_instances',
            python_callable=list_instances,
        )

  To authenticate in the cloud, the script uses the IAM token of the service account attached to the Apache Airflow™ cluster. The yandexcloud.SDK() object is created with default parameters and is automatically populated with the data required for IAM token authentication.
- Upload the test_python_sdk.py DAG file to the bucket you created earlier. This will automatically create a graph with the same name in the Apache Airflow™ web interface.
- Make sure a new graph named test_python_sdk has appeared in the DAGs section. It may take a few minutes for the DAG file to load from the bucket.
- To run the graph, click the run (Trigger DAG) button in the row with its name.
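The script above reads only the first page of results. For a folder with many VMs, the list request can be repeated with a page token until no token is returned. The following is a minimal sketch of that loop, not part of the original tutorial. It assumes the request accepts `page_size` and `page_token` and the response carries `next_page_token`; `FakeRequest`, `FakeResponse`, and `fake_list` are hypothetical stand-ins so the example runs without a cluster.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterator, List


@dataclass
class FakeRequest:
    """Hypothetical stand-in for ListInstancesRequest."""
    folder_id: str
    page_size: int = 100
    page_token: str = ""


@dataclass
class FakeResponse:
    """Hypothetical stand-in for the list response."""
    instances: List[str] = field(default_factory=list)
    next_page_token: str = ""


def iter_instances(list_page: Callable, make_request: Callable,
                   folder_id: str, page_size: int = 100) -> Iterator:
    """Yield instances page by page until the service stops returning a token."""
    page_token = ""
    while True:
        response = list_page(make_request(
            folder_id=folder_id, page_size=page_size, page_token=page_token))
        yield from response.instances
        page_token = response.next_page_token
        if not page_token:
            break


def fake_list(request: FakeRequest) -> FakeResponse:
    """Hypothetical service: serves three VM names, using the token as an offset."""
    data = ["vm-1", "vm-2", "vm-3"]
    start = int(request.page_token or 0)
    end = start + request.page_size
    token = str(end) if end < len(data) else ""
    return FakeResponse(instances=data[start:end], next_page_token=token)


# "folder-id-placeholder" is an illustrative value, not a real folder ID.
all_vms = list(iter_instances(fake_list, FakeRequest,
                              "folder-id-placeholder", page_size=2))
print(all_vms)
```

Inside the DAG, the same helper could presumably be called as `iter_instances(instance_service.List, ListInstancesRequest, folder_id)`, provided the real request and response expose the fields assumed here.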
Check the result
To check the result in the Apache Airflow™ web interface:
- In the DAGs section, open the test_python_sdk graph.
- Go to the Grid section.
- Select the list_instances task.
- Go to Logs.
- Make sure the logs list the virtual machines from the folder where the Apache Airflow™ cluster was created. This means the request was successful.
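Printing the raw response can make the task log hard to scan. As an optional sketch, the task could print one short line per VM instead. It assumes each returned instance exposes `id` and `name` attributes; `InstanceInfo` is a hypothetical stand-in used only to keep the example self-contained.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class InstanceInfo:
    """Hypothetical stand-in for an instance object with id and name attributes."""
    id: str
    name: str


def format_instances(instances: Iterable) -> List[str]:
    """Return one readable log line per instance, or a notice if there are none."""
    lines = [f"{vm.name} (id={vm.id})" for vm in instances]
    return lines or ["no instances found in the folder"]


# Illustrative values only; real IDs and names come from the API response.
sample = [InstanceInfo(id="id-1", name="vm-a"), InstanceInfo(id="id-2", name="vm-b")]
log_lines = format_instances(sample)
for line in log_lines:
    print(line)
```

In the DAG, `format_instances(response.instances)` could replace the single `print` call, under the same assumption about attribute names. An explicit "no instances" line also makes it easier to tell an empty folder from a failed task.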
Delete the resources you created
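Some resources are not free of charge. To avoid paying for them, delete the resources you no longer need:
- Service account
- Object Storage bucket
- Managed Service for Apache Airflow™ cluster
- Virtual machine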