Автоматизация работы с Yandex Data Processing с помощью Yandex Managed Service for Apache Airflow™
Важно
Руководство тестировалось на кластерах с версией Apache Airflow™ ниже 3.0.
В сервисе Yandex Managed Service for Apache Airflow™ можно создать DAG — направленный ациклический граф задач, который позволит автоматизировать работу с сервисом Yandex Data Processing. Ниже рассматривается DAG, который включает в себя несколько задач:
- Создать кластер Yandex Data Processing.
- Создать и запустить задание PySpark.
- Удалить кластер Yandex Data Processing.
При таком DAG кластер существует непродолжительное время. Так как стоимость ресурсов Yandex Data Processing зависит от времени их использования, в кластере можно задействовать ресурсы повышенной мощности и быстро обработать большее количество данных за те же деньги.
В этом DAG кластер Yandex Data Processing создается без сервиса Hive. Для хранения табличных метаданных в примере ниже используется кластер Apache Hive™ Metastore. Сохраненные метаданные затем может использовать другой кластер Yandex Data Processing.
Чтобы автоматизировать работу с Yandex Data Processing с помощью Managed Service for Apache Airflow™:
- Подготовьте инфраструктуру.
- Подготовьте PySpark-задание.
- Подготовьте и запустите DAG-файл.
- Проверьте результат.
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за кластер Managed Service for Apache Airflow™: вычислительные ресурсы компонентов кластера (см. тарифы Apache Airflow™).
- Плата за вычислительные ресурсы кластера Apache Hive™ Metastore (см. тарифы Yandex MetaData Hub).
- Плата за NAT-шлюз (см. тарифы Virtual Private Cloud).
- Плата за бакеты Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).
- Плата за кластер Yandex Data Processing: использование вычислительных ресурсов ВМ и сетевых дисков Compute Cloud, а также сервиса Cloud Logging для работы с логами (см. тарифы Yandex Data Processing).
Подготовьте инфраструктуру
В примере ниже рассматривается два сценария. Выберите наиболее подходящий:
-
Высокий уровень безопасности. Такой сценарий рекомендуемый, так как в нем соблюдается принцип минимальных привилегий. Сценарий включает в себя следующие особенности:
- Права доступа разделяются между сервисными аккаунтами. Для каждого кластера вы создаете отдельный сервисный аккаунт и назначаете ему роли, необходимые только для работы кластера этого аккаунта.
- Используется несколько бакетов для разных задач, различные данные хранятся в разных бакетах. Например, результаты выполнения PySpark-задания записываются в один бакет, а логи — в другой.
- Настроены группы безопасности. Вы ограничиваете трафик, и в результате доступ получают только разрешенные ресурсы.
-
Упрощенная настройка. Предусматривает более низкий уровень безопасности:
- Используется один сервисный аккаунт, который обладает большими привилегиями, чем необходимо.
- Все данные хранятся только в одном бакете, но в разных папках.
- Группы безопасности не настраиваются.
Подготовьте инфраструктуру:
-
Создайте сервисные аккаунты со следующими ролями:
Сервисный аккаунт
Его роли
airflow-agentдля кластера Apache Airflow™.- dataproc.editor — чтобы управлять кластером Yandex Data Processing из DAG;
- vpc.user — чтобы в кластере Apache Airflow™ использовать подсеть Yandex Virtual Private Cloud;
- managed-airflow.integrationProvider — чтобы кластер Apache Airflow™ мог взаимодействовать с другими ресурсами;
- iam.serviceAccounts.user — чтобы указать сервисный аккаунт
data-processing-agentпри создании кластера Yandex Data Processing.
metastore-agentдля кластера Apache Hive™ Metastore.- managed-metastore.integrationProvider — чтобы кластер Apache Hive™ Metastore мог взаимодействовать с другими ресурсами.
data-processing-agentдля кластера Yandex Data Processing.- dataproc.agent — чтобы сервисный аккаунт мог получать информацию о состоянии хостов кластера, заданиях и лог-группах.
- dataproc.provisioner — чтобы сервисный аккаунт мог взаимодействовать с автоматически масштабируемой группой ВМ. Тогда будет доступно автомасштабирование подкластеров.
-
<бакет_для_Managed_Airflow>;<бакет_для_исходного_кода_PySpark_задания>;<бакет_для_выходных_данных_PySpark_задания>;<бакет_для_сбора_логов_Spark>.
Бакетов нужно несколько, так как на них назначаются различные права доступа.
-
Предоставьте разрешения на следующие бакеты:
<бакет_для_Managed_Airflow>— разрешениеREADдля сервисного аккаунтаairflow-agent;<бакет_для_исходного_кода_PySpark_задания>— разрешениеREADдля сервисного аккаунтаdata-processing-agent;<бакет_для_выходных_данных_PySpark_задания>— разрешениеREAD и WRITEдля сервисных аккаунтовdata-processing-agentиmetastore-agent;<бакет_для_сбора_логов_Spark>— разрешениеREAD и WRITEдля сервисного аккаунтаdata-processing-agent.
-
Создайте облачную сеть с именем
data-processing-network.Вместе с ней автоматически создадутся три подсети в разных зонах доступности.
-
Настройте NAT-шлюз для подсети
data-processing-network-ru-central1-a. -
Для кластера Apache Hive™ Metastore создайте группу безопасности
metastore-sgв сетиdata-processing-network. Добавьте в группу следующие правила:-
Для входящего трафика от клиентов:
- диапазон портов —
30000-32767; - протокол —
Любой(Any); - источник —
CIDR; - CIDR блоки —
0.0.0.0/0.
- диапазон портов —
-
Для входящего трафика от балансировщика:
- диапазон портов —
10256; - протокол —
Любой(Any); - источник —
Проверки состояния балансировщика.
- диапазон портов —
-
-
Для кластеров Managed Service for Apache Airflow™ и Yandex Data Processing создайте группу безопасности
airflow-sgв сетиdata-processing-network. Добавьте в группу следующие правила:-
Для входящего служебного трафика:
- диапазон портов —
0-65535; - протокол —
Любой(Any); - источник —
Группа безопасности; - группа безопасности —
Текущая(Self).
- диапазон портов —
-
Для исходящего служебного трафика:
- диапазон портов —
0-65535; - протокол —
Любой(Any); - назначение —
Группа безопасности; - группа безопасности —
Текущая(Self).
- диапазон портов —
-
Для исходящего HTTPS-трафика:
- диапазон портов —
443; - протокол —
TCP; - назначение —
CIDR; - CIDR блоки —
0.0.0.0/0.
- диапазон портов —
-
Для исходящего трафика, чтобы разрешить подключение кластера Yandex Data Processing к Apache Hive™ Metastore:
- диапазон портов —
9083; - протокол —
Любой(Any); - назначение —
Группа безопасности; - группа безопасности —
metastore-sg(Из списка).
- диапазон портов —
-
-
Создайте кластер Apache Hive™ Metastore с параметрами:
- Сервисный аккаунт —
metastore-agent. - Сеть —
data-processing-network. - Подсеть —
data-processing-network-ru-central1-a. - Группа безопасности —
metastore-sg.
- Сервисный аккаунт —
-
Создайте кластер Managed Service for Apache Airflow™ с параметрами:
- Сервисный аккаунт —
airflow-agent. - Зона доступности —
ru-central1-a. - Сеть —
data-processing-network. - Подсеть —
data-processing-network-ru-central1-a. - Группа безопасности —
airflow-sg. - Имя бакета —
<бакет_для_Managed_Airflow>.
- Сервисный аккаунт —
Подготовьте инфраструктуру:
-
Создайте сервисный аккаунт
my-editorсо следующими ролями:- dataproc.editor — для управления кластером Yandex Data Processing из DAG;
- editor — для остальных необходимых операций.
-
Создайте бакет
<бакет_для_заданий_и_данных>.На него не нужно предоставлять разрешение сервисному аккаунту, так как роли
editorдостаточно. -
Создайте облачную сеть с именем
data-processing-network.Вместе с ней автоматически создадутся три подсети в разных зонах доступности и группа безопасности.
-
Настройте NAT-шлюз для подсети
data-processing-network-ru-central1-a. -
Создайте кластер Apache Hive™ Metastore с параметрами:
- Сервисный аккаунт —
my-editor. - Сеть —
data-processing-network. - Подсеть —
data-processing-network-ru-central1-a. - Группа безопасности — группа по умолчанию в сети
data-processing-network.
- Сервисный аккаунт —
-
Создайте кластер Managed Service for Apache Airflow™ с параметрами:
- Сервисный аккаунт —
my-editor. - Зона доступности —
ru-central1-a. - Сеть —
data-processing-network. - Подсеть —
data-processing-network-ru-central1-a. - Группа безопасности — группа по умолчанию в сети
data-processing-network. - Имя бакета —
<бакет_для_заданий_и_данных>.
- Сервисный аккаунт —
Подготовьте PySpark-задание
Для PySpark-задания будет использован Python-скрипт, который создает таблицу и хранится в бакете Object Storage. Подготовьте файл скрипта:
-
Создайте локально файл с именем
create-table.pyи скопируйте в него скрипт:create-table.py
from pyspark.sql.types import * from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder \ .appName("create-table") \ .enableHiveSupport() \ .getOrCreate() # Создание схемы данных schema = StructType([StructField('Name', StringType(), True), StructField('Capital', StringType(), True), StructField('Area', IntegerType(), True), StructField('Population', IntegerType(), True)]) # Создание датафрейма df = spark.createDataFrame([('Австралия', 'Канберра', 7686850, 19731984), ('Австрия', 'Вена', 83855, 7700000)], schema) # Запись датафрейма в бакет в виде таблицы countries df.write.mode("overwrite").option("path","s3a://<бакет_для_выходных_данных_PySpark_задания>/countries").saveAsTable("countries") -
Создайте в бакете
<бакет_для_исходного_кода_PySpark_задания>папкуscriptsи загрузите в нее файлcreate-table.py.
-
Создайте локально файл с именем
create-table.pyи скопируйте в него скрипт:create-table.py
from pyspark.sql.types import * from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder \ .appName("create-table") \ .enableHiveSupport() \ .getOrCreate() # Создание схемы данных schema = StructType([StructField('Name', StringType(), True), StructField('Capital', StringType(), True), StructField('Area', IntegerType(), True), StructField('Population', IntegerType(), True)]) # Создание датафрейма df = spark.createDataFrame([('Австралия', 'Канберра', 7686850, 19731984), ('Австрия', 'Вена', 83855, 7700000)], schema) # Запись датафрейма в бакет в виде таблицы countries df.write.mode("overwrite").option("path","s3a://<бакет_для_заданий_и_данных>/countries").saveAsTable("countries") -
Создайте в бакете
<бакет_для_заданий_и_данных>папкуscriptsи загрузите в нее файлcreate-table.py.
Подготовьте и запустите DAG-файл
DAG будет состоять из нескольких вершин, которые формируют цепочку последовательных действий:
- Managed Service for Apache Airflow™ создает временный, легковесный кластер Yandex Data Processing с настройками, заданными в DAG. Этот кластер автоматически подключается к созданному ранее кластеру Apache Hive™ Metastore.
- Когда кластер Yandex Data Processing готов, запускается задание PySpark.
- После выполнения задания временный кластер Yandex Data Processing удаляется.
Чтобы подготовить DAG:
-
Создайте SSH-ключ. Сохраните открытую часть ключа — она понадобится для создания кластера Yandex Data Processing.
-
Создайте локально файл с именем
Data-Processing-DAG.py, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:Data-Processing-DAG.py
import uuid import datetime from airflow import DAG from airflow.utils.trigger_rule import TriggerRule from airflow.providers.yandex.operators.yandexcloud_dataproc import ( DataprocCreateClusterOperator, DataprocCreatePysparkJobOperator, DataprocDeleteClusterOperator, ) # Данные вашей инфраструктуры YC_DP_AZ = 'ru-central1-a' YC_DP_SSH_PUBLIC_KEY = '<открытая_часть_SSH-ключа>' YC_DP_SUBNET_ID = '<идентификатор_подсети>' YC_DP_SA_ID = '<идентификатор_сервисного_аккаунта_data-processing-agent>' YC_DP_METASTORE_URI = '<IP-адрес>' YC_SOURCE_BUCKET = '<бакет_для_исходного_кода_PySpark_задания>' YC_DP_LOGS_BUCKET = '<бакет_для_сбора_логов_Spark>' # Настройки DAG with DAG( 'DATA_INGEST', schedule='@hourly', tags=['data-processing-and-airflow'], start_date=datetime.datetime.now(), max_active_runs=1, catchup=False ) as ingest_dag: # 1 этап: создание кластера Yandex Data Proc create_spark_cluster = DataprocCreateClusterOperator( task_id='dp-cluster-create-task', cluster_name=f'tmp-dp-{uuid.uuid4()}', cluster_description='Временный кластер для выполнения PySpark-задания под оркестрацией Managed Service for Apache Airflow™', ssh_public_keys=YC_DP_SSH_PUBLIC_KEY, service_account_id=YC_DP_SA_ID, subnet_id=YC_DP_SUBNET_ID, s3_bucket=YC_DP_LOGS_BUCKET, zone=YC_DP_AZ, cluster_image_version='2.1', masternode_resource_preset='s2.small', masternode_disk_type='network-ssd', masternode_disk_size=200, computenode_resource_preset='m2.large', computenode_disk_type='network-ssd', computenode_disk_size=200, computenode_count=2, computenode_max_hosts_count=5, # Количество подкластеров для обработки данных будет автоматически масштабироваться в случае большой нагрузки. services=['YARN', 'SPARK'], # Создается легковесный кластер. datanode_count=0, # Без подкластеров для хранения данных. properties={ # С указанием на удаленный кластер Apache Hive™ Metastore. 'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083', }, ) # 2 этап: запуск задания PySpark poke_spark_processing = DataprocCreatePysparkJobOperator( task_id='dp-cluster-pyspark-task', main_python_file_uri=f's3a://{YC_SOURCE_BUCKET}/scripts/create-table.py', ) # 3 этап: удаление кластера Yandex Data Processing delete_spark_cluster = DataprocDeleteClusterOperator( task_id='dp-cluster-delete-task', trigger_rule=TriggerRule.ALL_DONE, ) # Формирование DAG из указанных выше этапов create_spark_cluster >> poke_spark_processing >> delete_spark_clusterГде:
YC_DP_AZ— зона доступности для кластера Yandex Data Processing;YC_DP_SSH_PUBLIC_KEY— открытая часть SSH-ключа для кластера Yandex Data Processing;YC_DP_SUBNET_ID— идентификатор подсети;YC_DP_SA_ID— идентификатор сервисного аккаунта для Yandex Data Processing;YC_DP_METASTORE_URI— IP-адрес кластера Apache Hive™ Metastore;YC_SOURCE_BUCKET— бакет с Python-скриптом для задания PySpark;YC_DP_LOGS_BUCKET— бакет для логов.
-
Загрузите DAG в кластер Managed Service for Apache Airflow™: создайте в бакете
<бакет_для_Managed_Airflow>папкуdagsи загрузите в нее файлData-Processing-DAG.py. -
Откройте веб-интерфейс Apache Airflow™.
-
Убедитесь, что в разделе DAGs появился новый DAG
DATA_INGESTс тегомdata-processing-and-airflow.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить DAG, в строке с его именем нажмите кнопку
.
-
Создайте SSH-ключ. Сохраните открытую часть ключа — она понадобится для создания кластера Yandex Data Processing.
-
Создайте локально файл с именем
Data-Processing-DAG.py, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:Data-Processing-DAG.py
import uuid import datetime from airflow import DAG from airflow.utils.trigger_rule import TriggerRule from airflow.providers.yandex.operators.yandexcloud_dataproc import ( DataprocCreateClusterOperator, DataprocCreatePysparkJobOperator, DataprocDeleteClusterOperator, ) # Данные вашей инфраструктуры YC_DP_AZ = 'ru-central1-a' YC_DP_SSH_PUBLIC_KEY = '<открытая_часть_SSH-ключа>' YC_DP_SUBNET_ID = '<идентификатор_подсети>' YC_DP_SA_ID = '<идентификатор_сервисного_аккаунта_my-editor>' YC_DP_METASTORE_URI = '<IP-адрес>' YC_BUCKET = '<бакет_для_заданий_и_данных>' # Настройки DAG with DAG( 'DATA_INGEST', schedule='@hourly', tags=['data-processing-and-airflow'], start_date=datetime.datetime.now(), max_active_runs=1, catchup=False ) as ingest_dag: # 1 этап: создание кластера Yandex Data Proc create_spark_cluster = DataprocCreateClusterOperator( task_id='dp-cluster-create-task', cluster_name=f'tmp-dp-{uuid.uuid4()}', cluster_description='Временный кластер для выполнения PySpark-задания под оркестрацией Managed Service for Apache Airflow™', ssh_public_keys=YC_DP_SSH_PUBLIC_KEY, service_account_id=YC_DP_SA_ID, subnet_id=YC_DP_SUBNET_ID, s3_bucket=YC_BUCKET, zone=YC_DP_AZ, cluster_image_version='2.1', masternode_resource_preset='s2.small', masternode_disk_type='network-ssd', masternode_disk_size=200, computenode_resource_preset='m2.large', computenode_disk_type='network-ssd', computenode_disk_size=200, computenode_count=2, computenode_max_hosts_count=5, # Количество подкластеров для обработки данных будет автоматически масштабироваться в случае большой нагрузки. services=['YARN', 'SPARK'], # Создается легковесный кластер. datanode_count=0, # Без подкластеров для хранения данных. properties={ # С указанием на удаленный кластер Apache Hive™ Metastore. 'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083', }, ) # 2 этап: запуск задания PySpark poke_spark_processing = DataprocCreatePysparkJobOperator( task_id='dp-cluster-pyspark-task', main_python_file_uri=f's3a://{YC_BUCKET}/scripts/create-table.py', ) # 3 этап: удаление кластера Yandex Data Processing delete_spark_cluster = DataprocDeleteClusterOperator( task_id='dp-cluster-delete-task', trigger_rule=TriggerRule.ALL_DONE, ) # Формирование DAG из указанных выше этапов create_spark_cluster >> poke_spark_processing >> delete_spark_clusterГде:
YC_DP_AZ— зона доступности для кластера Yandex Data Processing;YC_DP_SSH_PUBLIC_KEY— открытая часть SSH-ключа для кластера Yandex Data Processing;YC_DP_SUBNET_ID— идентификатор подсети;YC_DP_SA_ID— идентификатор сервисного аккаунтаmy-editor;YC_DP_METASTORE_URI— IP-адрес кластера Apache Hive™ Metastore;YC_BUCKET—<бакет_для_заданий_и_данных>.
-
Загрузите DAG в кластер Managed Service for Apache Airflow™: создайте в бакете
<бакет_для_заданий_и_данных>папкуdagsи загрузите в нее файлData-Processing-DAG.py. -
Откройте веб-интерфейс Apache Airflow™.
-
Убедитесь, что в разделе DAGs появился новый DAG
DATA_INGESTс тегомdata-processing-and-airflow.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить DAG, в строке с его именем нажмите кнопку
.
Проверьте результат
- Чтобы отслеживать результаты выполнения задач, нажмите на название DAG. Результаты отображаются во вкладке Grid.
- Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления
создается кластер Yandex Data Processing, выполняется задание PySpark и удаляется тот же кластер. - Убедитесь, что в бакете
<бакет_для_выходных_данных_PySpark_задания>появилась папкаcountries, а в ней — файлpart-00000-.... Теперь данные из созданной таблицы хранятся в бакете Object Storage, а метаинформация о ней — в кластере Apache Hive™ Metastore. - Проверьте, что в бакете
<бакет_для_сбора_логов_Spark>появились логи выполнения PySpark-задания.
- Чтобы отслеживать результаты выполнения задач, нажмите на название DAG. Результаты отображаются во вкладке Grid.
- Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления
создается кластер Yandex Data Processing, выполняется задание PySpark и удаляется тот же кластер. - Убедитесь, что в бакете
<бакет_для_заданий_и_данных>появилась папкаcountries, а в ней — файлpart-00000-.... Теперь данные из созданной таблицы хранятся в бакете Object Storage, а метаинформация о ней — в кластере Apache Hive™ Metastore. - Проверьте, что в бакете
<бакет_для_заданий_и_данных>появились логи выполнения PySpark-задания. Они записываются в папкиdataproc,userиvar.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
- Сервисные аккаунты.
- Бакеты Object Storage.
- Кластер Apache Hive™ Metastore.
- Кластер Managed Service for Apache Airflow™.
- Таблицу маршрутизации.
- NAT-шлюз.
- Группы безопасности.
- Облачные подсети, созданные по умолчанию в сети
data-processing-network. - Облачную сеть.
- Сервисный аккаунт.
- Бакет Object Storage.
- Кластер Apache Hive™ Metastore.
- Кластер Managed Service for Apache Airflow™.
- Таблицу маршрутизации.
- NAT-шлюз.
- Группу безопасности, созданную по умолчанию в сети
data-processing-network. - Облачные подсети, созданные по умолчанию в сети
data-processing-network. - Облачную сеть.