Обмен данными с Yandex Data Processing
С помощью Yandex Data Processing вы можете:
- Загрузить данные из Managed Service for ClickHouse® в Spark DataFrame.
- Выгрузить данные из Spark DataFrame в Managed Service for ClickHouse®.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
Подготовьте инфраструктуру:
-
Создайте сервисный аккаунт с именем
dataproc-sa
и назначьте ему ролиdataproc.agent
иdataproc.provisioner
. -
В Object Storage создайте бакеты и настройте доступ к ним:
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
READ
для этого бакета. - Создайте бакет для результатов обработки и предоставьте сервисному аккаунту кластера разрешение
READ и WRITE
для этого бакета.
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
-
Создайте облачную сеть с именем
dataproc-network
. -
В сети
dataproc-network
создайте подсеть в любой зоне доступности. -
Настройте NAT-шлюз для созданной подсети.
-
Если вы используете группы безопасности, создайте группу безопасности с именем
dataproc-sg
в сетиdataproc-network
и добавьте в нее следующие правила:-
По одному правилу для входящего и исходящего служебного трафика:
- Диапазон портов —
0-65535
. - Протокол —
Любой
(Any
). - Источник/Назначение —
Группа безопасности
. - Группа безопасности —
Текущая
(Self
).
- Диапазон портов —
-
Правило для исходящего HTTPS-трафика:
- Диапазон портов —
443
. - Протокол —
TCP
. - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
Правило для исходящего трафика по протоколу TCP на порт 8443 для доступа к ClickHouse®:
- Диапазон портов —
8443
. - Протокол —
TCP
. - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
-
Создайте кластер Yandex Data Processing с любой подходящей конфигурацией хостов и следующими настройками:
- Компоненты:
- SPARK;
- YARN;
- HDFS.
- Сервисный аккаунт —
dataproc-sa
. - Имя бакета — бакет, который вы создали для выходных данных.
- Сеть —
dataproc-network
. - Группы безопасности —
dataproc-sg
.
- Компоненты:
-
Создайте кластер Managed Service for ClickHouse® любой подходящей конфигурации со следующими настройками:
- С публичным доступом к хостам кластера.
- С базой данных
db1
. - С пользователем
user1
.
-
Если вы используете группы безопасности в кластере Managed Service for ClickHouse®, убедитесь, что они настроены правильно и допускают подключение к нему.
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации data-proc-data-exchange-with-mch.tf
.В этом файле описаны:
- сеть;
- подсеть;
- NAT-шлюз и таблица маршрутизации, необходимые для работы Yandex Data Processing;
- группы безопасности, необходимые для кластеров Yandex Data Processing и Managed Service for ClickHouse®;
- сервисный аккаунт, необходимый для работы кластера Yandex Data Processing;
- сервисный аккаунт, необходимый для создания бакетов в Object Storage;
- бакеты для входных и выходных данных;
- кластер Yandex Data Processing;
- кластер Managed Service for ClickHouse®.
-
Укажите в файле
data-proc-data-exchange-with-mch.tf
:folder_id
— идентификатор облачного каталога, такой же как в настройках провайдера.input_bucket
— имя бакета для входных данных.output_bucket
— имя бакета для выходных данных.dp_ssh_key
— абсолютный путь к публичному ключу для кластера Yandex Data Processing. Подробнее см. в разделе SSH-подключение к хосту Yandex Data Processing.ch_password
— пароль пользователя ClickHouse®.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
Загрузите данные из Managed Service for ClickHouse®
Подготовьте таблицу в кластере Managed Service for ClickHouse®
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse® от имени пользователяuser1
. -
Наполните базу тестовыми данными. В качестве примера используется простая таблица с именами и возрастом людей.
-
Создайте таблицу:
CREATE TABLE persons ( `name` String, `age` UInt8) ENGINE = MergeTree () ORDER BY `name`;
-
Наполните таблицу данными:
INSERT INTO persons VALUES ('Anna', 19), ('Michael', 65), ('Alvar', 28), ('Lilith', 50), ('Max', 27), ('Jaimey', 34), ('Dmitry', 42), ('Qiang', 19), ('Augustyna', 20), ('Maria', 28);
-
Проверьте результат:
SELECT * FROM persons;
-
Перенесите таблицу из Managed Service for ClickHouse®
-
Подготовьте файл скрипта:
-
Создайте локальный файл с именем
ch-to-dataproc.py
и скопируйте в него следующий скрипт:ch-to-dataproc.py
from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder.appName("ClickhouseDataproc").getOrCreate() # Указание порта и параметров кластера ClickHouse® jdbcPort = 8443 jdbcHostname = "c-<идентификатор_кластера_ClickHouse®>.rw.mdb.yandexcloud.net" jdbcDatabase = "db1" jdbcUrl = f"jdbc:clickhouse://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}?ssl=true" # Перенос таблицы persons из ClickHouse® в DataFrame df = spark.read.format("jdbc") \ .option("url", jdbcUrl) \ .option("user","user1") \ .option("password","<пароль_пользователя_user1>") \ .option("dbtable","persons") \ .load() # Перенос DataFrame в бакет для проверки df.repartition(1).write.mode("overwrite") \ .csv(path='s3a://<имя_выходного_бакета>/csv', header=True, sep=',')
-
Укажите в скрипте:
- Идентификатор кластера Managed Service for ClickHouse®.
- Пароль пользователя
user1
. - Имя выходного бакета.
-
Создайте в бакете для входных данных папку
scripts
и загрузите в нее файлch-to-dataproc.py
.
-
-
Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта:
s3a://<имя_входного_бакета>/scripts/ch-to-dataproc.py
. -
Дождитесь завершения задания и проверьте, что в папке
csv
выходного бакета появилась исходная таблица.
Примечание
Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.
Выгрузите данные в Managed Service for ClickHouse®
-
Подготовьте файл скрипта:
-
Создайте локальный файл с именем
dataproc-to-ch.py
и скопируйте в него следующий скрипт:dataproc-to-ch.py
from pyspark.sql import SparkSession from pyspark.sql.types import * # Создание Spark-сессии spark = SparkSession.builder.appName("DataprocClickhouse").getOrCreate() # Создание схемы данных schema = StructType([StructField('name', StringType(), True), StructField('age', IntegerType(), True)]) # Создание DataFrame df = spark.createDataFrame([('Alim', 19), ('Fred' ,65), ('Guanmin' , 28), ('Till', 60), ('Almagul', 27), ('Mary', 34), ('Dmitry', 42)], schema) # Указание порта и параметров кластера ClickHouse® jdbcPort = 8443 jdbcHostname = "c-<идентификатор_кластера_ClickHouse®>.rw.mdb.yandexcloud.net" jdbcDatabase = "db1" jdbcUrl = f"jdbc:clickhouse://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}?ssl=true" # Перенос DataFrame в ClickHouse® df.write.format("jdbc") \ .mode("error") \ .option("url", jdbcUrl) \ .option("dbtable", "people") \ .option("createTableOptions", "ENGINE = MergeTree() ORDER BY age") \ .option("user","user1") \ .option("password","<пароль_к_базе_данных_ClickHouse®>") \ .save()
-
Укажите в скрипте:
- Идентификатор кластера Managed Service for ClickHouse®.
- Пароль пользователя
user1
.
-
Создайте в бакете для входных данных папку
scripts
и загрузите в нее файлdataproc-to-ch.py
.
-
-
Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта:
s3a://<имя_входного_бакета>/scripts/dataproc-to-ch.py
. -
Дождитесь завершения задания и проверьте, что данные перенеслись в Managed Service for ClickHouse®:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse® от имени пользователяuser1
. -
Выполните запрос:
SELECT * FROM people;
Если выгрузка прошла успешно, ответ на запрос будет содержать таблицу с данными.
-
Примечание
Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
-
Удалите объекты из бакетов.
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
data-proc-data-exchange-with-mch.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле
data-proc-data-exchange-with-mch.tf
, будут удалены. -
ClickHouse® является зарегистрированным товарным знаком ClickHouse, Inc