Отправка запросов к API Yandex Cloud через Yandex Cloud Python SDK
При работе с Managed Service for Apache Airflow™ вы можете использовать Yandex Cloud Python SDK
Ниже рассматривается направленный ациклический граф (DAG), отправляющий запрос к API Yandex Cloud. Запрос возвращает список виртуальных машин в каталоге, в котором создан кластер Apache Airflow™.
Чтобы использовать Yandex Cloud Python SDK для отправки запросов к API Yandex Cloud:
Если созданные ресурсы вам больше не нужны, удалите их.
Подготовьте инфраструктуру
-
Создайте сервисный аккаунт
airflow-sa
с ролями:compute.viewer
;managed-airflow.integrationProvider
.
-
Создайте бакет Yandex Object Storage с произвольными настройками.
-
Отредактируйте ACL созданного бакета так, чтобы у сервисного аккаунта
airflow-sa
было разрешениеREAD
. -
Создайте кластер Managed Service for Apache Airflow™ с параметрами:
- Сервисный аккаунт —
airflow-sa
. - Имя бакета — имя созданного бакета.
- Сервисный аккаунт —
-
Создайте виртуальную машину с произвольными настройками.
Подготовьте DAG-файл и запустите граф
-
Создайте локально файл с именем
test_python_sdk.py
и скопируйте в него скрипт:test_python_sdk.py
from airflow import DAG from airflow.models import Connection from airflow.operators.python_operator import PythonOperator from datetime import datetime import yandexcloud from yandex.cloud.compute.v1.instance_service_pb2 import ( ListInstancesRequest, ) from yandex.cloud.compute.v1.instance_service_pb2_grpc import InstanceServiceStub def list_instances(): conn = Connection.get_connection_from_secrets("yandexcloud_default") folder_id = conn.extra_dejson.get('folder_id') sdk = yandexcloud.SDK() instance_service = sdk.client(InstanceServiceStub) response = instance_service.List(ListInstancesRequest(folder_id=folder_id)) print("instances: ", response.instances) with DAG( dag_id='test_python_sdk', start_date=datetime(2024, 5, 24), schedule="@once", catchup=False, ) as dag: PythonOperator( task_id='list_instances', python_callable=list_instances, )
Аутентификация в облаке происходит через IAM-токен сервисного аккаунта, который привязан к кластеру Apache Airflow™. В созданном с параметрами по умолчанию объекте
yandexcloud.SDK()
данные для аутентификации через IAM-токен подставляются автоматически. -
Загрузите DAG-файл
test_python_sdk.py
в созданный ранее бакет. В результате одноименный граф появится в веб-интерфейсе Apache Airflow™ автоматически. -
Убедитесь, что в разделе DAGs появился новый граф
test_python_sdk
.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить граф, в строке с его именем нажмите кнопку .
Проверьте результат
Чтобы проверить результат в веб-интерфейсе Apache Airflow™:
- В разделе DAGs откройте граф
test_python_sdk
. - Перейдите в раздел Grid.
- Выберите задание list_instances.
- Перейдите в раздел Logs.
- Убедитесь, что в логах перечислены виртуальные машины из каталога, в котором создан кластер Apache Airflow™. Это значит, что запрос выполнен успешно.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них: