Автоматизация работы с Yandex Data Processing с помощью Yandex Managed Service for Apache Airflow™
В сервисе Yandex Managed Service for Apache Airflow™ можно создать DAG — направленный ациклический граф задач, который позволит автоматизировать работу с сервисом Yandex Data Processing. Ниже рассматривается DAG, который включает в себя несколько задач:
- Создать кластер Yandex Data Processing.
- Создать и запустить задание PySpark.
- Удалить кластер Yandex Data Processing.
При таком DAG кластер существует непродолжительное время. Так как стоимость ресурсов Yandex Data Processing зависит от времени их использования, в кластере можно задействовать ресурсы повышенной мощности и быстро обработать большее количество данных за те же деньги.
В этом DAG кластер Yandex Data Processing создается без сервиса Hive. Для хранения табличных метаданных в примере ниже используется кластер Hive Metastore. Сохраненные метаданные затем может использовать другой кластер Yandex Data Processing.
Чтобы автоматизировать работу с Yandex Data Processing с помощью Managed Service for Apache Airflow™:
- Подготовьте инфраструктуру.
- Подготовьте PySpark-задание.
- Подготовьте и запустите DAG-файл.
- Проверьте результат.
Если созданные ресурсы вам больше не нужны, удалите их.
Подготовьте инфраструктуру
В примере ниже рассматривается два сценария. Выберите наиболее подходящий:
-
Высокий уровень безопасности. Такой сценарий рекомендуемый, так как в нем соблюдается принцип минимальных привилегий. Сценарий включает в себя следующие особенности:
- Права доступа разделяются между сервисными аккаунтами. Для каждого кластера вы создаете отдельный сервисный аккаунт и назначаете ему роли, необходимые только для работы кластера этого аккаунта.
- Используется несколько бакетов для разных задач, различные данные хранятся в разных бакетах. Например, результаты выполнения PySpark-задания записываются в один бакет, а логи — в другой.
- Настроены группы безопасности. Вы ограничиваете трафик, и в результате доступ получают только разрешенные ресурсы.
-
Упрощенная настройка. Предусматривает более низкий уровень безопасности:
- Используется один сервисный аккаунт, который обладает большими привилегиями, чем необходимо.
- Все данные хранятся только в одном бакете, но в разных папках.
- Группы безопасности не настраиваются.
Подготовьте инфраструктуру:
-
Создайте сервисные аккаунты со следующими ролями:
Сервисный аккаунт
Его роли
airflow-agent
для кластера Apache Airflow™.- dataproc.editor — чтобы управлять кластером Yandex Data Processing из DAG;
- vpc.user — чтобы в кластере Apache Airflow™ использовать подсеть Yandex Virtual Private Cloud;
- managed-airflow.integrationProvider — чтобы кластер Apache Airflow™ мог взаимодействовать с другими ресурсами;
- iam.serviceAccounts.user — чтобы указать сервисный аккаунт
data-processing-agent
при создании кластера Yandex Data Processing.
metastore-agent
для кластера Metastore.- managed-metastore.integrationProvider — чтобы кластер Metastore мог взаимодействовать с другими ресурсами.
data-processing-agent
для кластера Yandex Data Processing.- dataproc.agent — чтобы сервисный аккаунт мог получать информацию о состоянии хостов кластера, заданиях и лог-группах.
- dataproc.provisioner — чтобы сервисный аккаунт мог взаимодействовать с автоматически масштабируемой группой ВМ. Тогда будет доступно автомасштабирование подкластеров.
-
<бакет_для_Managed_Airflow>
;<бакет_для_исходного_кода_PySpark_задания>
;<бакет_для_выходных_данных_PySpark_задания>
;<бакет_для_сбора_логов_Spark>
.
Бакетов нужно несколько, так как на них назначаются различные права доступа.
-
Предоставьте разрешения на следующие бакеты:
<бакет_для_Managed_Airflow>
— разрешениеREAD
для сервисного аккаунтаairflow-agent
;<бакет_для_исходного_кода_PySpark_задания>
— разрешениеREAD
для сервисного аккаунтаdata-processing-agent
;<бакет_для_выходных_данных_PySpark_задания>
— разрешениеREAD и WRITE
для сервисных аккаунтовdata-processing-agent
иmetastore-agent
;<бакет_для_сбора_логов_Spark>
— разрешениеREAD и WRITE
для сервисного аккаунтаdata-processing-agent
.
-
Создайте облачную сеть с именем
data-processing-network
.Вместе с ней автоматически создадутся три подсети в разных зонах доступности.
-
Настройте NAT-шлюз для подсети
data-processing-network-ru-central1-a
. -
Для кластера Metastore создайте группу безопасности
metastore-sg
в сетиdata-processing-network
. Добавьте в группу следующие правила:-
Для входящего трафика от клиентов:
- диапазон портов —
30000-32767
; - протокол —
Любой
(Any
); - источник —
CIDR
; - CIDR блоки —
0.0.0.0/0
.
- диапазон портов —
-
Для входящего трафика от балансировщика:
- диапазон портов —
10256
; - протокол —
Любой
(Any
); - источник —
Проверки состояния балансировщика
.
- диапазон портов —
-
-
Для кластеров Managed Service for Apache Airflow™ и Yandex Data Processing создайте группу безопасности
airflow-sg
в сетиdata-processing-network
. Добавьте в группу следующие правила:-
Для входящего служебного трафика:
- диапазон портов —
0-65535
; - протокол —
Любой
(Any
); - источник —
Группа безопасности
; - группа безопасности —
Текущая
(Self
).
- диапазон портов —
-
Для исходящего служебного трафика:
- диапазон портов —
0-65535
; - протокол —
Любой
(Any
); - источник —
Группа безопасности
; - группа безопасности —
Текущая
(Self
).
- диапазон портов —
-
Для исходящего HTTPS-трафика:
- диапазон портов —
443
; - протокол —
TCP
; - назначение —
CIDR
; - CIDR блоки —
0.0.0.0/0
.
- диапазон портов —
-
Для исходящего трафика, чтобы разрешить подключение кластера Yandex Data Processing к Metastore:
- диапазон портов —
9083
; - протокол —
Любой
(Any
); - источник —
Группа безопасности
; - группа безопасности —
metastore-sg
(Из списка
).
- диапазон портов —
-
-
Создайте кластер Metastore с параметрами:
- Сервисный аккаунт —
metastore-agent
. - Сеть —
data-processing-network
. - Подсеть —
data-processing-network-ru-central1-a
. - Группа безопасности —
metastore-sg
.
- Сервисный аккаунт —
-
Создайте кластер Managed Service for Apache Airflow™ с параметрами:
- Сервисный аккаунт —
airflow-agent
. - Зона доступности —
ru-central1-a
. - Сеть —
data-processing-network
. - Подсеть —
data-processing-network-ru-central1-a
. - Группа безопасности —
airflow-sg
. - Имя бакета —
<бакет_для_Managed_Airflow>
.
- Сервисный аккаунт —
Подготовьте инфраструктуру:
-
Создайте сервисный аккаунт
my-editor
со следующими ролями:- dataproc.editor — для управления кластером Yandex Data Processing из DAG;
- editor — для остальных необходимых операций.
-
Создайте бакет
<бакет_для_заданий_и_данных>
.На него не нужно предоставлять разрешение сервисному аккаунту, так как роли
editor
достаточно. -
Создайте облачную сеть с именем
data-processing-network
.Вместе с ней автоматически создадутся три подсети в разных зонах доступности и группа безопасности.
-
Настройте NAT-шлюз для подсети
data-processing-network-ru-central1-a
. -
Создайте кластер Metastore с параметрами:
- Сервисный аккаунт —
my-editor
. - Сеть —
data-processing-network
. - Подсеть —
data-processing-network-ru-central1-a
. - Группа безопасности — группа по умолчанию в сети
data-processing-network
.
- Сервисный аккаунт —
-
Создайте кластер Managed Service for Apache Airflow™ с параметрами:
- Сервисный аккаунт —
my-editor
. - Зона доступности —
ru-central1-a
. - Сеть —
data-processing-network
. - Подсеть —
data-processing-network-ru-central1-a
. - Группа безопасности — группа по умолчанию в сети
data-processing-network
. - Имя бакета —
<бакет_для_заданий_и_данных>
.
- Сервисный аккаунт —
Подготовьте PySpark-задание
Для PySpark-задания будет использован Python-скрипт, который создает таблицу и хранится в бакете Object Storage. Подготовьте файл скрипта:
-
Создайте локально файл с именем
create-table.py
и скопируйте в него скрипт:create-table.py
from pyspark.sql.types import * from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder \ .appName("create-table") \ .enableHiveSupport() \ .getOrCreate() # Создание схемы данных schema = StructType([StructField('Name', StringType(), True), StructField('Capital', StringType(), True), StructField('Area', IntegerType(), True), StructField('Population', IntegerType(), True)]) # Создание датафрейма df = spark.createDataFrame([('Австралия', 'Канберра', 7686850, 19731984), ('Австрия', 'Вена', 83855, 7700000)], schema) # Запись датафрейма в бакет в виде таблицы countries df.write.mode("overwrite").option("path","s3a://<бакет_для_выходных_данных_PySpark_задания>/countries").saveAsTable("countries")
-
Создайте в бакете
<бакет_для_исходного_кода_PySpark_задания>
папкуscripts
и загрузите в нее файлcreate-table.py
.
-
Создайте локально файл с именем
create-table.py
и скопируйте в него скрипт:create-table.py
from pyspark.sql.types import * from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder \ .appName("create-table") \ .enableHiveSupport() \ .getOrCreate() # Создание схемы данных schema = StructType([StructField('Name', StringType(), True), StructField('Capital', StringType(), True), StructField('Area', IntegerType(), True), StructField('Population', IntegerType(), True)]) # Создание датафрейма df = spark.createDataFrame([('Австралия', 'Канберра', 7686850, 19731984), ('Австрия', 'Вена', 83855, 7700000)], schema) # Запись датафрейма в бакет в виде таблицы countries df.write.mode("overwrite").option("path","s3a://<бакет_для_заданий_и_данных>/countries").saveAsTable("countries")
-
Создайте в бакете
<бакет_для_заданий_и_данных>
папкуscripts
и загрузите в нее файлcreate-table.py
.
Подготовьте и запустите DAG-файл
DAG будет состоять из нескольких вершин, которые формируют цепочку последовательных действий:
- Managed Service for Apache Airflow™ создает временный, легковесный кластер Yandex Data Processing с настройками, заданными в DAG. Этот кластер автоматически подключается к созданному ранее кластеру Metastore.
- Когда кластер Yandex Data Processing готов, запускается задание PySpark.
- После выполнения задания временный кластер Yandex Data Processing удаляется.
Чтобы подготовить DAG:
-
Создайте SSH-ключ. Сохраните открытую часть ключа — она понадобится для создания кластера Yandex Data Processing.
-
Создайте локально файл с именем
Data-Processing-DAG.py
, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:Data-Processing-DAG.py
import uuid import datetime from airflow import DAG from airflow.utils.trigger_rule import TriggerRule from airflow.providers.yandex.operators.yandexcloud_dataproc import ( DataprocCreateClusterOperator, DataprocCreatePysparkJobOperator, DataprocDeleteClusterOperator, ) # Данные вашей инфраструктуры YC_DP_AZ = 'ru-central1-a' YC_DP_SSH_PUBLIC_KEY = '<открытая_часть_SSH-ключа>' YC_DP_SUBNET_ID = '<идентификатор_подсети>' YC_DP_SA_ID = '<идентификатор_сервисного_аккаунта_data-processing-agent>' YC_DP_METASTORE_URI = '<IP-адрес>' YC_SOURCE_BUCKET = '<бакет_для_исходного_кода_PySpark_задания>' YC_DP_LOGS_BUCKET = '<бакет_для_сбора_логов_Spark>' # Настройки DAG with DAG( 'DATA_INGEST', schedule_interval='@hourly', tags=['data-processing-and-airflow'], start_date=datetime.datetime.now(), max_active_runs=1, catchup=False ) as ingest_dag: # 1 этап: создание кластера Yandex Data Proc create_spark_cluster = DataprocCreateClusterOperator( task_id='dp-cluster-create-task', cluster_name=f'tmp-dp-{uuid.uuid4()}', cluster_description='Временный кластер для выполнения PySpark-задания под оркестрацией Managed Service for Apache Airflow™', ssh_public_keys=YC_DP_SSH_PUBLIC_KEY, service_account_id=YC_DP_SA_ID, subnet_id=YC_DP_SUBNET_ID, s3_bucket=YC_DP_LOGS_BUCKET, zone=YC_DP_AZ, cluster_image_version='2.1', masternode_resource_preset='s2.small', masternode_disk_type='network-ssd', masternode_disk_size=200, computenode_resource_preset='m2.large', computenode_disk_type='network-ssd', computenode_disk_size=200, computenode_count=2, computenode_max_hosts_count=5, # Количество подкластеров для обработки данных будет автоматически масштабироваться в случае большой нагрузки. services=['YARN', 'SPARK'], # Создается легковесный кластер. datanode_count=0, # Без подкластеров для хранения данных. properties={ # С указанием на удаленный кластер Metastore. 'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083', }, ) # 2 этап: запуск задания PySpark poke_spark_processing = DataprocCreatePysparkJobOperator( task_id='dp-cluster-pyspark-task', main_python_file_uri=f's3a://{YC_SOURCE_BUCKET}/scripts/create-table.py', ) # 3 этап: удаление кластера Yandex Data Processing delete_spark_cluster = DataprocDeleteClusterOperator( task_id='dp-cluster-delete-task', trigger_rule=TriggerRule.ALL_DONE, ) # Формирование DAG из указанных выше этапов create_spark_cluster >> poke_spark_processing >> delete_spark_cluster
Где:
YC_DP_AZ
— зона доступности для кластера Yandex Data Processing;YC_DP_SSH_PUBLIC_KEY
— открытая часть SSH-ключа для кластера Yandex Data Processing;YC_DP_SUBNET_ID
— идентификатор подсети;YC_DP_SA_ID
— идентификатор сервисного аккаунта для Yandex Data Processing;YC_DP_METASTORE_URI
— IP-адрес кластера Metastore;YC_SOURCE_BUCKET
— бакет с Python-скриптом для задания PySpark;YC_DP_LOGS_BUCKET
— бакет для логов.
-
Загрузите DAG в кластер Managed Service for Apache Airflow™: создайте в бакете
<бакет_для_Managed_Airflow>
папкуdags
и загрузите в нее файлData-Processing-DAG.py
. -
Откройте веб-интерфейс Apache Airflow™.
-
Убедитесь, что в разделе DAGs появился новый DAG
DATA_INGEST
с тегомdata-processing-and-airflow
.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить DAG, в строке с его именем нажмите кнопку .
-
Создайте SSH-ключ. Сохраните открытую часть ключа — она понадобится для создания кластера Yandex Data Processing.
-
Создайте локально файл с именем
Data-Processing-DAG.py
, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:Data-Processing-DAG.py
import uuid import datetime from airflow import DAG from airflow.utils.trigger_rule import TriggerRule from airflow.providers.yandex.operators.yandexcloud_dataproc import ( DataprocCreateClusterOperator, DataprocCreatePysparkJobOperator, DataprocDeleteClusterOperator, ) # Данные вашей инфраструктуры YC_DP_AZ = 'ru-central1-a' YC_DP_SSH_PUBLIC_KEY = '<открытая_часть_SSH-ключа>' YC_DP_SUBNET_ID = '<идентификатор_подсети>' YC_DP_SA_ID = '<идентификатор_сервисного_аккаунта_my-editor>' YC_DP_METASTORE_URI = '<IP-адрес>' YC_BUCKET = '<бакет_для_заданий_и_данных>' # Настройки DAG with DAG( 'DATA_INGEST', schedule_interval='@hourly', tags=['data-processing-and-airflow'], start_date=datetime.datetime.now(), max_active_runs=1, catchup=False ) as ingest_dag: # 1 этап: создание кластера Yandex Data Proc create_spark_cluster = DataprocCreateClusterOperator( task_id='dp-cluster-create-task', cluster_name=f'tmp-dp-{uuid.uuid4()}', cluster_description='Временный кластер для выполнения PySpark-задания под оркестрацией Managed Service for Apache Airflow™', ssh_public_keys=YC_DP_SSH_PUBLIC_KEY, service_account_id=YC_DP_SA_ID, subnet_id=YC_DP_SUBNET_ID, s3_bucket=YC_BUCKET, zone=YC_DP_AZ, cluster_image_version='2.1', masternode_resource_preset='s2.small', masternode_disk_type='network-ssd', masternode_disk_size=200, computenode_resource_preset='m2.large', computenode_disk_type='network-ssd', computenode_disk_size=200, computenode_count=2, computenode_max_hosts_count=5, # Количество подкластеров для обработки данных будет автоматически масштабироваться в случае большой нагрузки. services=['YARN', 'SPARK'], # Создается легковесный кластер. datanode_count=0, # Без подкластеров для хранения данных. properties={ # С указанием на удаленный кластер Metastore. 'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083', }, ) # 2 этап: запуск задания PySpark poke_spark_processing = DataprocCreatePysparkJobOperator( task_id='dp-cluster-pyspark-task', main_python_file_uri=f's3a://{YC_BUCKET}/scripts/create-table.py', ) # 3 этап: удаление кластера Yandex Data Processing delete_spark_cluster = DataprocDeleteClusterOperator( task_id='dp-cluster-delete-task', trigger_rule=TriggerRule.ALL_DONE, ) # Формирование DAG из указанных выше этапов create_spark_cluster >> poke_spark_processing >> delete_spark_cluster
Где:
YC_DP_AZ
— зона доступности для кластера Yandex Data Processing;YC_DP_SSH_PUBLIC_KEY
— открытая часть SSH-ключа для кластера Yandex Data Processing;YC_DP_SUBNET_ID
— идентификатор подсети;YC_DP_SA_ID
— идентификатор сервисного аккаунтаmy-editor
;YC_DP_METASTORE_URI
— IP-адрес кластера Metastore;YC_BUCKET
—<бакет_для_заданий_и_данных>
.
-
Загрузите DAG в кластер Managed Service for Apache Airflow™: создайте в бакете
<бакет_для_заданий_и_данных>
папкуdags
и загрузите в нее файлData-Processing-DAG.py
. -
Откройте веб-интерфейс Apache Airflow™.
-
Убедитесь, что в разделе DAGs появился новый DAG
DATA_INGEST
с тегомdata-processing-and-airflow
.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить DAG, в строке с его именем нажмите кнопку .
Проверьте результат
- Чтобы отслеживать результаты выполнения задач, нажмите на название DAG. Результаты отображаются во вкладке Grid.
- Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления
создается кластер Yandex Data Processing, выполняется задание PySpark и удаляется тот же кластер. - Убедитесь, что в бакете
<бакет_для_выходных_данных_PySpark_задания>
появилась папкаcountries
, а в ней — файлpart-00000-...
. Теперь данные из созданной таблицы хранятся в бакете Object Storage, а метаинформация о ней — в кластере Metastore. - Проверьте, что в бакете
<бакет_для_сбора_логов_Spark>
появились логи выполнения PySpark-задания.
- Чтобы отслеживать результаты выполнения задач, нажмите на название DAG. Результаты отображаются во вкладке Grid.
- Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления
создается кластер Yandex Data Processing, выполняется задание PySpark и удаляется тот же кластер. - Убедитесь, что в бакете
<бакет_для_заданий_и_данных>
появилась папкаcountries
, а в ней — файлpart-00000-...
. Теперь данные из созданной таблицы хранятся в бакете Object Storage, а метаинформация о ней — в кластере Metastore. - Проверьте, что в бакете
<бакет_для_заданий_и_данных>
появились логи выполнения PySpark-задания. Они записываются в папкиdataproc
,user
иvar
.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
- Сервисные аккаунты.
- Бакеты Object Storage.
- Кластер Metastore.
- Кластер Managed Service for Apache Airflow™.
- Таблицу маршрутизации.
- NAT-шлюз.
- Группы безопасности.
- Облачные подсети, созданные по умолчанию в сети
data-processing-network
. - Облачную сеть.
- Сервисный аккаунт.
- Бакет Object Storage.
- Кластер Metastore.
- Кластер Managed Service for Apache Airflow™.
- Таблицу маршрутизации.
- NAT-шлюз.
- Группу безопасности, созданную по умолчанию в сети
data-processing-network
. - Облачные подсети, созданные по умолчанию в сети
data-processing-network
. - Облачную сеть.