Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Истории успеха
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • AI Studio
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Истории успеха
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Airflow™
  • Начало работы
    • Все руководства
      • Yandex Data Processing: автоматизация работы
      • Yandex Query: автоматизация задач
      • Managed Service for Apache Spark™: автоматизация работы
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Yandex Monitoring
  • История изменений
  • Вопросы и ответы

В этой статье:

  • Необходимые платные ресурсы
  • Подготовьте инфраструктуру
  • Подготовьте PySpark-задание
  • Подготовьте и запустите DAG-файл
  • Проверьте результат
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Работа с кластерами
  3. Managed Service for Apache Spark™: автоматизация работы

Автоматизация работы с Yandex Managed Service for Apache Spark™

Статья создана
Yandex Cloud
Обновлена 26 мая 2025 г.
  • Необходимые платные ресурсы
  • Подготовьте инфраструктуру
  • Подготовьте PySpark-задание
  • Подготовьте и запустите DAG-файл
  • Проверьте результат
  • Удалите созданные ресурсы

В сервисе Yandex Managed Service for Apache Airflow™ можно создать DAG — направленный ациклический граф задач, который позволит автоматизировать работу с сервисом Yandex Managed Service for Apache Spark™. Ниже рассматривается DAG, который включает в себя несколько задач:

  1. Создать кластер Apache Spark™.
  2. Запустить задание PySpark.
  3. Удалить кластер Apache Spark™.

При таком DAG кластер существует непродолжительное время. В кластере можно задействовать ресурсы повышенной мощности и быстро обработать большее количество данных.

В этом DAG создается кластер Apache Spark™. Для хранения табличных метаданных в примере ниже используется кластер Hive Metastore. Сохраненные метаданные затем может использовать другой кластер Apache Spark™.

Для использования Yandex Managed Service for Apache Spark™ в сервисе Yandex Managed Service for Apache Airflow™:

  1. Подготовьте инфраструктуру.
  2. Подготовьте PySpark-задание.
  3. Подготовьте и запустите DAG-файл.
  4. Проверьте результат.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Apache Airflow™: вычислительные ресурсы компонентов кластера (см. тарифы Yandex Managed Service for Apache Airflow™).
  • Плата за бакеты Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).
  • Плата за получение и хранение логов (см. тарифы Cloud Logging).

Подготовьте инфраструктуруПодготовьте инфраструктуру

В примере ниже рассматривается два сценария. Выберите наиболее подходящий:

  • Высокий уровень безопасности. Такой сценарий рекомендуемый, так как в нем соблюдается принцип минимальных привилегий. Сценарий включает в себя следующие особенности:

    • Права доступа разделяются между сервисными аккаунтами. Для каждого кластера вы создаете отдельный сервисный аккаунт и назначаете ему роли, необходимые только для работы кластера этого аккаунта.
    • Используется несколько бакетов для разных задач, различные данные хранятся в разных бакетах. Например, DAG загружается в один бакет, а результаты выполнения PySpark-задания записываются в другой бакет.
    • Настроены группы безопасности. Вы ограничиваете трафик, и в результате доступ получают только разрешенные ресурсы.
  • Упрощенная настройка. Предусматривает более низкий уровень безопасности:

    • Используется один сервисный аккаунт, который обладает большими привилегиями, чем необходимо.
    • Все данные хранятся только в одном бакете, но в разных папках.
    • Группы безопасности не настраиваются.
Высокий уровень безопасности
Упрощенная настройка

