Импорт данных из Yandex Object Storage, обработка и экспорт в Yandex Managed Service for ClickHouse®
Руководство основано на сценарии компании Data Stories |
|
В качестве примера используются две CSV-таблицы, которые нужно объединить в одну, импортировать в формат Parquet и передать в Managed Service for ClickHouse®.
Перед началом работы
Подготовьте инфраструктуру:
-
Создайте сервисный аккаунт с именем
dataproc-s3-sa
и назначьте ему ролиdataproc.agent
иdataproc.provisioner
. -
В Object Storage создайте бакеты и настройте доступ к ним:
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
READ
для этого бакета. - Создайте бакет для результатов обработки и предоставьте сервисному аккаунту кластера разрешение
READ и WRITE
для этого бакета.
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
-
Создайте облачную сеть с именем
dataproc-network
. -
В сети
dataproc-network
создайте подсеть в любой зоне доступности. -
Настройте NAT-шлюз для созданной подсети.
-
В сети
dataproc-network
создайте группу безопасности с именемdataproc-sg
и добавьте в нее следующие правила:-
По одному правилу для входящего и исходящего служебного трафика:
- Диапазон портов —
0-65535
. - Протокол —
Любой
(Any
). - Источник / Назначение —
Группа безопасности
. - Группа безопасности —
Текущая
(Self
).
- Диапазон портов —
-
Правило для исходящего HTTPS-трафика:
- Диапазон портов —
443
. - Протокол —
TCP
. - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
Правило для исходящего трафика по протоколу TCP на порт 8443 для доступа к ClickHouse®:
- Диапазон портов —
8443
. - Протокол —
TCP
. - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
-
Создайте кластер Yandex Data Processing с любой подходящей конфигурацией хостов и следующими настройками:
- Сервисы:
SPARK
;YARN
;HDFS
.
- Сервисный аккаунт —
dataproc-sa
. - Имя бакета — бакет, который вы создали для выходных данных.
- Сеть —
dataproc-network
. - Группы безопасности —
dataproc-sg
. - Настройка UI Proxy включена.
- Сервисы:
-
Создайте кластер Managed Service for ClickHouse® любой подходящей конфигурации со следующими настройками:
- С публичным доступом к хостам кластера.
- Имя БД —
db1
. - Имя пользователя —
user1
.
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации s3-dataproc-ch.tf
.В этом файле описаны:
- сеть;
- подсеть;
- NAT-шлюз и таблица маршрутизации, необходимые для работы Yandex Data Processing;
- группы безопасности, необходимые для кластеров Yandex Data Processing и Managed Service for ClickHouse®;
- сервисный аккаунт, необходимый для работы кластера Yandex Data Processing;
- сервисный аккаунт, необходимый для создания бакетов в Object Storage;
- бакеты для входных и выходных данных;
- кластер Yandex Data Processing;
- кластер Managed Service for ClickHouse®.
-
Укажите в файле
s3-dataproc-ch.tf
:folder_id
— идентификатор облачного каталога, такой же как в настройках провайдера.input-bucket
— имя бакета для входных данных.output-bucket
— имя бакета для выходных данных.dp_ssh_key
— абсолютный путь к публичному ключу для кластера Yandex Data Processing. Подробнее см. в разделе SSH-подключение к хосту Yandex Data Processing.ch_password
— пароль пользователя ClickHouse®.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
Подготовьте тестовые данные
Для примера используются две таблицы в формате CSV:
coords.csv
— содержит информацию о географических координатах автомобиля.sensors.csv
— содержит информацию о скорости и рабочих параметрах автомобиля.
Чтобы подготовить тестовые данные к работе:
-
Скопируйте содержимое файлов из примеров ниже и сохраните их локально в формате CSV:
-
coords.csv
vehicle_id,latitude,longitude,altitude iv9a94th6rztooxh5ur2,55.70329032,37.65472196,427.5 022wsiz48h2ljxuz04x8,56.96149325,38.46541766,423.6 a7fbbqjws4zqw85f6jue,54.99296663,36.79063999,426.2 l7731117m6r6on4m633n,55.34740545,37.13175678,422.5 6f9q6ienc4qfpdwd9nef,56.69752218,38.38871530,428.3
-
sensors.csv
vehicle_id,speed,battery_voltage,cabin_temperature,fuel_level iv9a94th6rztooxh5ur2,0.0,25.5,17,5 022wsiz48h2ljxuz04x8,55.5,54.5,21,22 a7fbbqjws4zqw85f6jue,80.6,22.1,19,73 l7731117m6r6on4m633n,40.9,76.0,25,23 6f9q6ienc4qfpdwd9nef,64.8,90.8,21,32
-
-
Создайте в бакете для входных данных папку
csv
и загрузите в нее созданные CSV-файлы.
Обработайте данные в Yandex Data Processing
Объедините данные из двух таблиц в одну и загрузите ее в формате Parquet в бакет, который вы ранее создали для результатов обработки:
-
Подготовьте файл скрипта:
-
Создайте локально файл с именем
join-tables.py
и скопируйте в него следующий скрипт:join-tables.py
from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder.appName("JoinExample").getOrCreate() # Чтение таблицы из файла coords.csv coords_df = spark.read.csv("s3a://<имя_входного_бакета>/csv/coords.csv", header=True) # Чтение таблицы из файла sensors.csv sensors_df = spark.read.csv("s3a://<имя_входного_бакета>/csv/sensors.csv", header=True) # Объединение таблицы по столбцу vehicle_id joined_df = coords_df.join(sensors_df, on="vehicle_id", how="inner") # Сохранение объединенной таблицы в бакет в формате Parquet joined_df.write.parquet("s3a://<имя_выходного_бакета>/parquet/")
-
Укажите в скрипте:
- Имя входного бакета, в котором хранятся исходные CSV-таблицы.
- Имя выходного бакета, в который будет сохранен Parquet-файл с объединенными данными.
-
Создайте в бакете для входных данных папку
scripts
и загрузите в нее файлjoin-tables.py
.
-
-
Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта:
s3a://<имя_входного_бакета>/scripts/join-tables.py
. -
Дождитесь завершения задания и проверьте, что в выходном бакете в папке
parquet
появился Parquet-файлpart-00000-***
.
Примечание
Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.
Экспортируйте данные в ClickHouse®
Перенесите объединенную таблицу из Object Storage в ClickHouse®:
-
Подготовьте файл скрипта:
-
Создайте локально файл с именем
parquet-to-ch.py
и скопируйте в него следующий скрипт:parquet-to-ch.py
from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder.appName("ParquetClickhouse").getOrCreate() # Чтение данных из Parquet-файла parquetFile = spark.read.parquet("s3a://<имя_выходного_бакета>/parquet/*.parquet") # Указание порта и параметров кластера ClickHouse® jdbcPort = 8443 jdbcHostname = "c-<идентификатор_кластера>.rw.mdb.yandexcloud.net" jdbcDatabase = "db1" jdbcUrl = f"jdbc:clickhouse://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}?ssl=true" # Перенос таблицы из Parquet-файла в ClickHouse®-таблицу с именем measurements parquetFile.write.format("jdbc") \ .mode("error") \ .option("url", jdbcUrl) \ .option("dbtable", "measurements") \ .option("createTableOptions", "ENGINE = MergeTree() ORDER BY vehicle_id") \ .option("user","user1") \ .option("password","<пароль_пользователя_ClickHouse®>") \ .save()
-
Укажите в скрипте:
- Имя бакета, в котором лежит Parquet-файл.
- Идентификатор кластера Managed Service for ClickHouse®.
- Пароль пользователя ClickHouse®.
-
Загрузите файл
parquet-to-ch.py
в бакет для входных данных в папкуscripts
.
-
-
Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта:
s3a://<имя_входного_бакета>/scripts/parquet-to-ch.py
. -
Дождитесь выполнения задания и убедитесь, что объединенная таблица перенесена в кластер:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse® от имени пользователяuser1
. -
Выполните запрос:
SELECT * FROM measurements;
Если экспорт данных прошел успешно, ответом на запрос будет объединенная таблица.
-
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
-
Удалите объекты из бакетов.
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
s3-dataproc-ch.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле
s3-dataproc-ch.tf
, будут удалены. -
ClickHouse® является зарегистрированным товарным знаком ClickHouse, Inc