Поставка данных из Yandex Managed Service for Apache Kafka® в Yandex Data Streams с помощью Yandex Data Transfer
В поток данных Data Streams можно в реальном времени поставлять данные из топиков Apache Kafka®.
Чтобы запустить поставку данных:
- Создайте поток данных приемника Data Streams.
- Подготовьте и активируйте трансфер.
- Проверьте работоспособность трансфера.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Подготовьте инфраструктуру поставки данных:
ВручнуюTerraform- Создайте кластер-источник Managed Service for Apache Kafka® любой подходящей конфигурации.
- Создайте базу данных Managed Service for YDB любой подходящей конфигурации.
- Создайте в кластере-источнике топик с именем
sensors
. - Создайте в кластере-источнике пользователя с правами доступа
ACCESS_ROLE_PRODUCER
,ACCESS_ROLE_CONSUMER
к созданному топику.
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации data-transfer-mkf-ydb.tf
.В этом файле описаны:
- сеть;
- подсеть;
- группа безопасности и правило, необходимое для подключения к кластеру Managed Service for Apache Kafka®;
- кластер-источник Managed Service for Apache Kafka®;
- топик Apache Kafka®;
- пользователь Apache Kafka®;
- база данных Managed Service for YDB;
- трансфер.
-
Укажите в файле
data-transfer-mkf-ydb.tf
переменные:source_kf_version
— версия Apache Kafka® в кластере-источнике;source_user_name
— имя пользователя для подключения к топику Apache Kafka®;source_user_password
— пароль пользователя;target_db_name
— имя базы данных Managed Service for YDB;transfer_enabled
— значение0
, чтобы не создавать трансфер до создания эндпоинтов вручную.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
В созданный топик Apache Kafka®
sensors
кластера-источника будут поступать тестовые данные от сенсоров автомобиля в формате JSON:{ "device_id":"iv9a94th6rzt********", "datetime":"2020-06-05 17:27:00", "latitude":"55.70329032", "longitude":"37.65472196", "altitude":"427.5", "speed":"0", "battery_voltage":"23.5", "cabin_temperature":"17", "fuel_level":null }
-
Установите утилиты:
-
kafkacat
— для чтения и записи данных в топики Apache Kafka®.sudo apt update && sudo apt install --yes kafkacat
Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.
-
jq
— для потоковой обработки JSON-файлов.sudo apt update && sudo apt-get install --yes jq
-
Создайте поток данных приемника Data Streams
Создайте поток данных приемника Data Streams для базы данных Managed Service for YDB.
Подготовьте и активируйте трансфер
-
Создайте эндпоинт для источника
Apache Kafka®
:Параметры эндпоинта:
-
Тип подключения —
Кластер Managed Service for Apache Kafka
.Выберите кластер-источник из списка и укажите настройки подключения к нему.
-
Расширенные настройки → Правила конвертации.
-
Правила конвертации –
json
. -
Схема данных – Вы можете задать схему двумя способами:
-
Список полей
.Задайте список полей топика вручную:
Имя Тип Ключ device_id
STRING
Да datetime
STRING
latitude
DOUBLE
longitude
DOUBLE
altitude
DOUBLE
speed
DOUBLE
battery_voltage
DOUBLE
cabin_temperature
UINT16
fuel_level
UINT16
-
JSON-спецификация
.Создайте и загрузите файл схемы данных в формате JSON
json_schema.json
:json_schema.json
[ { "name": "device_id", "type": "string", "key": true }, { "name": "datetime", "type": "string" }, { "name": "latitude", "type": "double" }, { "name": "longitude", "type": "double" }, { "name": "altitude", "type": "double" }, { "name": "speed", "type": "double" }, { "name": "battery_voltage", "type": "double" }, { "name": "cabin_temperature", "type": "uint16" }, { "name": "fuel_level", "type": "uint16" } ]
-
-
-
-
Создайте эндпоинт для приемника
Yandex Data Streams
:Настройки подключения:
- База данных — выберите базу данных Managed Service for YDB из списка.
- Поток — укажите имя потока Data Streams.
- Идентификатор сервисного аккаунта — выберите или создайте сервисный аккаунт с ролью
yds.editor
.
-
Создайте трансфер:
ВручнуюTerraform- Создайте трансфер типа Репликация, использующий созданные эндпоинты.
- Активируйте его.
-
Укажите в файле
data-transfer-mkf-ydb.tf
переменные:source_endpoint_id
— значение идентификатора эндпоинта для источника;target_endpoint_id
— значение идентификатора эндпоинта для приемника;transfer_enabled
– значение1
для создания трансфера.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Трансфер активируется автоматически после создания.
-
Проверьте работоспособность трансфера
-
Дождитесь перехода трансфера в статус Реплицируется.
-
Убедитесь, что в поток данных Data Streams переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:
-
Создайте файл
sample.json
с тестовыми данными:{ "device_id": "iv9a94th6rzt********", "datetime": "2020-06-05 17:27:00", "latitude": 55.70329032, "longitude": 37.65472196, "altitude": 427.5, "speed": 0, "battery_voltage": 23.5, "cabin_temperature": 17, "fuel_level": null } { "device_id": "rhibbh3y08qm********", "datetime": "2020-06-06 09:49:54", "latitude": 55.71294467, "longitude": 37.66542005, "altitude": 429.13, "speed": 55.5, "battery_voltage": null, "cabin_temperature": 18, "fuel_level": 32 } { "device_id": "iv9a94th6r********", "datetime": "2020-06-07 15:00:10", "latitude": 55.70985913, "longitude": 37.62141918, "altitude": 417.0, "speed": 15.7, "battery_voltage": 10.3, "cabin_temperature": 17, "fuel_level": null }
-
Отправьте данные из файла
sample.json
в топикsensors
Managed Service for Apache Kafka® с помощью утилитjq
иkafkacat
:jq -rc . sample.json | kafkacat -P \ -b <FQDN_хоста-брокера>:9091 \ -t sensors \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username="<имя_пользователя_в_кластере-источнике>" \ -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
Данные отправляются от имени созданного пользователя. Подробнее о настройке SSL-сертификата и работе с
kafkacat
см. в разделе Подключение к кластеру Apache Kafka® из приложений.
Проверьте, что в поток данных Data Streams перенеслись данные из источника:
Консоль управленияAWS CLI- В консоли управления
выберите сервис Data Streams. - Выберите поток-приемник из списка и перейдите в раздел
Просмотр данных. - Убедитесь, что в сегменте
shard-000000
появились сообщения, содержащие строки таблицы из источника. Чтобы рассмотреть сообщения подробнее, нажмите на значок .
-
Настройте окружение для Data Streams.
-
Прочитайте данные из потока с помощью:
-
Удалите созданные ресурсы
Примечание
Перед тем как удалить созданные ресурсы, деактивируйте трансфер.
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
- Удалите трансфер.
- Удалите эндпоинты для источника и приемника.
- Если при создании эндпоинта для приемника вы создавали сервисный аккаунт, удалите его.
Остальные ресурсы удалите в зависимости от способа их создания:
-
В терминале перейдите в директорию с планом инфраструктуры.
Важно
Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.
-
Удалите ресурсы:
-
Выполните команду:
terraform destroy
-
Подтвердите удаление ресурсов и дождитесь завершения операции.
Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.
-