Подготовьте инфраструктуру:

  1. Создайте сервисные аккаунты со следующими ролями:

    Сервисный аккаунт

    Его роли

    airflow-agent для кластера Apache Airflow™.

    • managed-airflow.integrationProvider — чтобы кластер Apache Airflow™ мог взаимодействовать с другими ресурсами.
    • managed-spark.editor — чтобы управлять кластером Apache Spark™ из DAG.
    • iam.serviceAccounts.user — чтобы указать сервисный аккаунт spark-agent при создании кластера Apache Spark™.
    • vpc.user — чтобы в кластере Apache Airflow™ использовать подсеть Yandex Virtual Private Cloud.
    • logging.editor — чтобы работать с лог-группами.
    • logging.reader — чтобы читать логи.
    • mdb.viewer — чтобы получать статусы операций.
    • managed-metastore.viewer — чтобы просматривать информацию о кластерах Hive Metastore.

    metastore-agent для кластера Metastore.

    • managed-metastore.integrationProvider — чтобы кластер Metastore мог взаимодействовать с другими ресурсами.

    spark-agent для кластера Apache Spark™.

    • managed-spark.integrationProvider — чтобы кластер Apache Spark™ мог взаимодействовать с другими ресурсами.
  2. Создайте бакеты:

    • <бакет_для_исходного_кода_Airflow_DAG>.
    • <бакет_для_исходного_кода_PySpark_задания>.
    • <бакет_для_выходных_данных_PySpark_задания>.

    Бакетов нужно несколько, так как на них назначаются различные права доступа.

  3. Предоставьте разрешения на следующие бакеты:

    • <бакет_для_исходного_кода_Airflow_DAG> — разрешение READ для сервисного аккаунта airflow-agent.
    • <бакет_для_исходного_кода_PySpark_задания> — разрешение READ для сервисного аккаунта spark-agent.
    • <бакет_для_выходных_данных_PySpark_задания> — разрешение READ и WRITE для сервисных аккаунтов spark-agent и metastore-agent.
  4. Создайте облачную сеть с именем datalake-network.

    Вместе с ней автоматически создадутся три подсети в разных зонах доступности.

  5. Для кластера Metastore создайте группу безопасности metastore-sg в сети datalake-network. Добавьте в группу следующие правила:

    • Для входящего трафика от клиентов:

      • Диапазон портов — 30000-32767.
      • Протокол — Любой (Any).
      • Источник — CIDR.
      • CIDR блоки — 0.0.0.0/0.
    • Для входящего трафика от балансировщика:

      • Диапазон портов — 10256.
      • Протокол — Любой (Any).
      • Источник — Проверки состояния балансировщика.
  6. Для кластера Apache Airflow™ создайте группу безопасности airflow-sg в сети datalake-network. Добавьте в группу следующее правило:

    • Для исходящего HTTPS-трафика:

      • Диапазон портов — 443.
      • Протокол — TCP.
      • Назначение — CIDR.
      • CIDR блоки — 0.0.0.0/0.
  7. Для кластера Apache Spark™ создайте группу безопасности spark-sg в сети datalake-network. Добавьте в группу следующее правило:

    • Для исходящего трафика, чтобы разрешить подключение кластера Apache Spark™ к Metastore:

      • Диапазон портов — 9083.
      • Протокол — Любой (Any).
      • Назначение — CIDR.
      • CIDR блоки — 0.0.0.0/0.
  8. Создайте кластер Metastore с параметрами:

    • Сервисный аккаунт — metastore-agent.
    • Сеть — datalake-network.
    • Подсеть — datalake-network-ru-central1-a.
    • Группа безопасности — metastore-sg.

    Примечание

    Дождитесь завершения операции.

  9. Создайте кластер Managed Service for Apache Airflow™ с параметрами:

    • Сервисный аккаунт — airflow-agent.
    • Зона доступности — ru-central1-a.
    • Сеть — datalake-network.
    • Подсеть — datalake-network-ru-central1-a.
    • Группа безопасности — airflow-sg.
    • Имя бакета — <бакет_для_исходного_кода_Airflow_DAG>.

Подготовьте инфраструктуру:

  1. Создайте сервисный аккаунт integration-agent со следующими ролями:

    • managed-airflow.integrationProvider — чтобы кластер Apache Airflow™ мог взаимодействовать с другими ресурсами.
    • managed-spark.editor — чтобы управлять кластером Apache Spark™ из DAG.
    • iam.serviceAccounts.user — чтобы указать сервисный аккаунт spark-agent при создании кластера Apache Spark™.
    • vpc.user — чтобы в кластере Apache Airflow™ использовать подсеть Yandex Virtual Private Cloud.
    • logging.editor — чтобы работать с лог-группами.
    • logging.reader — чтобы читать логи.
    • mdb.viewer — чтобы получать статусы операций.
    • managed-metastore.viewer — чтобы просматривать информацию о кластерах Hive Metastore.
    • managed-metastore.integrationProvider — чтобы кластер Metastore мог взаимодействовать с другими ресурсами.
    • managed-spark.integrationProvider — чтобы кластер Apache Spark™ мог взаимодействовать с другими ресурсами.
  2. Создайте бакет <бакет_для_заданий_и_данных> и предоставьте разрешение READ и WRITE для сервисного аккаунта integration-agent.

  3. Создайте облачную сеть с именем datalake-network.

    Вместе с ней автоматически создадутся три подсети в разных зонах доступности и группа безопасности.

  4. Создайте кластер Metastore с параметрами:

    • Сервисный аккаунт — integration-agent.
    • Сеть — datalake-network.
    • Подсеть — datalake-network-ru-central1-a.
    • Группа безопасности — группа по умолчанию в сети datalake-network.

    Примечание

    Дождитесь завершения операции.

  5. Создайте кластер Managed Service for Apache Airflow™ с параметрами:

    • Сервисный аккаунт — integration-agent.
    • Зона доступности — ru-central1-a.
    • Сеть — datalake-network.
    • Подсеть — datalake-network-ru-central1-a.
    • Группа безопасности — группа по умолчанию в сети datalake-network.
    • Имя бакета — <бакет_для_заданий_и_данных>.

