Поставка данных из очереди Apache Kafka® в ClickHouse®
В кластер Managed Service for ClickHouse® можно в реальном времени поставлять данные из топиков Apache Kafka®. Эти данные будут автоматически вставлены в таблицы ClickHouse® на движке Kafka
Чтобы настроить поставку данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®:
- Отправьте тестовые данные в топик Managed Service for Apache Kafka®.
- Подготовьте и активируйте трансфер.
- Проверьте работоспособность трансфера.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
Подготовьте инфраструктуру
-
Создайте кластер-источник Managed Service for Apache Kafka® любой подходящей конфигурации. Для подключения к кластеру с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к кластеру при его создании.
-
Создайте топик в кластере Managed Service for Apache Kafka®.
-
Чтобы производитель и потребитель могли работать с топиком в кластере Managed Service for Apache Kafka®, создайте пользователей:
- с ролью
ACCESS_ROLE_PRODUCER
для производителя; - с ролью
ACCESS_ROLE_CONSUMER
для потребителя.
- с ролью
-
Создайте кластер-приемник Managed Service for ClickHouse® любой подходящей конфигурации. Для подключения к кластеру с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к кластеру при его создании.
-
Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета:
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации data-transfer-mkf-mch.tf
.В этом файле описаны:
- сеть;
- подсеть;
- группа безопасности и правила, необходимые для подключения к кластерам из интернета;
- кластер-источник Managed Service for Apache Kafka®;
- топик и два пользователя Apache Kafka®, от имени которых к топику будут подключаться производитель и потребитель;
- кластер-приемник Managed Service for ClickHouse®;
- эндпоинт для приемника;
- трансфер.
-
Укажите в файле
data-transfer-mkf-mch.tf
:-
параметры кластера-источника Managed Service for Apache Kafka®:
source_user_producer
иsource_password_producer
— имя и пароль пользователя производителя;source_user_consumer
иsource_password_consumer
— имя и пароль пользователя потребителя;source_topic_name
— имя топика;
-
параметры кластера-приемника Managed Service for ClickHouse®, которые будут использоваться как параметры эндпоинта-приемника:
target_db_name
— имя базы данных Managed Service for ClickHouse®;target_user
иtarget_password
— имя и пароль пользователя-владельца базы данных.
-
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
Выполните дополнительные настройки
-
Установите утилиты:
-
kafkacat
— для чтения и записи данных в топик Apache Kafka®.sudo apt update && sudo apt install --yes kafkacat
Убедитесь, что можете с ее помощью подключиться к кластерам Managed Service for Apache Kafka® через SSL.
-
clickhouse-client
— для подключения к базе данных в кластере Managed Service for ClickHouse®.-
Подключите DEB-репозиторий
ClickHouse®:sudo apt update && sudo apt install --yes apt-transport-https ca-certificates dirmngr && \ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4 && \ echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \ /etc/apt/sources.list.d/clickhouse.list
-
Установите зависимости:
sudo apt update && sudo apt install --yes clickhouse-client
-
Загрузите файл конфигурации для
clickhouse-client
:mkdir -p ~/.clickhouse-client && \ wget "https://storage.yandexcloud.net/doc-files/clickhouse-client.conf.example" \ --output-document ~/.clickhouse-client/config.xml
Убедитесь, что можете с ее помощью подключиться к кластеру Managed Service for ClickHouse® через SSL.
-
-
jq
— для потоковой обработки JSON-файлов.sudo apt update && sudo apt-get install --yes jq
-
Отправьте тестовые данные в топик Managed Service for Apache Kafka®
Пусть в топик Apache Kafka® поступают данные от сенсоров автомобиля. Эти данные будут передаваться в виде сообщений Apache Kafka® в формате JSON:
{
"device_id":"iv9a94th6rzt********",
"datetime":"2020-06-05 17:27:00",
"latitude":"55.70329032",
"longitude":"37.65472196",
"altitude":"427.5",
"speed":"0",
"battery_voltage":"23.5",
"cabin_temperature":"17",
"fuel_level":null
}
Кластер Managed Service for ClickHouse® будет использовать формат данных JSONEachRowKafka
. Этот формат преобразует строки из сообщения Apache Kafka® в нужные значения столбцов.
-
Создайте файл
sample.json
с тестовыми данными:sample.json
{ "device_id": "iv9a94th6rzt********", "datetime": "2020-06-05 17:27:00", "latitude": 55.70329032, "longitude": 37.65472196, "altitude": 427.5, "speed": 0, "battery_voltage": 23.5, "cabin_temperature": 17, "fuel_level": null } { "device_id": "rhibbh3y08qm********", "datetime": "2020-06-06 09:49:54", "latitude": 55.71294467, "longitude": 37.66542005, "altitude": 429.13, "speed": 55.5, "battery_voltage": null, "cabin_temperature": 18, "fuel_level": 32 } { "device_id": "iv9a94th6rzt********", "datetime": "2020-06-07 15:00:10", "latitude": 55.70985913, "longitude": 37.62141918, "altitude": 417.0, "speed": 15.7, "battery_voltage": 10.3, "cabin_temperature": 17, "fuel_level": null }
-
Отправьте данные из файла
sample.json
в топик Managed Service for Apache Kafka® с помощьюjq
иkafkacat
:jq -rc . sample.json | kafkacat -P \ -b <FQDN_хоста-брокера>:9091 \ -t <имя_топика> \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username="<имя_пользователя_для_производителя>" \ -X sasl.password="<пароль_пользователя_для_производителя>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/RootCA.crt -Z
Подготовьте и активируйте трансфер
Примечание
Для ускорения поставки большого объема данных используйте специальные настройки эндпоинтов.
-
Создайте эндпоинт для источника:
-
Тип базы данных —
Kafka
. -
Параметры эндпоинта → Настройки подключения:
-
Тип подключения —
Кластер Managed Service for Apache Kafka
.-
Кластер Managed Service for Apache Kafka — выберите кластер-источник из списка.
-
Аутентификация:
- Имя пользователя — укажите имя пользователя потребителя.
- Пароль — укажите пароль пользователя потребителя.
-
-
Полное имя топика — укажите имя топика в кластере Managed Service for Apache Kafka®.
-
(Опционально) Расширенные настройки → Правила конвертации:
-
Формат данных —
JSON
. -
Схема данных —
JSON-спецификация
:Создайте и загрузите файл схемы данных в формате JSON
json_schema.json
:json_schema.json
[ { "name": "device_id", "type": "string" }, { "name": "datetime", "type": "datetime" }, { "name": "latitude", "type": "double" }, { "name": "longitude", "type": "double" }, { "name": "altitude", "type": "double" }, { "name": "speed", "type": "double" }, { "name": "battery_voltage", "type": "any" }, { "name": "cabin_temperature", "type": "double" }, { "name": "fuel_level", "type": "any" } ]
-
-
-
-
Создайте эндпоинт для приемника и трансфер:
ВручнуюTerraform-
Создайте эндпоинт для приемника:
-
Тип базы данных —
ClickHouse
. -
Параметры эндпоинта:
-
Настройки подключения:
-
Тип подключения —
Managed кластер
.- Managed кластер — выберите кластер-приемник из списка.
-
База данных — укажите имя базы данных.
-
Пользователь и Пароль — укажите имя и пароль пользователя с доступом к базе, например, владельца базы данных.
-
-
Расширенные настройки → Загружать данные в формате JSON — включите эту опцию, если в расширенных настройках эндпоинта-источника включили опцию Правила конвертации.
-
-
-
Создайте трансфер типа Репликация, использующий созданные эндпоинты.
-
Активируйте его.
-
Раскомментируйте в файле
data-transfer-mkf-mch.tf
:- переменную
source_endpoint_id
и задайте ей значение идентификатора эндпоинта для источника, созданного на предыдущем шаге; - ресурсы
yandex_datatransfer_endpoint
иyandex_datatransfer_transfer
.
- переменную
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Трансфер активируется автоматически после создания.
-
-
Проверьте работоспособность трансфера
-
Дождитесь перехода трансфера в статус Реплицируется.
-
Убедитесь, что в базу данных Managed Service for ClickHouse® перенеслись данные из кластера-источника Managed Service for Apache Kafka®:
-
Подключитесь к кластеру с помощью
clickhouse-client
. -
Выполните запрос:
SELECT * FROM <имя_базы_данных_ClickHouse®>.<имя_топика_Apache_Kafka>
-
-
Измените значения в файле
sample.json
и отправьте данные из него в топик Managed Service for Apache Kafka®:jq -rc . sample.json | kafkacat -P \ -b <FQDN_хоста-брокера>:9091 \ -t <имя_топика> \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username="<имя_пользователя_для_производителя>" \ -X sasl.password="<пароль_пользователя_для_производителя>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/RootCA.crt -Z
-
Убедитесь, что в базе данных Managed Service for ClickHouse® отобразились новые значения:
-
Подключитесь к кластеру с помощью
clickhouse-client
. -
Выполните запрос:
SELECT * FROM <имя_базы_данных_ClickHouse®>.<имя_топика_Apache_Kafka>
-
Удалите созданные ресурсы
Примечание
Перед тем как удалить созданные ресурсы, деактивируйте трансфер.
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
- Удалите трансфер.
- Удалите эндпоинт для источника.
Остальные ресурсы удалите в зависимости от способа их создания:
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
data-transfer-mkf-mch.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле
data-transfer-mkf-mch.tf
, будут удалены. -
ClickHouse® является зарегистрированным товарным знаком ClickHouse, Inc