Поставка данных из очереди Apache Kafka® в YDB
В кластер Managed Service for YDB можно в реальном времени поставлять данные из топиков Apache Kafka®. Эти данные будут автоматически добавлены в таблицы YDB с именами топиков.
Чтобы запустить поставку данных:
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Подготовьте инфраструктуру поставки данных:
ВручнуюTerraform- Создайте кластер-источник Managed Service for Apache Kafka® любой подходящей конфигурации.
- Создайте базу данных Managed Service for YDB любой подходящей конфигурации.
- Создайте в кластере-источнике топик с именем
sensors
. - Создайте в кластере-источнике пользователя с правами доступа
ACCESS_ROLE_PRODUCER
,ACCESS_ROLE_CONSUMER
к созданному топику.
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации data-transfer-mkf-ydb.tf
.В этом файле описаны:
- сеть;
- подсеть;
- группа безопасности и правило, необходимое для подключения к кластеру Managed Service for Apache Kafka®;
- кластер-источник Managed Service for Apache Kafka®;
- топик Apache Kafka®;
- пользователь Apache Kafka®;
- база данных Managed Service for YDB;
- трансфер.
-
Укажите в файле
data-transfer-mkf-ydb.tf
переменные:source_kf_version
– версия Apache Kafka® в кластере-источнике;source_user_name
– имя пользователя для подключения к топику Apache Kafka®;source_user_password
– пароль пользователя;target_db_name
— имя базы данных Managed Service for YDB;transfer_enabled
– значение0
, чтобы не создавать трансфер до создания эндпоинта-приемника вручную.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
В созданный топик Apache Kafka®
sensors
кластера-источника будут поступать тестовые данные от сенсоров автомобиля в формате JSON:{ "device_id":"iv9a94th6rzt********", "datetime":"2020-06-05 17:27:00", "latitude":"55.70329032", "longitude":"37.65472196", "altitude":"427.5", "speed":"0", "battery_voltage":"23.5", "cabin_temperature":"17", "fuel_level":null }
-
Установите утилиты:
-
kafkacat
— для чтения и записи данных в топики Apache Kafka®.sudo apt update && sudo apt install --yes kafkacat
Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.
-
jq
— для потоковой обработки JSON-файлов.sudo apt update && sudo apt-get install --yes jq
-
Подготовьте и активируйте трансфер
-
Создайте эндпоинт для приемника:
-
Тип базы данных —
YDB
. -
Параметры эндпоинта:
- Настройки подключения:
- База данных — выберите базу данных Managed Service for YDB из списка.
- Идентификатор сервисного аккаунта — выберите или создайте сервисный аккаунт с ролью
editor
.
- Настройки подключения:
-
-
Создайте эндпоинт для источника:
- Тип базы данных —
Kafka
. - Параметры эндпоинта:
-
Тип подключения —
Кластер Managed Service for Apache Kafka
.Выберите кластер-источник из списка и укажите настройки подключения к нему.
-
Расширенные настройки → Правила конвертации.
- Формат данных –
JSON
. - Схема данных – Вы можете задать схему двумя способами:
-
Список полей
.Задайте список полей топика вручную:
Имя Тип Ключ device_id
STRING
Да datetime
STRING
latitude
DOUBLE
longitude
DOUBLE
altitude
DOUBLE
speed
DOUBLE
battery_voltage
DOUBLE
cabin_temperature
UINT16
fuel_level
UINT16
-
JSON спецификация
.Создайте и загрузите файл схемы данных в формате JSON
json_schema.json
:json_schema.json
[ { "name": "device_id", "type": "string", "key": true }, { "name": "datetime", "type": "string" }, { "name": "latitude", "type": "double" }, { "name": "longitude", "type": "double" }, { "name": "altitude", "type": "double" }, { "name": "speed", "type": "double" }, { "name": "battery_voltage", "type": "double" }, { "name": "cabin_temperature", "type": "uint16" }, { "name": "fuel_level", "type": "uint16" } ]
-
- Формат данных –
-
- Тип базы данных —
-
Создайте трансфер:
ВручнуюTerraform- Создайте трансфер типа Репликация, использующий созданные эндпоинты.
- Активируйте его.
-
Укажите в файле
data-transfer-mkf-ydb.tf
переменные:source_endpoint_id
— значение идентификатора эндпоинта для источника;target_endpoint_id
— значение идентификатора эндпоинта для приемника;transfer_enabled
– значение1
для создания трансфера.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Трансфер активируется автоматически после создания.
-
Проверьте работоспособность трансфера
-
Дождитесь перехода трансфера в статус Реплицируется.
-
Убедитесь, что в базу данных Managed Service for YDB переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:
-
Создайте файл
sample.json
с тестовыми данными:{ "device_id": "iv9a94th6rzt********", "datetime": "2020-06-05 17:27:00", "latitude": 55.70329032, "longitude": 37.65472196, "altitude": 427.5, "speed": 0, "battery_voltage": 23.5, "cabin_temperature": 17, "fuel_level": null } { "device_id": "rhibbh3y08qm********", "datetime": "2020-06-06 09:49:54", "latitude": 55.71294467, "longitude": 37.66542005, "altitude": 429.13, "speed": 55.5, "battery_voltage": null, "cabin_temperature": 18, "fuel_level": 32 } { "device_id": "iv9a94th6rzt********", "datetime": "2020-06-07 15:00:10", "latitude": 55.70985913, "longitude": 37.62141918, "altitude": 417.0, "speed": 15.7, "battery_voltage": 10.3, "cabin_temperature": 17, "fuel_level": null }
-
Отправьте данные из файла
sample.json
в топикsensors
Managed Service for Apache Kafka® с помощью утилитjq
иkafkacat
:jq -rc . sample.json | kafkacat -P \ -b <FQDN_хоста-брокера>:9091 \ -t sensors \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username="<имя_пользователя_в_кластере-источнике>" \ -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
Данные отправляются от имени созданного пользователя. Подробнее о настройке SSL-сертификата и работе с
kafkacat
см. в разделе Подключение к кластеру Apache Kafka® из приложений. -
Убедитесь, что в базу данных Managed Service for YDB перенеслись данные из кластера-источника Managed Service for Apache Kafka®:
Консоль управленияYDB CLI- В консоли управления
выберите каталог, в котором находится нужная база данных. - В списке сервисов выберите Managed Service for YDB.
- Выберите базу из списка.
- Перейдите на вкладку Навигация.
- Проверьте, что база данных Managed Service for YDB содержит таблицу
sensors
с тестовыми данными из топика.
-
Проверьте, что база данных содержит таблицу
sensors
с тестовыми данными из топика:ydb table query execute \ --query "SELECT * \ FROM sensors"
- В консоли управления
-
Удалите созданные ресурсы
Примечание
Перед тем как удалить созданные ресурсы, деактивируйте трансфер.
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
- Удалите трансфер.
- Удалите эндпоинты для источника и приемника.
- Если при создании эндпоинта для приемника вы создавали сервисный аккаунт, удалите его.
Остальные ресурсы удалите в зависимости от способа их создания:
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
data-transfer-mkf-ydb.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле
data-transfer-mkf-ydb.tf
, будут удалены. -