Подготовьте PySpark-заданиеПодготовьте PySpark-задание

Для PySpark-задания будет использован Python-скрипт, который создает таблицу и хранится в бакете Object Storage. Подготовьте файл скрипта:

Высокий уровень безопасности
Упрощенная настройка
  1. Создайте локально файл с именем job_with_table.py и скопируйте в него скрипт:

    import random
    from pyspark.sql import SparkSession
    
    
    def prepare_table(spark, database, table):
        create_database_sql = "create database if not exists {database}"
        create_table_sql = """
        create table if not exists {database}.{table} (
            id int,
            value double
        )
        using iceberg
        """
        truncate_table_sql = "truncate table {database}.{table}"
    
        spark.sql(create_database_sql.format(database=database))
        spark.sql(create_table_sql.format(database=database, table=table))
        spark.sql(truncate_table_sql.format(database=database, table=table))
    
    
    def write_data(spark, database, table):
        data = [(i, random.random()) for i in range(100_000)]
        # Создание датафрейма
        df = spark.createDataFrame(data, schema=['id', 'value'])
        table_full_name = "{database}.{table}".format(database=database, table=table)
        df.writeTo(table_full_name).append()
    
    
    def main():
        # Создание Spark-сессии
        spark = (
            SparkSession
            .builder
            .appName('job_with_table')
            .enableHiveSupport()
            .getOrCreate()
        )
        database, table = 'database_1', 'table_1'
        prepare_table(spark, database, table)
        write_data(spark, database, table)
    
    
    if __name__ == '__main__':
        main()
    
    

  2. Создайте в бакете <бакет_для_исходного_кода_PySpark_задания> папку scripts и загрузите в нее файл job_with_table.py.

  1. Создайте локально файл с именем job_with_table.py и скопируйте в него скрипт:

    import random
    from pyspark.sql import SparkSession
    
    
    def prepare_table(spark, database, table):
        create_database_sql = "create database if not exists {database}"
        create_table_sql = """
        create table if not exists {database}.{table} (
            id int,
            value double
        )
        using iceberg
        """
        truncate_table_sql = "truncate table {database}.{table}"
    
        spark.sql(create_database_sql.format(database=database))
        spark.sql(create_table_sql.format(database=database, table=table))
        spark.sql(truncate_table_sql.format(database=database, table=table))
    
    
    def write_data(spark, database, table):
        data = [(i, random.random()) for i in range(100_000)]
        # Создание датафрейма
        df = spark.createDataFrame(data, schema=['id', 'value'])
        table_full_name = "{database}.{table}".format(database=database, table=table)
        df.writeTo(table_full_name).append()
    
    
    def main():
        # Создание Spark-сессии
        spark = (
            SparkSession
            .builder
            .appName('job_with_table')
            .enableHiveSupport()
            .getOrCreate()
        )
        database, table = 'database_1', 'table_1'
        prepare_table(spark, database, table)
        write_data(spark, database, table)
    
    
    if __name__ == '__main__':
        main()
    
    

  2. Создайте в бакете <бакет_для_заданий_и_данных> папку scripts и загрузите в нее файл job_with_table.py.

Подготовьте и запустите DAG-файлПодготовьте и запустите DAG-файл

DAG будет состоять из нескольких вершин, которые формируют цепочку последовательных действий:

  1. Yandex Managed Service for Apache Airflow™ создает временный кластер Apache Spark™ с настройками, заданными в DAG. Этот кластер автоматически подключается к созданному ранее кластеру Metastore.
  2. Когда кластер Apache Spark™ готов, запускается задание PySpark.
  3. После выполнения задания временный кластер Apache Spark™ удаляется.

