Автоматизация работы с Yandex Data Processing
В сервисе Yandex Managed Service for Apache Airflow™ можно создать DAG — направленный ациклический граф задач, который позволит автоматизировать работу с сервисом Yandex Data Processing. Ниже рассматривается DAG, который включает в себя несколько задач:
- Создать кластер Yandex Data Processing.
- Создать и запустить задание PySpark.
- Удалить кластер Yandex Data Processing.
При таком DAG кластер существует непродолжительное время. Так как стоимость ресурсов Yandex Data Processing зависит от времени их использования, в кластере можно задействовать ресурсы повышенной мощности и быстро обработать большее количество данных за те же деньги.
В этом DAG кластер Yandex Data Processing создается без сервиса Hive. Для хранения табличных метаданных в примере ниже используется кластер Hive Metastore. Сохраненные метаданные затем может использовать другой кластер Yandex Data Processing.
Чтобы автоматизировать работу с Yandex Data Processing с помощью Managed Service for Apache Airflow™:
- Подготовьте инфраструктуру.
- Подготовьте PySpark-задание.
- Создайте подключения Apache Airflow™.
- Подготовьте и запустите DAG-файл.
- Проверьте результат.
Если созданные ресурсы вам больше не нужны, удалите их.
Примечание
Сервис Metastore находится на стадии Preview. Чтобы получить доступ, обратитесь в техническую поддержку
Подготовьте инфраструктуру
-
Создайте сервисный аккаунт
airflow-sa
с ролями:storage.editor
;dataproc.agent
;dataproc.editor
.
-
Создайте ключи для сервисного аккаунта:
- Статический ключ доступа. Сохраните его идентификатор и секретный ключ.
- Авторизованный ключ. Сохраните открытую часть ключа и скачайте файл с открытой и закрытой частями.
-
airflow-bucket
— для Managed Service for Apache Airflow™;pyspark-bucket
— для PySpark-задания;output-bucket
– для выходных данных;log-bucket
— для сбора логов.
Бакетов нужно несколько, так как на них назначаются различные права доступа.
-
Предоставьте сервисному аккаунту
airflow-sa
разрешения на бакеты:airflow-bucket
— разрешениеREAD
;pyspark-bucket
— разрешениеREAD
;output-bucket
– разрешениеREAD и WRITE
;log-bucket
— разрешениеREAD и WRITE
.
-
Создайте облачную сеть с именем
dataproc-network
.Вместе с ней автоматически создадутся три подсети в разных зонах доступности и группа безопасности.
-
Настройте NAT-шлюз для подсети
dataproc-network-ru-central1-a
. -
Добавьте правила в группу безопасности в сети
dataproc-network
:Правила группы безопасности
Для какого сервиса нужно правило
Зачем нужно правило
Настройки правила
Yandex Data Processing
Для входящего служебного трафика.
- Диапазон портов —
0-65535
. - Протокол —
Любой
(Any
). - Источник —
Группа безопасности
. - Группа безопасности —
Текущая
(Self
).
Metastore
Для входящего трафика от клиентов.
- Диапазон портов —
30000-32767
. - Протокол —
Любой
(Any
). - Источник —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
Metastore
Для входящего трафика от балансировщика.
- Диапазон портов —
10256
. - Протокол —
Любой
(Any
). - Источник —
Проверки состояния балансировщика
.
Yandex Data Processing
Для исходящего служебного трафика.
- Диапазон портов —
0-65535
. - Протокол —
Любой
(Any
). - Источник —
Группа безопасности
. - Группа безопасности —
Текущая
(Self
).
Yandex Data Processing
Для исходящего HTTPS-трафика.
- Диапазон портов —
443
. - Протокол —
TCP
. - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
Yandex Data Processing
Для исходящего трафика, чтобы разрешить подключение кластера Yandex Data Processing к Metastore.
- Диапазон портов —
9083
. - Протокол —
Любой
(Any
). - Источник —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
Создайте кластер Metastore с параметрами:
- Сеть —
dataproc-network
. - Подсеть —
dataproc-network-ru-central1-a
. - Группа безопасности — группа по умолчанию в сети
dataproc-network
. - Идентификатор ключа и секретный ключ — принадлежат статическому ключу доступа.
- Сеть —
-
Создайте кластер Managed Service for Apache Airflow™ с параметрами:
- Зона доступности —
ru-central1-a
. - Сеть —
dataproc-network
. - Подсеть —
dataproc-network-ru-central1-a
. - Группа безопасности — группа по умолчанию в сети
dataproc-network
. - Имя бакета —
airflow-bucket
.
- Зона доступности —
Подготовьте PySpark-задание
Для PySpark-задания будет использован Python-скрипт, который создает таблицу и хранится в бакете Object Storage. Подготовьте файл скрипта:
-
Создайте локально файл с именем
create-table.py
и скопируйте в него скрипт:create-table.py
from pyspark.sql.types import * from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder \ .appName("create-table") \ .enableHiveSupport() \ .getOrCreate() # Создание схемы данных schema = StructType([StructField('Name', StringType(), True), StructField('Capital', StringType(), True), StructField('Area', IntegerType(), True), StructField('Population', IntegerType(), True)]) # Создание датафрейма df = spark.createDataFrame([('Австралия', 'Канберра', 7686850, 19731984), ('Австрия', 'Вена', 83855, 7700000)], schema) # Запись датафрейма в бакет в виде таблицы countries df.write.mode("overwrite").option("path","s3a://output-bucket/countries").saveAsTable("countries")
-
Создайте в бакете
pyspark-bucket
папкуscripts
и загрузите в нее файлcreate-table.py
.
Создайте подключения Apache Airflow™
Для DAG задач будет использовано два подключения: для Object Storage и сервисного аккаунта. Они будут указаны в DAG-файле. Чтобы создать подключения:
-
Перейдите в раздел Admin → Connections.
-
Нажмите кнопку .
-
Создайте подключение для Object Storage. Укажите его параметры:
-
Connection Id —
yc-s3
. -
Connection Type —
Amazon Elastic MapReduce
. -
Run Job Flow Configuration — JSON-файл в формате:
{ "aws_access_key_id": "<идентификатор_статического_ключа>", "aws_secret_access_key": "<секретный_ключ>", "host": "https://storage.yandexcloud.net/" }
В файле укажите данные статического ключа, который вы создавали ранее.
-
-
Нажмите кнопку Save.
-
Снова нажмите кнопку , чтобы создать подключение для сервисного аккаунта.
-
Укажите параметры подключения:
- Connection Id —
yc-airflow-sa
. - Connection Type —
Yandex Cloud
. - Service account auth JSON — содержимое JSON-файла с авторизованным ключом.
- Public SSH key — публичная часть авторизованного ключа, который вы создавали ранее. Оформите ключ в виде одной строки, в нем не должно быть символов переноса.
Остальные поля оставьте пустыми.
- Connection Id —
-
Нажмите кнопку Save.
Подготовьте и запустите DAG-файл
DAG будет состоять из нескольких вершин, которые формируют цепочку последовательных действий:
- Managed Service for Apache Airflow™ создает временный, легковесный кластер Yandex Data Processing с настройками, заданными в DAG. Этот кластер автоматически подключается к созданному ранее кластеру Metastore.
- Когда кластер Yandex Data Processing готов, запускается задание PySpark.
- После выполнения задания временный кластер Yandex Data Processing удаляется.
Чтобы подготовить DAG:
-
Создайте SSH-ключ. Сохраните открытую часть ключа — она понадобится для создания кластера Yandex Data Processing.
-
Создайте локально файл с именем
Data-Proc-DAG.py
, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:Data-Proc-DAG.py
import uuid import datetime from airflow import DAG, settings from airflow.models import Connection, Variable from airflow.utils.trigger_rule import TriggerRule from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator from airflow.providers.yandex.operators.yandexcloud_dataproc import ( DataprocCreateClusterOperator, DataprocCreatePysparkJobOperator, DataprocDeleteClusterOperator, ) # Данные вашей инфраструктуры YC_DP_FOLDER_ID = '<идентификатор_каталога>' YC_DP_SSH_PUBLIC_KEY = '<открытая_часть_SSH-ключа>' YC_DP_SUBNET_ID = '<идентификатор_подсети>' YC_DP_GROUP_ID = '<идентификатор_группы_безопасности>' YC_DP_SA_ID = '<идентификатор_сервисного_аккаунта>' YC_DP_METASTORE_URI = '<IP-адрес>' YC_DP_AZ = 'ru-central1-a' YC_SOURCE_BUCKET = 'pyspark-bucket' YC_DP_LOGS_BUCKET = 'log-bucket' # Создание подключения для Object Storage session = settings.Session() ycS3_connection = Connection( conn_id='yc-s3' ) if not session.query(Connection).filter(Connection.conn_id == ycS3_connection.conn_id).first(): session.add(ycS3_connection) session.commit() # Создание подключения для сервисного аккаунта ycSA_connection = Connection( conn_id='yc-airflow-sa' ) if not session.query(Connection).filter(Connection.conn_id == ycSA_connection.conn_id).first(): session.add(ycSA_connection) session.commit() # Настройки DAG with DAG( 'DATA_INGEST', schedule_interval='@hourly', tags=['data-proc-and-airflow'], start_date=datetime.datetime.now(), max_active_runs=1, catchup=False ) as ingest_dag: # 1 этап: создание кластера Yandex Data Processing create_spark_cluster = DataprocCreateClusterOperator( task_id='dp-cluster-create-task', folder_id=YC_DP_FOLDER_ID, cluster_name=f'tmp-dp-{uuid.uuid4()}', cluster_description='Временный кластер для выполнения PySpark-задания под оркестрацией Managed Service for Apache Airflow™', ssh_public_keys=YC_DP_SSH_PUBLIC_KEY, subnet_id=YC_DP_SUBNET_ID, s3_bucket=YC_DP_LOGS_BUCKET, service_account_id=YC_DP_SA_ID, zone=YC_DP_AZ, cluster_image_version='2.1.7', enable_ui_proxy=False, masternode_resource_preset='s2.small', masternode_disk_type='network-ssd', masternode_disk_size=200, computenode_resource_preset='m2.large', computenode_disk_type='network-ssd', computenode_disk_size=200, computenode_count=2, computenode_max_hosts_count=5, # Количество подкластеров для обработки данных будет автоматически масштабироваться в случае большой нагрузки. services=['YARN', 'SPARK'], # Создается легковесный кластер. datanode_count=0, # Без подкластеров для хранения данных. properties={ # С указанием на удаленный кластер Metastore. 'spark:spark.hive.metastore.uris': f'thrift://{YC_DP_METASTORE_URI}:9083', }, security_group_ids=[YC_DP_GROUP_ID], connection_id=ycSA_connection.conn_id, dag=ingest_dag ) # 2 этап: запуск задания PySpark poke_spark_processing = DataprocCreatePysparkJobOperator( task_id='dp-cluster-pyspark-task', main_python_file_uri=f's3a://{YC_SOURCE_BUCKET}/scripts/create-table.py', connection_id=ycSA_connection.conn_id, dag=ingest_dag ) # 3 этап: удаление кластера Yandex Data Processing delete_spark_cluster = DataprocDeleteClusterOperator( task_id='dp-cluster-delete-task', trigger_rule=TriggerRule.ALL_DONE, dag=ingest_dag ) # Формирование DAG из указанных выше этапов create_spark_cluster >> poke_spark_processing >> delete_spark_cluster
Где:
YC_DP_FOLDER_ID
— идентификатор каталога в облаке.YC_DP_SSH_PUBLIC_KEY
— открытая часть SSH-ключа для кластера Yandex Data Processing.YC_DP_SUBNET_ID
— идентификатор подсети.YC_DP_GROUP_ID
— идентификатор группы безопасности.YC_DP_SA_ID
— идентификатор сервисного аккаунта.YC_DP_METASTORE_URI
— IP-адрес кластера Metastore.YC_DP_AZ
— зона доступности для кластера Yandex Data Processing, напримерru-central1-a
.YC_SOURCE_BUCKET
— бакет с Python-скриптом для задания PySpark, напримерpyspark-bucket
.YC_DP_LOGS_BUCKET
— бакет для логов, напримерpyspark-bucket
.
-
Загрузите DAG в кластер Managed Service for Apache Airflow™: создайте в бакете
airflow-bucket
папкуdags
и загрузите в нее файлData-Proc-DAG.py
. -
Убедитесь, что в разделе DAGs появился новый DAG
DATA_INGEST
с тегомdata-proc-and-airflow
.Загрузка DAG-файла из бакета может занять несколько минут.
-
Чтобы запустить DAG, в строке с его именем нажмите кнопку .
Проверьте результат
- Чтобы отслеживать результаты выполнения задач, нажмите на название DAG. Результаты отображаются во вкладке Grid.
- Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления
создается кластер Yandex Data Processing, выполняется задание PySpark и удаляется тот же кластер. - Убедитесь, что в бакете
output-bucket
появилась папкаcountries
, а в ней — файлpart-00000-...
. Теперь данные из созданной таблицы хранятся в бакете Object Storage, а метаинформация о ней — в кластере Metastore. - Проверьте, что в бакете
log-bucket
появились логи выполнения PySpark-задания.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
- Сервисный аккаунт.
- Бакеты Object Storage.
- Кластер Metastore.
- Кластер Managed Service for Apache Airflow™.
- Таблицу маршрутизации.
- NAT-шлюз.
- Группу безопасности.
- Облачные подсети, созданные по умолчанию в сети
dataproc-network
. - Облачную сеть.