Хранение подключений Apache Airflow™ в Yandex Lockbox
При работе с Yandex Managed Service for Apache Airflow™ вы можете использовать Yandex Lockbox для хранения артефактов, которые могут использоваться в DAG-файлах: подключений, переменных и конфигурационных данных. Yandex Lockbox интегрируется в Managed Service for Apache Airflow™ через провайдер Yandex Lockbox Secret Backend
С помощью направленного ациклического графа (DAG) можно выполнить загрузку подключения из Yandex Lockbox и SQL-запрос SELECT 1; к БД в кластере Yandex Managed Service for PostgreSQL. Данные для подключения к БД хранятся в Yandex Lockbox и автоматически подставляются в граф.
Совет
В кластерах с версией Apache Airflow™ ниже 3.0 по умолчанию используется провайдер apache-airflow-providers-postgres версии 5.13.1. Если вы работаете с более новой версией провайдера, вместо PostgresOperator используйте SQLExecuteQueryOperator. Подробнее см. в официальной документации
Перед началом работы
-
Создайте кластер Managed Service for PostgreSQL с параметрами:
- Имя БД —
db1; - Имя пользователя —
user1; - Пароль —
user1-password.
- Имя БД —
-
Создайте бакет Yandex Object Storage, в котором будет храниться DAG-файл.
-
Настройте кластер Managed Service for Apache Airflow™:
-
Включите опцию Использовать Lockbox Secret Backend, которая позволяет использовать секреты в сервисе Yandex Lockbox для хранения конфигурационных данных, переменных и параметров подключений Apache Airflow™.
-
В блоке Зависимости добавьте pip-пакет
apache-airflow-providers-postgres.Важно
Установка pip-пакета необходима для кластеров с версией Apache Airflow™ 3.0 и выше. В кластерах с версией Apache Airflow™ ниже 3.0 pip-пакет установлен по умолчанию.
-
В блоке Хранилище DAG-файлов выберите созданный ранее бакет Object Storage. Из него будет загружен DAG-файл.
-
-
Выдайте своему сервисному аккаунту роль
lockbox.payloadViewer.Роль
lockbox.payloadViewerне обязательно выдавать на весь каталог. Достаточно назначить ее на конкретный секрет Yandex Lockbox после его создания. -
Выдайте своему сервисному аккаунту роль
lockbox.payloadViewer.Роль
lockbox.payloadViewerне обязательно выдавать на весь каталог. Достаточно назначить ее на конкретный секрет Yandex Lockbox после его создания.
Создайте секрет Yandex Lockbox
Для корректной работы кластера Apache Airflow™ секрет в Yandex Lockbox должен иметь имя в формате airflow/<тип_артефакта>/<идентификатор_артефакта>, где:
<тип_артефакта>— определяет, какие данные будут храниться в секрете. Возможные значения:connections— подключения;variables— переменные;config— данные конфигурации.
<идентификатор_артефакта>— идентификатор, который будет использоваться для обращения к артефакту в Apache Airflow™.
Создайте секрет Yandex Lockbox с параметрами:
-
Имя —
airflow/connections/pg1. -
Тип секрета —
Пользовательский. -
Ключ —
conn. -
Значение — выберите Текст и укажите следующее содержимое:
{ "conn_type": "postgres", "host": "<FQDN_хоста_кластера_PostgreSQL>", "port": 6432, "schema": "db1", "login": "user1", "password": "user1-password" }
В секрете будут сохранены данные для подключения к БД в кластере Managed Service for PostgreSQL.
Подробнее о том, как узнать FQDN хоста кластера PostgreSQL, см. в разделе FQDN хоста PostgreSQL.
Подготовьте DAG-файл и запустите граф
-
Создайте локально файл с именем
test_lockbox_connection.pyи скопируйте в него скрипт:Версия Apache Airflow™ ниже 3.0Версия Apache Airflow™ 3.0 и вышеfrom airflow import DAG from airflow.providers.postgres.operators.postgres import PostgresOperator from datetime import datetime with DAG( dag_id='test_lockbox_connection', start_date=datetime(2024, 4, 19), schedule="@once", catchup=False, ) as dag: check_conn = PostgresOperator( task_id="check_conn", postgres_conn_id='pg1', sql="SELECT 1;", )from airflow import DAG from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from datetime import datetime with DAG( dag_id='test_lockbox_connection', start_date=datetime(2024, 4, 19), schedule="@once", catchup=False, ) as dag: check_conn = SQLExecuteQueryOperator( task_id="check_conn", conn_id='pg1', sql="SELECT 1;", ) -
Загрузите DAG-файл
test_lockbox_connection.pyв созданный ранее бакет. В результате одноименный граф появится в веб-интерфейсе Apache Airflow™ автоматически. -
Убедитесь, что в разделе Dags появился новый граф
test_lockbox_connection.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить граф, в строке с его именем нажмите кнопку
.
Проверьте результат
Чтобы проверить результат в веб-интерфейсе Apache Airflow™:
- В разделе DAGs нажмите на граф
test_lockbox_connection. - Перейдите в раздел Graph.
- Выберите задание check_conn.
- Перейдите в раздел Logs.
- Убедитесь, что в логах есть строка
Rows affected: 1. Это значит, что запрос выполнен успешно.
- В разделе Dags нажмите на граф
test_lockbox_connection. - Перейдите в раздел Tasks.
- Выберите задание check_conn.
- Перейдите в раздел Task Instances.
- Выберите экземпляр задания.
- Откроется раздел Logs.
- Убедитесь, что в логах есть строка
Rows affected: 1. Это значит, что запрос выполнен успешно.