Импорт данных из Object Storage, обработка и экспорт в Yandex Managed Service for ClickHouse®
|
Руководство основано на сценарии компании Data Stories |
|
В качестве примера используются две CSV-таблицы, которые нужно объединить в одну, импортировать в формат Parquet и передать в Managed Service for ClickHouse®.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за кластер Managed Service for ClickHouse®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Managed Service for ClickHouse®).
- Плата за кластер Yandex Data Processing: использование вычислительных ресурсов ВМ и сетевых дисков Compute Cloud, а также сервиса Cloud Logging для работы с логами (см. тарифы Yandex Data Processing).
- Плата за использование публичных IP-адресов для хостов кластера (см. тарифы Virtual Private Cloud).
- Плата за бакеты Object Storage: хранение данных и выполнение операций с ними (см. тарифы Object Storage).
- Плата за NAT-шлюз (см. тарифы Virtual Private Cloud).
Перед началом работы
Подготовьте инфраструктуру:
-
Создайте сервисный аккаунт с именем
dataproc-s3-saи назначьте ему ролиdataproc.agentиdataproc.provisioner. -
В Object Storage создайте бакеты и настройте доступ к ним:
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
READдля этого бакета. - Создайте бакет для результатов обработки и предоставьте сервисному аккаунту кластера разрешение
READ и WRITEдля этого бакета.
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
-
Создайте облачную сеть с именем
dataproc-network. -
В сети
dataproc-networkсоздайте подсеть в любой зоне доступности. -
Настройте NAT-шлюз для созданной подсети.
-
В сети
dataproc-networkсоздайте группу безопасности с именемdataproc-sgи добавьте в нее следующие правила:-
По одному правилу для входящего и исходящего служебного трафика:
- Диапазон портов —
0-65535. - Протокол —
Любой(Any). - Источник / Назначение —
Группа безопасности. - Группа безопасности —
Текущая(Self).
- Диапазон портов —
-
Правило для исходящего HTTPS-трафика:
- Диапазон портов —
443. - Протокол —
TCP. - Назначение —
CIDR. - CIDR блоки —
0.0.0.0/0.
- Диапазон портов —
-
Правило для исходящего трафика по протоколу TCP на порт 8443 для доступа к ClickHouse®:
- Диапазон портов —
8443. - Протокол —
TCP. - Назначение —
CIDR. - CIDR блоки —
0.0.0.0/0.
- Диапазон портов —
-
-
Создайте кластер Yandex Data Processing с любой подходящей конфигурацией хостов и следующими настройками:
- Окружение —
PRODUCTION. - Сервисы:
SPARK;YARN;HDFS.
- Сервисный аккаунт —
dataproc-sa. - Имя бакета — бакет, который вы создали для выходных данных.
- Сеть —
dataproc-network. - Группы безопасности —
dataproc-sg. - Настройка UI Proxy включена.
- Окружение —
-
Создайте кластер Managed Service for ClickHouse® любой подходящей конфигурации со следующими настройками:
- С публичным доступом к хостам кластера.
- Имя БД —
db1. - Имя пользователя —
user1.
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации s3-dataproc-ch.tf
.В этом файле описаны:
- сеть;
- подсеть;
- NAT-шлюз и таблица маршрутизации, необходимые для работы Yandex Data Processing;
- группы безопасности, необходимые для кластеров Yandex Data Processing и Managed Service for ClickHouse®;
- сервисный аккаунт, необходимый для работы кластера Yandex Data Processing;
- сервисный аккаунт, необходимый для создания бакетов в Object Storage;
- бакеты для входных и выходных данных;
- кластер Yandex Data Processing;
- кластер Managed Service for ClickHouse®.
-
Укажите в файле
s3-dataproc-ch.tf:folder_id— идентификатор облачного каталога, такой же как в настройках провайдера.input-bucket— имя бакета для входных данных.output-bucket— имя бакета для выходных данных.dp_ssh_key— абсолютный путь к публичному ключу для кластера Yandex Data Processing. Подробнее о подключении к хосту Yandex Data Processing по SSH.ch_password— пароль пользователя ClickHouse®.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validateЕсли в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform planЕсли конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply -
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
Подготовьте тестовые данные
Для примера используются две таблицы в формате CSV:
coords.csv— содержит информацию о географических координатах автомобиля.sensors.csv— содержит информацию о скорости и рабочих параметрах автомобиля.
Чтобы подготовить тестовые данные к работе:
-
Скопируйте содержимое файлов из примеров ниже и сохраните их локально в формате CSV:
-
coords.csv
vehicle_id,latitude,longitude,altitude iv9a94th6rztooxh5ur2,55.70329032,37.65472196,427.5 022wsiz48h2ljxuz04x8,56.96149325,38.46541766,423.6 a7fbbqjws4zqw85f6jue,54.99296663,36.79063999,426.2 l7731117m6r6on4m633n,55.34740545,37.13175678,422.5 6f9q6ienc4qfpdwd9nef,56.69752218,38.38871530,428.3 -
sensors.csv
vehicle_id,speed,battery_voltage,cabin_temperature,fuel_level iv9a94th6rztooxh5ur2,0.0,25.5,17,5 022wsiz48h2ljxuz04x8,55.5,54.5,21,22 a7fbbqjws4zqw85f6jue,80.6,22.1,19,73 l7731117m6r6on4m633n,40.9,76.0,25,23 6f9q6ienc4qfpdwd9nef,64.8,90.8,21,32
-
-
Создайте в бакете для входных данных папку
csvи загрузите в нее созданные CSV-файлы.
Обработайте данные в Yandex Data Processing
Объедините данные из двух таблиц в одну и загрузите ее в формате Parquet в бакет, который вы ранее создали для результатов обработки:
-
Подготовьте файл скрипта:
-
Создайте локально файл с именем
join-tables.pyи скопируйте в него следующий скрипт:join-tables.py
from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder.appName("JoinExample").getOrCreate() # Чтение таблицы из файла coords.csv coords_df = spark.read.csv("s3a://<имя_входного_бакета>/csv/coords.csv", header=True) # Чтение таблицы из файла sensors.csv sensors_df = spark.read.csv("s3a://<имя_входного_бакета>/csv/sensors.csv", header=True) # Объединение таблицы по столбцу vehicle_id joined_df = coords_df.join(sensors_df, on="vehicle_id", how="inner") # Сохранение объединенной таблицы в бакет в формате Parquet joined_df.write.parquet("s3a://<имя_выходного_бакета>/parquet/") -
Укажите в скрипте:
- Имя входного бакета, в котором хранятся исходные CSV-таблицы.
- Имя выходного бакета, в который будет сохранен Parquet-файл с объединенными данными.
-
Создайте в бакете для входных данных папку
scriptsи загрузите в нее файлjoin-tables.py.
-
-
Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта:
s3a://<имя_входного_бакета>/scripts/join-tables.py. -
Дождитесь завершения задания и проверьте, что в выходном бакете в папке
parquetпоявился Parquet-файлpart-00000-***.
Примечание
Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.
Экспортируйте данные в ClickHouse®
Перенесите объединенную таблицу из Object Storage в ClickHouse®:
-
Подготовьте файл скрипта:
-
Создайте локально файл с именем
parquet-to-ch.pyи скопируйте в него следующий скрипт:parquet-to-ch.py
from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder.appName("ParquetClickhouse").getOrCreate() # Чтение данных из Parquet-файла parquetFile = spark.read.parquet("s3a://<имя_выходного_бакета>/parquet/*.parquet") # Указание порта и параметров кластера ClickHouse® jdbcPort = 8443 jdbcHostname = "c-<идентификатор_кластера>.rw.mdb.yandexcloud.net" jdbcDatabase = "db1" jdbcUrl = f"jdbc:clickhouse://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}?ssl=true" # Перенос таблицы из Parquet-файла в ClickHouse®-таблицу с именем measurements parquetFile.write.format("jdbc") \ .mode("error") \ .option("url", jdbcUrl) \ .option("dbtable", "measurements") \ .option("createTableOptions", "ENGINE = MergeTree() ORDER BY vehicle_id") \ .option("user","user1") \ .option("password","<пароль_пользователя_ClickHouse®>") \ .save() -
Укажите в скрипте:
- Имя бакета, в котором лежит Parquet-файл.
- Идентификатор кластера Managed Service for ClickHouse®.
- Пароль пользователя ClickHouse®.
-
Загрузите файл
parquet-to-ch.pyв бакет для входных данных в папкуscripts.
-
-
Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта:
s3a://<имя_входного_бакета>/scripts/parquet-to-ch.py. -
Дождитесь выполнения задания и убедитесь, что объединенная таблица перенесена в кластер:
-
Подключитесь к базе данных
db1кластера Managed Service for ClickHouse® от имени пользователяuser1. -
Выполните запрос:
SELECT * FROM measurements;
Если экспорт данных прошел успешно, ответом на запрос будет объединенная таблица.
-
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
-
Удалите объекты из бакетов.
-
Остальные ресурсы удалите в зависимости от способа их создания:
ВручнуюTerraform-
В терминале перейдите в директорию с планом инфраструктуры.
Важно
Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.
-
Удалите ресурсы:
-
Выполните команду:
terraform destroy -
Подтвердите удаление ресурсов и дождитесь завершения операции.
Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.
-
-
ClickHouse® является зарегистрированным товарным знаком ClickHouse, Inc