Чтобы подготовить DAG:

Высокий уровень безопасности
Упрощенная настройка
  1. Создайте локально файл с именем dag.py, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:

    import logging
    import pendulum
    from airflow.models.dag import DAG
    from airflow.decorators import task
    from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
    
    from yandexcloud.operations import OperationError
    
    
    YANDEX_CONN_ID = '<идентификатор_подключения>'
    
    # Данные вашей инфраструктуры
    FOLDER_ID = '<идентификатор_каталога>'
    SERVICE_ACCOUNT_ID = '<идентификатор_сервисного_аккаунта_spark-agent>'
    SUBNET_IDS = [<идентификатор_подсети>]
    SECURITY_GROUP_IDS = [<идентификатор_группы_безопасности>]
    METASTORE_CLUSTER_ID = '<идентификатор_кластера_Metastore>'
    
    JOB_NAME = 'job_with_table'
    JOB_SCRIPT = 's3a://<бакет_для_исходного_кода_PySpark_задания>/scripts/job_with_table.py'
    JOB_ARGS = []
    JOB_PROPERTIES = {
        'spark.executor.instances': '1',
        'spark.sql.warehouse.dir': 's3a://<бакет_для_выходных_данных_PySpark_задания>/warehouse',
    }
    
    
    @task
    # 1 этап: создание кластера Apache Spark™
    def create_cluster(yc_hook, cluster_spec):
        spark_client = yc_hook.sdk.wrappers.Spark()
        try:
            spark_client.create_cluster(cluster_spec)
        except OperationError as job_error:
            cluster_id = job_error.operation_result.meta.cluster_id
            if cluster_id:
                spark_client.delete_cluster(cluster_id=cluster_id)
            raise
        return spark_client.cluster_id
    
    
    @task
    # 2 этап: запуск задания PySpark
    def run_spark_job(yc_hook, cluster_id, job_spec):
        spark_client = yc_hook.sdk.wrappers.Spark()
        try:
            job_operation = spark_client.create_pyspark_job(cluster_id=cluster_id, spec=job_spec)
            job_id = job_operation.response.id
            job_info = job_operation.response
        except OperationError as job_error:
            job_id = job_error.operation_result.meta.job_id
            job_info, _ = spark_client.get_job(cluster_id=cluster_id, job_id=job_id)
            raise
        finally:
            job_log = spark_client.get_job_log(cluster_id=cluster_id, job_id=job_id)
            for line in job_log:
                logging.info(line)
            logging.info("Job info: %s", job_info)
    
    
    @task(trigger_rule="all_done")
    # 3 этап: удаление кластера Apache Spark™
    def delete_cluster(yc_hook, cluster_id):
        if cluster_id:
            spark_client = yc_hook.sdk.wrappers.Spark()
            spark_client.delete_cluster(cluster_id=cluster_id)
    
    
    # Настройки DAG
    with DAG(
        dag_id="example_spark",
        start_date=pendulum.datetime(2025, 1, 1),
        schedule=None,
    ):
        yc_hook = YandexCloudBaseHook(yandex_conn_id=YANDEX_CONN_ID)
    
        cluster_spec = yc_hook.sdk.wrappers.SparkClusterParameters(
            folder_id=FOLDER_ID,
            service_account_id=SERVICE_ACCOUNT_ID,
            subnet_ids=SUBNET_IDS,
            security_group_ids=SECURITY_GROUP_IDS,
            driver_pool_resource_preset="c2-m8",
            driver_pool_size=1,
            executor_pool_resource_preset="c4-m16",
            executor_pool_min_size=1,
            executor_pool_max_size=2,
            metastore_cluster_id=METASTORE_CLUSTER_ID,
        )
        cluster_id = create_cluster(yc_hook, cluster_spec)
    
        job_spec = yc_hook.sdk.wrappers.PysparkJobParameters(
            name=JOB_NAME,
            main_python_file_uri=JOB_SCRIPT,
            args=JOB_ARGS,
            properties=JOB_PROPERTIES,
        )
        task_job = run_spark_job(yc_hook, cluster_id, job_spec)
        task_delete = delete_cluster(yc_hook, cluster_id)
    
        task_job >> task_delete
    

    Где:

    • YANDEX_CONN_ID — идентификатор подключения.

    • FOLDER_ID — идентификатор каталога, в котором будет создан кластер Apache Spark™.

    • SERVICE_ACCOUNT_ID — идентификатор сервисного аккаунта, который будет использоваться для создания кластера Apache Spark™.

    • SUBNET_IDS — идентификатор подсети.

      Примечание

      Подсеть для Apache Spark™ и Metastore должна совпадать.

    • SECURITY_GROUP_IDS — идентификатор группы безопасности для кластера Apache Spark™.

    • METASTORE_CLUSTER_ID — идентификатор кластера Metastore.

    • JOB_NAME — имя задания PySpark.

    • JOB_SCRIPT — путь к файлу с заданием PySpark.

    • JOB_ARGS — аргументы задания PySpark.

    • JOB_PROPERTIES — свойства задания PySpark.

  2. Загрузите DAG в кластер Apache Airflow™: создайте в бакете <бакет_для_исходного_кода_Airflow_DAG> папку dags и загрузите в нее файл dag.py.

  3. Откройте веб-интерфейс Apache Airflow™.

  4. Убедитесь, что в разделе DAGs появился новый DAG example_spark.

    Загрузка DAG-файла из бакета может занять несколько минут.

  5. Чтобы запустить DAG, в строке с его именем нажмите кнопку image.

  1. Создайте локально файл с именем dag.py, скопируйте в него скрипт и подставьте данные вашей инфраструктуры в переменные:

    import logging
    import pendulum
    from airflow.models.dag import DAG
    from airflow.decorators import task
    from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
    
    from yandexcloud.operations import OperationError
    
    
    YANDEX_CONN_ID = '<идентификатор_подключения>'
    
    # Данные вашей инфраструктуры
    FOLDER_ID = '<идентификатор_каталога>'
    SERVICE_ACCOUNT_ID = '<идентификатор_сервисного_аккаунта_integration-agent>'
    SUBNET_IDS = [<идентификатор_подсети>]
    SECURITY_GROUP_IDS = [<идентификатор_группы_безопасности>]
    METASTORE_CLUSTER_ID = '<идентификатор_кластера_Metastore>'
    
    JOB_NAME = 'job_with_table'
    JOB_SCRIPT = 's3a://<бакет_для_заданий_и_данных>/scripts/job_with_table.py'
    JOB_ARGS = []
    JOB_PROPERTIES = {
        'spark.executor.instances': '1',
        'spark.sql.warehouse.dir': 's3a://<бакет_для_заданий_и_данных>/warehouse',
    }
    
    
    @task
    # 1 этап: создание кластера Apache Spark™
    def create_cluster(yc_hook, cluster_spec):
        spark_client = yc_hook.sdk.wrappers.Spark()
        try:
            spark_client.create_cluster(cluster_spec)
        except OperationError as job_error:
            cluster_id = job_error.operation_result.meta.cluster_id
            if cluster_id:
                spark_client.delete_cluster(cluster_id=cluster_id)
            raise
        return spark_client.cluster_id
    
    
    @task
    # 2 этап: запуск задания PySpark
    def run_spark_job(yc_hook, cluster_id, job_spec):
        spark_client = yc_hook.sdk.wrappers.Spark()
        try:
            job_operation = spark_client.create_pyspark_job(cluster_id=cluster_id, spec=job_spec)
            job_id = job_operation.response.id
            job_info = job_operation.response
        except OperationError as job_error:
            job_id = job_error.operation_result.meta.job_id
            job_info, _ = spark_client.get_job(cluster_id=cluster_id, job_id=job_id)
            raise
        finally:
            job_log = spark_client.get_job_log(cluster_id=cluster_id, job_id=job_id)
            for line in job_log:
                logging.info(line)
            logging.info("Job info: %s", job_info)
    
    
    @task(trigger_rule="all_done")
    # 3 этап: удаление кластера Apache Spark™
    def delete_cluster(yc_hook, cluster_id):
        if cluster_id:
            spark_client = yc_hook.sdk.wrappers.Spark()
            spark_client.delete_cluster(cluster_id=cluster_id)
    
    
    # Настройки DAG
    with DAG(
        dag_id="example_spark",
        start_date=pendulum.datetime(2025, 1, 1),
        schedule=None,
    ):
        yc_hook = YandexCloudBaseHook(yandex_conn_id=YANDEX_CONN_ID)
    
        cluster_spec = yc_hook.sdk.wrappers.SparkClusterParameters(
            folder_id=FOLDER_ID,
            service_account_id=SERVICE_ACCOUNT_ID,
            subnet_ids=SUBNET_IDS,
            security_group_ids=SECURITY_GROUP_IDS,
            driver_pool_resource_preset="c2-m8",
            driver_pool_size=1,
            executor_pool_resource_preset="c4-m16",
            executor_pool_min_size=1,
            executor_pool_max_size=2,
            metastore_cluster_id=METASTORE_CLUSTER_ID,
        )
        cluster_id = create_cluster(yc_hook, cluster_spec)
    
        job_spec = yc_hook.sdk.wrappers.PysparkJobParameters(
            name=JOB_NAME,
            main_python_file_uri=JOB_SCRIPT,
            args=JOB_ARGS,
            properties=JOB_PROPERTIES,
        )
        task_job = run_spark_job(yc_hook, cluster_id, job_spec)
        task_delete = delete_cluster(yc_hook, cluster_id)
    
        task_job >> task_delete
    

    Где:

    • YANDEX_CONN_ID — идентификатор подключения.

    • FOLDER_ID — идентификатор каталога, в котором будет создан кластер Apache Spark™.

    • SERVICE_ACCOUNT_ID — идентификатор сервисного аккаунта, который будет использоваться для создания кластера Apache Spark™.

    • SUBNET_IDS — идентификатор подсети.

      Примечание

      Подсеть для Apache Spark™ и Metastore должна совпадать.

    • SECURITY_GROUP_IDS — идентификатор группы безопасности для кластера Apache Spark™.

    • METASTORE_CLUSTER_ID — идентификатор кластера Metastore.

    • JOB_NAME — имя задания PySpark.

    • JOB_SCRIPT — путь к файлу с заданием PySpark.

    • JOB_ARGS — аргументы задания PySpark.

    • JOB_PROPERTIES — свойства задания PySpark.

  2. Загрузите DAG в кластер Apache Airflow™: создайте в бакете <бакет_для_заданий_и_данных> папку dags и загрузите в нее файл dag.py.

  3. Откройте веб-интерфейс Apache Airflow™.

  4. Убедитесь, что в разделе DAGs появился новый DAG example_spark.

    Загрузка DAG-файла из бакета может занять несколько минут.

  5. Чтобы запустить DAG, в строке с его именем нажмите кнопку image.

