Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Истории успеха
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • ИИ для бизнеса
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Калькулятор цен
    • Тарифы
    • Промоакции и free tier
    • Правила тарификации
  • Истории успеха
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Spark™
  • Начало работы
    • Все руководства
    • Автоматизация работы с помощью Yandex Managed Service for Apache Airflow™
    • Работа с таблицей в Object Storage из PySpark-задания
    • Интеграция Yandex Managed Service for Apache Spark™ и Apache Hive™ Metastore
    • Запуск PySpark-задания с помощью Yandex Managed Service for Apache Airflow™
    • Использование Yandex Object Storage в Managed Service for Apache Spark™
  • Управление доступом
  • Правила тарификации
  • Метрики Yandex Monitoring
  • Справочник Terraform
  • История изменений

В этой статье:

  • Необходимые платные ресурсы
  • Подготовьте инфраструктуру
  • Подготовьте и запустите PySpark-задание
  • Проверьте результат
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Работа с таблицей в Object Storage из PySpark-задания

Работа с таблицей в Object Storage из PySpark-задания с использованием Apache Hive™ Metastore и Apache Iceberg™

Статья создана
Yandex Cloud
Обновлена 13 ноября 2025 г.
  • Необходимые платные ресурсы
  • Подготовьте инфраструктуру
  • Подготовьте и запустите PySpark-задание
  • Проверьте результат
  • Удалите созданные ресурсы

Интеграция Yandex Managed Service for Apache Spark™ с Apache Hive™ Metastore и Apache Iceberg™ дает ряд возможностей и преимуществ при работе с данными в Object Storage через SQL-таблицы.

Apache Hive™ Metastore обеспечивает:

  • Централизованное хранение метаданных о базах, таблицах и партициях.
  • Упрощенный доступ к данным без указания путей и схем вручную.
  • Хранение статистик таблиц и колонок для оптимизации запросов.

Apache Iceberg™ обеспечивает:

  • Версионирование данных и хранение снимков состояний.
  • ACID-транзакции, которые поддерживают операции UPDATE, DELETE, MERGE, а также эволюцию таблиц и способа партиционирования.
  • Масштабируемость с сохранением высокой производительности операций.

В этом руководстве показано, как использовать следующие возможности Apache Hive™ Metastore и Apache Iceberg™ при работе с S3-хранилищем из PySpark-задания:

  • Обращение к таблице по имени.

    Apache Hive™ Metastore использует глобальный каталог метаданных для всех кластеров. Сохраненные метаданные затем может использовать любое приложение из любого кластера Apache Spark™, подключенного к этому кластеру Apache Hive™ Metastore.

  • Создание и чтение снимков метаданных.

    Apache Iceberg™ фиксирует каждую запись в таблицу как новый снимок метаданных. В дальнейшем к этим снимкам можно обращаться, указав момент времени или идентификатор снимка.

Чтобы реализовать описанный пример:

  1. Подготовьте инфраструктуру.
  2. Подготовьте и запустите PySpark-задание.
  3. Проверьте результат.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за бакеты Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).
  • Плата за получение и хранение логов (см. тарифы Cloud Logging).
  • Плата за вычислительные ресурсы компонентов кластера Yandex Managed Service for Apache Spark™ (см. тарифы Yandex Managed Service for Apache Spark™).
  • Плата за вычислительные ресурсы компонентов кластера Apache Hive™ Metastore (см. тарифы Yandex MetaData Hub).

Подготовьте инфраструктуруПодготовьте инфраструктуру

Подготовьте инфраструктуру:

  1. Создайте сервисный аккаунт spark-agent и выдайте ему роль managed-spark.integrationProvider.

  2. Создайте сервисный аккаунт metastore-agent и выдайте ему роль managed-metastore.integrationProvider, чтобы кластер Apache Hive™ Metastore мог взаимодействовать с другими ресурсами.

  3. Создайте бакеты:

    • бакет для исходного кода PySpark-задания;
    • бакет для выходных данных.
  4. Предоставьте разрешения для сервисного аккаунта spark-agent на созданные бакеты:

    • бакет для исходного кода PySpark-задания — READ;
    • бакет для выходных данных — READ и WRITE.
  5. Предоставьте разрешение READ и WRITE для сервисного аккаунта metastore-agent на бакет для выходных данных.

  6. Создайте облачную сеть с именем integration-network.

    Вместе с ней автоматически создадутся три подсети в разных зонах доступности.

  7. Для кластера Apache Spark™ создайте группу безопасности spark-sg в сети integration-network. Добавьте в группу следующее правило:

    • Для исходящего трафика, чтобы разрешить подключение кластера Apache Spark™ к Apache Hive™ Metastore:

      • Диапазон портов — 9083.
      • Протокол — Любой (Any).
      • Назначение — CIDR.
      • CIDR блоки — 0.0.0.0/0.
  8. Для кластера Apache Hive™ Metastore создайте группу безопасности metastore-sg в сети integration-network. Добавьте в группу следующие правила:

    • Для входящего трафика от клиентов:

      • Диапазон портов — 30000-32767.
      • Протокол — Любой (Any).
      • Источник — CIDR.
      • CIDR блоки — 0.0.0.0/0.
    • Для входящего трафика от балансировщика:

      • Диапазон портов — 10256.
      • Протокол — Любой (Any).
      • Источник — Проверки состояния балансировщика.
  9. Создайте кластер Apache Hive™ Metastore с параметрами:

    • Сервисный аккаунт — metastore-agent.
    • Версия — 3.1.
    • Сеть — integration-network.
    • Подсеть — integration-network-ru-central1-a.
    • Группы безопасности — metastore-sg.
  10. Создайте кластер Apache Spark™ с параметрами:

    • Сервисный аккаунт — spark-agent.
    • Сеть — integration-network.
    • Подсеть — integration-network-ru-central1-a.
    • Группы безопасности — spark-sg.
    • Metastore-сервер — созданный ранее кластер Apache Hive™ Metastore.

