Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Processing
  • Начало работы
    • Все руководства
        • Использование Object Storage в Yandex Data Processing
        • Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
        • Монтирование бакетов к файловой системе хостов Yandex Data Processing
      • Обмен данными с Managed Service for ClickHouse®
      • Импорт данных из кластера Managed Service for MySQL® с помощью Sqoop
      • Импорт данных из кластера Managed Service for PostgreSQL с помощью Sqoop
      • Интеграция с сервисом DataSphere
      • Работа с топиками Apache Kafka® с помощью PySpark-заданий
      • Автоматизация работы с помощью Managed Service for Apache Airflow™
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте тестовые данные
  • Обработайте данные в Yandex Data Processing
  • Экспортируйте данные в ClickHouse®
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Интеграция Yandex Data Processing с другими сервисами
  3. Object Storage
  4. Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®

Импорт данных из Yandex Object Storage, обработка и экспорт в Yandex Managed Service for ClickHouse®

Статья создана
Yandex Cloud
Обновлена 8 апреля 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте тестовые данные
  • Обработайте данные в Yandex Data Processing
  • Экспортируйте данные в ClickHouse®
  • Удалите созданные ресурсы

Руководство основано на сценарии компании Data Stories по построению аналитического стека на базе сервисов Yandex Cloud. Сценарий включал загрузку данных в хранилище, их обработку и трансформацию в единую витрину для визуализации.


datastories logo

В качестве примера используются две CSV-таблицы, которые нужно объединить в одну, импортировать в формат Parquet и передать в Managed Service for ClickHouse®.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for ClickHouse®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Managed Service for ClickHouse®).
  • Плата за кластер Yandex Data Processing: использование вычислительных ресурсов ВМ и сетевых дисков Compute Cloud, а также сервиса Cloud Logging для работы с логами (см. тарифы Yandex Data Processing).
  • Плата за использование публичных IP-адресов для хостов кластера (см. тарифы Virtual Private Cloud).
  • Плата за бакеты Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).
  • Плата за NAT-шлюз (см. тарифы Virtual Private Cloud).

Перед началом работыПеред началом работы

Подготовьте инфраструктуру:

Вручную
Terraform
  1. Создайте сервисный аккаунт с именем dataproc-s3-sa и назначьте ему роли dataproc.agent и dataproc.provisioner.

  2. В Object Storage создайте бакеты и настройте доступ к ним:

    1. Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение READ для этого бакета.
    2. Создайте бакет для результатов обработки и предоставьте сервисному аккаунту кластера разрешение READ и WRITE для этого бакета.
  3. Создайте облачную сеть с именем dataproc-network.

  4. В сети dataproc-network создайте подсеть в любой зоне доступности.

  5. Настройте NAT-шлюз для созданной подсети.

  6. В сети dataproc-network создайте группу безопасности с именем dataproc-sg и добавьте в нее следующие правила:

    • По одному правилу для входящего и исходящего служебного трафика:

      • Диапазон портов — 0-65535.
      • Протокол — Любой (Any).
      • Источник / Назначение — Группа безопасности.
      • Группа безопасности — Текущая (Self).
    • Правило для исходящего HTTPS-трафика:

      • Диапазон портов — 443.
      • Протокол — TCP.
      • Назначение — CIDR.
      • CIDR блоки — 0.0.0.0/0.
    • Правило для исходящего трафика по протоколу TCP на порт 8443 для доступа к ClickHouse®:

      • Диапазон портов — 8443.
      • Протокол — TCP.
      • Назначение — CIDR.
      • CIDR блоки — 0.0.0.0/0.
  7. Создайте кластер Yandex Data Processing с любой подходящей конфигурацией хостов и следующими настройками:

    • Окружение — PRODUCTION.
    • Сервисы:
      • SPARK;
      • YARN;
      • HDFS.
    • Сервисный аккаунт — dataproc-sa.
    • Имя бакета — бакет, который вы создали для выходных данных.
    • Сеть — dataproc-network.
    • Группы безопасности — dataproc-sg.
    • Настройка UI Proxy включена.
  8. Создайте кластер Managed Service for ClickHouse® любой подходящей конфигурации со следующими настройками:

    • С публичным доступом к хостам кластера.
    • Имя БД — db1.
    • Имя пользователя — user1.
  1. Если у вас еще нет Terraform, установите его.

  2. Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.

  3. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

  4. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

  5. Скачайте в ту же рабочую директорию файл конфигурации s3-dataproc-ch.tf.

    В этом файле описаны:

    • сеть;
    • подсеть;
    • NAT-шлюз и таблица маршрутизации, необходимые для работы Yandex Data Processing;
    • группы безопасности, необходимые для кластеров Yandex Data Processing и Managed Service for ClickHouse®;
    • сервисный аккаунт, необходимый для работы кластера Yandex Data Processing;
    • сервисный аккаунт, необходимый для создания бакетов в Object Storage;
    • бакеты для входных и выходных данных;
    • кластер Yandex Data Processing;
    • кластер Managed Service for ClickHouse®.
  6. Укажите в файле s3-dataproc-ch.tf:

    • folder_id — идентификатор облачного каталога, такой же как в настройках провайдера.
    • input-bucket — имя бакета для входных данных.
    • output-bucket — имя бакета для выходных данных.
    • dp_ssh_key — абсолютный путь к публичному ключу для кластера Yandex Data Processing. Подробнее см. в разделе SSH-подключение к хосту Yandex Data Processing.
    • ch_password — пароль пользователя ClickHouse®.
  7. Проверьте корректность файлов конфигурации Terraform с помощью команды:

    terraform validate
    

    Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  8. Создайте необходимую инфраструктуру:

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

    В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