Проверьте результатПроверьте результат

Высокий уровень безопасности
Упрощенная настройка
  1. Чтобы отслеживать результаты выполнения задач, нажмите на название DAG.
  2. Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления создается кластер Apache Spark™, выполняется задание PySpark и удаляется тот же кластер.
  3. Убедитесь, что в бакете <бакет_для_выходных_данных_PySpark_задания> появилась БД database_1. Теперь данные из созданной БД хранятся в бакете Object Storage, а метаинформация о ней — в кластере Metastore.
  1. Чтобы отслеживать результаты выполнения задач, нажмите на название DAG.
  2. Дождитесь, когда все три задачи в DAG перейдут в статус Success. Параллельно вы можете проверить, что в консоли управления создается кластер Apache Spark™, выполняется задание PySpark и удаляется тот же кластер.
  3. Убедитесь, что в бакете <бакет_для_заданий_и_данных> появилась БД database_1. Теперь данные из созданной БД хранятся в бакете Object Storage, а метаинформация о ней — в кластере Metastore.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:

Высокий уровень безопасности
Упрощенная настройка
  1. Сервисные аккаунты.
  2. Бакеты Object Storage.
  3. Кластер Metastore.
  4. Кластер Apache Airflow™.
  5. Группы безопасности.
  6. Облачные подсети, созданные по умолчанию в сети datalake-network.
  7. Облачную сеть.
  1. Сервисный аккаунт.
  2. Бакет Object Storage.
  3. Кластер Metastore.
  4. Кластер Apache Airflow™.
  5. Группу безопасности, созданную по умолчанию в сети datalake-network.
  6. Облачные подсети, созданные по умолчанию в сети datalake-network.
  7. Облачную сеть.

Была ли статья полезна?

Предыдущая
Yandex Query: автоматизация задач
Следующая
Настройка SMTP-сервера для отправки уведомлений по электронной почте
Проект Яндекса
© 2025 ООО «Яндекс.Облако»