Подготовьте и запустите PySpark-заданиеПодготовьте и запустите PySpark-задание

Для PySpark-задания будет использован Python-скрипт, который:

  1. Создает базу данных и таблицу формата Apache Iceberg™ в бакете.
  2. Записывает 10 строк с данными в таблицу.
  3. Сохраняет идентификатор текущего снимка таблицы.
  4. Записывает еще 10 строк с данными в таблицу.
  5. Выводит количество строк в текущем состоянии таблицы.
  6. Выводит количество строк в состоянии таблицы на момент снимка.

Подготовьте файл скрипта:

  1. Создайте файл ice_min_demo.py и скопируйте в него следующий код:

    ice_min_demo.py
    import random
    from pyspark.sql import SparkSession
    
    spark = (
       SparkSession.builder
       .appName("ice_min_demo")
       .enableHiveSupport()
       .getOrCreate()
    )
    
    # Создание базы данных и таблицы в формате Apache Iceberg™.
    # Apache Hive™ Metastore фиксирует метаданные, что позволяет обращаться к таблице по имени db.tbl
    # из любых приложений Spark, подключённых к этому кластеру Apache Hive™ Metastore.
    db, tbl = "demo_db", "demo_events"
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {db}.{tbl} (
       id BIGINT,
       value DOUBLE
    ) USING iceberg
    """)
    
    # Запись первого набора данных в таблицу
    df1 = spark.createDataFrame([(i, random.random()) for i in range(10)], ["id","value"])
    df1.writeTo(f"{db}.{tbl}").append()
    
    # Фиксация идентификатора текущего снимка из служебной таблицы .snapshots
    snap_before = spark.sql(f"SELECT max(snapshot_id) AS s FROM {db}.{tbl}.snapshots").collect()[0][0]
    
    # Запись второго набора данных в таблицу
    df2 = spark.createDataFrame([(i, random.random()) for i in range(10, 20)], ["id","value"])
    df2.writeTo(f"{db}.{tbl}").append()
    
    # Подсчет и вывод числа строк в текущем (20) и предыдущем состоянии таблицы (10)
    cnt_now = spark.table(f"{db}.{tbl}").count()
    cnt_past = spark.sql(f"SELECT COUNT(*) FROM {db}.{tbl} VERSION AS OF {snap_before}").collect()[0][0]
    print(f"now_count: {cnt_now} | past_count: {cnt_past}", flush=True)
    
    spark.stop()
    
  2. В бакете для исходного кода создайте папку scripts и загрузите в нее файл ice_min_demo.py.

  3. Создайте задание с параметрами:

    • Тип задания: PySpark.
    • Main python файл: s3a://<бакет_для_исходного_кода>/scripts/ice_min_demo.py.
    • Настройки: spark.sql.warehouse.dir – s3a://<бакет_для_выходных_данных>/warehouse/.

Проверьте результатПроверьте результат

  1. Дождитесь, когда созданное PySpark-задание перейдет в статус Done.
  2. Откройте логи выполнения задания.
  3. Найдите в логах строку now_count: 20 | past_count: 10.
  4. Убедитесь, что в бакете для выходных данных появилась папка warehouse/demo_db. Теперь данные из созданной БД demo_db хранятся в бакете Object Storage, а метаинформация о ней — в кластере Apache Hive™ Metastore.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:

  1. Бакеты Object Storage. Перед удалением бакетов удалите из них все объекты.
  2. Кластер Apache Hive™ Metastore.
  3. Кластер Apache Spark™.

Была ли статья полезна?

Предыдущая
Автоматизация работы с помощью Yandex Managed Service for Apache Airflow™
Следующая
Интеграция Yandex Managed Service for Apache Spark™ и Apache Hive™ Metastore
Проект Яндекса
© 2025 ООО «Яндекс.Облако»