Подготовьте тестовые данныеПодготовьте тестовые данные

Для примера используются две таблицы в формате CSV:

  • coords.csv — содержит информацию о географических координатах автомобиля.
  • sensors.csv — содержит информацию о скорости и рабочих параметрах автомобиля.

Чтобы подготовить тестовые данные к работе:

  1. Скопируйте содержимое файлов из примеров ниже и сохраните их локально в формате CSV:

    • coords.csv
      vehicle_id,latitude,longitude,altitude
      iv9a94th6rztooxh5ur2,55.70329032,37.65472196,427.5
      022wsiz48h2ljxuz04x8,56.96149325,38.46541766,423.6
      a7fbbqjws4zqw85f6jue,54.99296663,36.79063999,426.2
      l7731117m6r6on4m633n,55.34740545,37.13175678,422.5
      6f9q6ienc4qfpdwd9nef,56.69752218,38.38871530,428.3
      
    • sensors.csv
      vehicle_id,speed,battery_voltage,cabin_temperature,fuel_level
      iv9a94th6rztooxh5ur2,0.0,25.5,17,5
      022wsiz48h2ljxuz04x8,55.5,54.5,21,22
      a7fbbqjws4zqw85f6jue,80.6,22.1,19,73
      l7731117m6r6on4m633n,40.9,76.0,25,23
      6f9q6ienc4qfpdwd9nef,64.8,90.8,21,32
      
  2. Создайте в бакете для входных данных папку csv и загрузите в нее созданные CSV-файлы.

Обработайте данные в Yandex Data ProcessingОбработайте данные в Yandex Data Processing

Объедините данные из двух таблиц в одну и загрузите ее в формате Parquet в бакет, который вы ранее создали для результатов обработки:

  1. Подготовьте файл скрипта:

    1. Создайте локально файл с именем join-tables.py и скопируйте в него следующий скрипт:

      join-tables.py
      from pyspark.sql import SparkSession
      
      # Создание Spark-сессии
      spark = SparkSession.builder.appName("JoinExample").getOrCreate()
      
      # Чтение таблицы из файла coords.csv
      coords_df = spark.read.csv("s3a://<имя_входного_бакета>/csv/coords.csv", header=True)
      
      # Чтение таблицы из файла sensors.csv
      sensors_df = spark.read.csv("s3a://<имя_входного_бакета>/csv/sensors.csv", header=True)
      
      # Объединение таблицы по столбцу vehicle_id
      joined_df = coords_df.join(sensors_df, on="vehicle_id", how="inner")
      
      # Сохранение объединенной таблицы в бакет в формате Parquet
      joined_df.write.parquet("s3a://<имя_выходного_бакета>/parquet/")
      
    2. Укажите в скрипте:

      • Имя входного бакета, в котором хранятся исходные CSV-таблицы.
      • Имя выходного бакета, в который будет сохранен Parquet-файл с объединенными данными.
    3. Создайте в бакете для входных данных папку scripts и загрузите в нее файл join-tables.py.

  2. Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта: s3a://<имя_входного_бакета>/scripts/join-tables.py.

  3. Дождитесь завершения задания и проверьте, что в выходном бакете в папке parquet появился Parquet-файл part-00000-***.

