Захват изменений из YDB и поставка в Apache Kafka®
Важно
Этот документ не применим для пользователей Yandex Cloud в регионе Казахстан. См. полный перечень поддерживаемых эндпоинтов в Data Transfer.
Вы можете отслеживать изменения данных в источнике Managed Service for YDB и отправлять их в кластер-приемник Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC). Эти данные будут автоматически добавлены в топики Managed Service for Apache Kafka® с именами таблиц Managed Service for YDB.
Примечание
В YDB CDC-режим поддерживается, начиная с версии 22.5 и выше.
Чтобы запустить поставку данных:
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Подготовьте инфраструктуру поставки данных:
ВручнуюTerraform-
Создайте базу данных Managed Service for YDB любой подходящей конфигурации.
-
Если вы выбрали режим БД Dedicated, создайте и настройте группу безопасности в сети, где находится БД.
-
Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.
-
Если вы используете группы безопасности, настройте их так, чтобы к кластеру можно было подключаться из интернета.
-
Настройте в кластере-приемнике топики Apache Kafka®. Настройки различаются в зависимости от используемого способа управления топиками. Имена топиков для данных формируются как
<префикс_топика>.<имя_таблицы_YDB>
. В этом руководстве в качестве примера используется префиксcdc
.-
Если выбрано управление топиками через стандартные интерфейсы Yandex Cloud (Консоль управления, CLI, API):
-
Создайте топик с именем
cdc.sensors
.Для отслеживания изменений в нескольких таблицах создайте для каждой из них отдельный топик с префиксом
cdc
. -
Создайте пользователя с ролями
ACCESS_ROLE_CONSUMER
иACCESS_ROLE_PRODUCER
для топикаcdc.sensors
. Чтобы включить все созданные топики, укажите в имени топикаcdc.*
.
-
-
Если для управления топиками используется Kafka Admin API:
-
Создайте пользователя-администратора.
-
Помимо роли
ACCESS_ROLE_ADMIN
назначьте пользователю-администратору ролиACCESS_ROLE_CONSUMER
иACCESS_ROLE_PRODUCER
для топиковcdc.*
, имена которых начинаются с префиксаcdc
.Необходимые топики будут созданы автоматически при первом изменении в отслеживаемых таблицах кластера-источника. Такое решение может быть удобным для отслеживания изменений во множестве таблиц, однако, требует запаса свободного места в хранилище кластера. Подробнее см. в разделе Хранилище в Managed Service for Apache Kafka®.
-
-
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации data-transfer-ydb-mkf.tf
.В этом файле описаны:
- сеть;
- подсеть;
- группа безопасности и правило, необходимое для подключения к кластеру Managed Service for Apache Kafka®;
- база данных Managed Service for YDB;
- кластер-приемник Managed Service for Apache Kafka®;
- топик Apache Kafka®;
- пользователь Apache Kafka®;
- трансфер.
Выбор способа управления топиками определяется переменной Terraform
kf_topics_management
. Переменная задается при выполнении командterraform plan
иterraform apply
(см. далее):-
Если управление топиками осуществляется с помощью стандартных интерфейсов Yandex Cloud (Консоль управления, CLI, API):
- Для отслеживания изменений в нескольких таблицах добавьте в файл конфигурации для каждой из них описание отдельного топика с префиксом
cdc
. - Задайте для переменной Terraform
kf_topics_management
значениеfalse
.
- Для отслеживания изменений в нескольких таблицах добавьте в файл конфигурации для каждой из них описание отдельного топика с префиксом
-
Если для управления топиками используется Kafka Admin API, задайте для переменной Terraform
kf_topics_management
значениеtrue
.
-
Укажите в файле
data-transfer-ydb-mkf.tf
переменные:source_db_name
— имя базы данных Managed Service for YDB;target_kf_version
– версия Apache Kafka® в кластере-приемнике;target_user_name
– имя пользователя для подключения к топику Apache Kafka®;target_user_password
– пароль пользователя;transfer_enabled
– значение0
, чтобы не создавать трансфер до создания эндпоинтов вручную.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
-
-
Установите утилиту kafkacat
для чтения и записи данных в топики Apache Kafka®.sudo apt update && sudo apt install --yes kafkacat
Убедитесь, что можете с ее помощью подключиться к кластеру-приемнику Managed Service for Apache Kafka® через SSL.
Подготовьте источник
-
Создайте YDB-таблицу. В качестве примера используется таблица
sensors
с информацией, поступающей от условных датчиков автомобиля.Добавьте в таблицу колонки вручную:
Имя Тип Первичный ключ device_id
String
Да datetime
String
latitude
Double
longitude
Double
altitude
Double
speed
Double
battery_voltage
Double
cabin_temperature
Uint8
fuel_level
Uint32
Остальные настройки оставьте по умолчанию.
Также создать таблицу можно YQL-командой:
CREATE TABLE sensors ( device_id String, datetime String, latitude Double, longitude Double, altitude Double, speed Double, battery_voltage Double, cabin_temperature Uint8, fuel_level Uint32, PRIMARY KEY (device_id) )
Подготовьте и активируйте трансфер
-
Создайте эндпоинт для источника:
-
Тип базы данных —
YDB
. -
Параметры эндпоинта:
-
Настройки подключения:
- База данных — выберите базу данных Managed Service for YDB из списка.
- Идентификатор сервисного аккаунта — выберите или создайте сервисный аккаунт с ролью
editor
.
-
Список включенных путей — укажите имена таблиц и директорий базы данных Managed Service for YDB для переноса.
Важно
Реплицируются только указанные таблицы и директории. Если не указать имен, то никакие таблицы не будут перенесены.
-
-
-
Создайте эндпоинт для приемника:
-
Тип базы данных —
Kafka
. -
Параметры эндпоинта:
-
Тип подключения —
Кластер Managed Service for Apache Kafka
.- Кластер Managed Service for Apache Kafka — выберите созданный ранее кластер-источник Managed Service for Apache Kafka®.
- Аутентификация — укажите данные созданного ранее пользователя Apache Kafka®.
-
Топик —
Полное имя топика
. -
Полное имя топика —
cdc.sensors
.
Если необходимо отслеживать изменения в нескольких таблицах, заполните поля следующим образом:
- Топик —
Префикс топика
. - Префикс топика — укажите префикс
cdc
, использованный при формировании имен топиков.
-
-
-
Создайте трансфер:
ВручнуюTerraform- Создайте трансфер типа Репликация, использующий созданные эндпоинты.
- Активируйте его.
-
Укажите в файле
data-transfer-ydb-mkf.tf
переменные:source_endpoint_id
— значение идентификатора эндпоинта для источника;target_endpoint_id
— значение идентификатора эндпоинта для приемника;transfer_enabled
– значение1
для создания трансфера.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Трансфер активируется автоматически после создания.
-
Проверьте работоспособность трансфера
-
Дождитесь перехода трансфера в статус Реплицируется.
-
В отдельном терминале запустите утилиту
kafkacat
в режиме потребителя:kafkacat \ -C \ -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \ -t cdc.sensors \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=kafka-user \ -X sasl.password=<пароль> \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -Z \ -K:
FQDN хостов-брокеров можно получить со списком хостов в кластере Managed Service for Apache Kafka®.
-
Подключитесь к базе данных Managed Service for YDB и добавьте тестовые данные в таблицу
sensors
:REPLACE INTO sensors (device_id, datetime, latitude, longitude, altitude, speed, battery_voltage, cabin_temperature, fuel_level) VALUES ('iv9a94th6rzt********', '2022-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL), ('rhibbh3y08qm********', '2022-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32), ('iv9a94th6rzt********', '2022-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, NULL, 20.5, 15, 20);
-
Убедитесь, что в терминале с запущенной утилитой
kafkacat
отобразились схема формата данных таблицыsensors
и сведения о добавленных строках.Пример фрагмента сообщения
{ "payload": { "device_id": "aXY5YTk0dGg2cnp0b294********" }, "schema": { "fields": [ { "field": "device_id", "optional": false, "type": "bytes" } ], "name": "cdc..sensors.Key", "optional": false, "type": "struct" } }: { "payload": { "after": { "altitude": 378, "battery_voltage": 20.5, "cabin_temperature": 15, "datetime": "MjAyMi0wNi0wOCAxNzo0********", "device_id": "aXY5YTk0dGg2cnp0b294********", "fuel_level": 20, "latitude": 53.70987913, "longitude": 36.62549834, "speed": null }, "before": null, "op": "c", "source": { "db": "", "name": "cdc", "snapshot": "false", "table": "sensors", "ts_ms": 1678642104797, "version": "1.1.2.Final" }, "transaction": null, "ts_ms": 1678642104797 }, "schema": { "fields": [ { "field": "before", "fields": [ { "field": "device_id", "optional": false, "type": "bytes" }, ... ], "name": "cdc..sensors.Value", "optional": true, "type": "struct" }, { "field": "after", "fields": [ { "field": "device_id", "optional": false, "type": "bytes" }, ... ], "name": "cdc..sensors.Value", "optional": true, "type": "struct" }, { "field": "source", "fields": [ { "field": "version", "optional": false, "type": "string" }, { "field": "connector", "optional": false, "type": "string" }, { "field": "name", "optional": false, "type": "string" }, { "field": "ts_ms", "optional": false, "type": "int64" }, { "default": "false", "field": "snapshot", "name": "io.debezium.data.Enum", "optional": true, "parameters": { "allowed": "true,last,false" }, "type": "string", "version": 1 }, { "field": "db", "optional": false, "type": "string" }, { "field": "table", "optional": false, "type": "string" } ], "optional": false, "type": "struct" }, ..., { "field": "transaction", "fields": [ { "field": "id", "optional": false, "type": "string" }, { "field": "total_order", "optional": false, "type": "int64" }, { "field": "data_collection_order", "optional": false, "type": "int64" } ], "optional": true, "type": "struct" } ], "name": "cdc..sensors.Envelope", "optional": false, "type": "struct" } }
Удалите созданные ресурсы
Примечание
Перед тем как удалить созданные ресурсы, деактивируйте трансфер.
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
- Удалите трансфер.
- Удалите эндпоинты для источника и приемника.
- Если при создании эндпоинта для источника вы создавали сервисный аккаунт, удалите его.
Остальные ресурсы удалите в зависимости от способа их создания:
-
В терминале перейдите в директорию с планом инфраструктуры.
Важно
Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.
-
Удалите ресурсы:
-
Выполните команду:
terraform destroy
-
Подтвердите удаление ресурсов и дождитесь завершения операции.
Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.
-