Подключение к Yandex Managed Service for PostgreSQL
С помощью направленного ациклического графа (DAG) можно настроить подключение к БД в кластере Yandex Managed Service for PostgreSQL. Данные для подключения к БД хранятся в Yandex Lockbox и автоматически подставляются в граф.
Совет
В кластерах с версией Apache Airflow™ ниже 3.0 по умолчанию используется провайдер apache-airflow-providers-postgres версии 5.13.1. Если вы работаете с более новой версией провайдера, вместо PostgresOperator используйте SQLExecuteQueryOperator. Подробнее см. в официальной документации
Перед началом работы
-
Создайте кластер Managed Service for PostgreSQL с параметрами:
- Имя БД —
db1; - Имя пользователя —
user1; - Пароль —
user1-password.
- Имя БД —
-
Создайте бакет Yandex Object Storage, в котором будет храниться DAG-файл.
-
Настройте кластер Managed Service for Apache Airflow™:
-
Включите опцию Использовать Lockbox Secret Backend, которая позволяет использовать секреты в сервисе Yandex Lockbox для хранения конфигурационных данных, переменных и параметров подключений Apache Airflow™.
-
В блоке Зависимости добавьте pip-пакет
apache-airflow-providers-postgres.Важно
Установка pip-пакета необходима для кластеров с версией Apache Airflow™ 3.0 и выше. В кластерах с версией Apache Airflow™ ниже 3.0 pip-пакет установлен по умолчанию.
-
В блоке Хранилище DAG-файлов выберите созданный ранее бакет Object Storage. Из него будет загружен DAG-файл.
-
-
Выдайте своему сервисному аккаунту роль
lockbox.payloadViewer.Роль
lockbox.payloadViewerне обязательно выдавать на весь каталог. Достаточно назначить ее на конкретный секрет Yandex Lockbox после его создания.
Создайте секрет Yandex Lockbox
Для корректной работы кластера Apache Airflow™ секрет в Yandex Lockbox должен иметь имя в формате airflow/<тип_артефакта>/<идентификатор_артефакта>, где:
<тип_артефакта>— определяет, какие данные будут храниться в секрете. Возможные значения:connections— подключения;variables— переменные;config— данные конфигурации.
<идентификатор_артефакта>— идентификатор, который будет использоваться для обращения к артефакту в Apache Airflow™.
Создайте секрет Yandex Lockbox с параметрами:
-
Имя —
airflow/connections/pg1. -
Тип секрета —
Пользовательский. -
Ключ —
conn. -
Значение — выберите Текст и укажите следующее содержимое:
{ "conn_type": "postgres", "host": "<FQDN_хоста_кластера_PostgreSQL>", "port": 6432, "schema": "db1", "login": "user1", "password": "user1-password" }
Подробнее о том, как узнать FQDN хоста кластера PostgreSQL, см. в разделе FQDN хоста PostgreSQL.
В секрете будут сохранены данные для подключения к БД в кластере Managed Service for PostgreSQL.
Подготовьте DAG-файл и запустите граф
-
Создайте локально файл с именем
postgres_operator.pyи скопируйте в него скрипт:Версия Apache Airflow™ ниже 3.0Версия Apache Airflow™ 3.0 и вышеfrom airflow import DAG from airflow.providers.postgres.operators.postgres import PostgresOperator with DAG( dag_id='postgres_operator', schedule=None ): PostgresOperator( task_id="check_conn", postgres_conn_id='pg1', sql="SELECT 1;" )from airflow import DAG from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator with DAG( dag_id='postgres_operator', schedule=None ): SQLExecuteQueryOperator( task_id="check_conn", conn_id='pg1', sql="SELECT 1;" ) -
Загрузите DAG-файл
postgres_operator.pyв созданный ранее бакет. -
Убедитесь, что в разделе Dags появился новый граф
postgres_operator.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить граф, в строке с его именем нажмите кнопку
.
Проверьте результат
Чтобы проверить результат в веб-интерфейсе Apache Airflow™:
- В разделе Dags нажмите на граф
postgres_operator. - Перейдите в раздел Graph.
- Выберите задание check_conn.
- Перейдите в раздел Logs.
- Убедитесь, что в логах есть строка
Rows affected: 1. Это значит, что запрос выполнен успешно.
- В разделе Dags нажмите на граф
postgres_operator. - Перейдите в раздел Tasks.
- Выберите задание check_conn.
- Перейдите в раздел Task Instances.
- Выберите экземпляр задания.
- Откроется раздел Logs.
- Убедитесь, что в логах есть строка
Rows affected: 1. Это значит, что запрос выполнен успешно.