Примечание

Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.

Экспортируйте данные в ClickHouse®Экспортируйте данные в ClickHouse®

Перенесите объединенную таблицу из Object Storage в ClickHouse®:

  1. Подготовьте файл скрипта:

    1. Создайте локально файл с именем parquet-to-ch.py и скопируйте в него следующий скрипт:

      parquet-to-ch.py
      from pyspark.sql import SparkSession
      
      # Создание Spark-сессии
      spark = SparkSession.builder.appName("ParquetClickhouse").getOrCreate()
      
      # Чтение данных из Parquet-файла
      parquetFile = spark.read.parquet("s3a://<имя_выходного_бакета>/parquet/*.parquet")
      
      # Указание порта и параметров кластера ClickHouse®
      jdbcPort = 8443
      jdbcHostname = "c-<идентификатор_кластера>.rw.mdb.yandexcloud.net"
      jdbcDatabase = "db1"
      jdbcUrl = f"jdbc:clickhouse://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}?ssl=true"
      
      # Перенос таблицы из Parquet-файла в ClickHouse®-таблицу с именем measurements
      parquetFile.write.format("jdbc") \
      .mode("error") \
      .option("url", jdbcUrl) \
      .option("dbtable", "measurements") \
      .option("createTableOptions", "ENGINE = MergeTree() ORDER BY vehicle_id") \
      .option("user","user1") \
      .option("password","<пароль_пользователя_ClickHouse®>") \
      .save()
      
    2. Укажите в скрипте:

      • Имя бакета, в котором лежит Parquet-файл.
      • Идентификатор кластера Managed Service for ClickHouse®.
      • Пароль пользователя ClickHouse®.
    3. Загрузите файл parquet-to-ch.py в бакет для входных данных в папку scripts.

  2. Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта: s3a://<имя_входного_бакета>/scripts/parquet-to-ch.py.

  3. Дождитесь выполнения задания и убедитесь, что объединенная таблица перенесена в кластер:

    1. Подключитесь к базе данных db1 кластера Managed Service for ClickHouse® от имени пользователя user1.

    2. Выполните запрос:

      SELECT * FROM measurements;
      

    Если экспорт данных прошел успешно, ответом на запрос будет объединенная таблица.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  1. Удалите объекты из бакетов.

  2. Остальные ресурсы удалите в зависимости от способа их создания:

    Вручную
    Terraform
    1. Кластер Managed Service for ClickHouse®.
    2. Кластер Yandex Data Processing.
    3. Бакеты Object Storage.
    4. Подсеть.
    5. Таблицу маршрутизации.
    6. NAT-шлюз.
    7. Облачную сеть.
    8. Сервисный аккаунт.
    1. В терминале перейдите в директорию с планом инфраструктуры.

      Важно

      Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

    2. Удалите ресурсы:

      1. Выполните команду:

        terraform destroy
        
      2. Подтвердите удаление ресурсов и дождитесь завершения операции.

      Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

ClickHouse® является зарегистрированным товарным знаком ClickHouse, Inc.

Была ли статья полезна?

Предыдущая
Использование Object Storage в Yandex Data Processing
Следующая
Монтирование бакетов к файловой системе хостов Yandex Data Processing
Проект Яндекса
© 2025 ООО «Яндекс.Облако»