Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
Вы можете отслеживать изменения данных в кластере-источнике Managed Service for MySQL® и отправлять их в кластер-приемник Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).
Чтобы настроить CDC с использованием сервиса Data Transfer:
- Подготовьте кластер-источник.
- Подготовьте кластер-приемник.
- Подготовьте и активируйте трансфер.
- Проверьте работоспособность трансфера.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Создайте кластер-источник Managed Service for MySQL® любой подходящей конфигурации со следующими настройками:
- с базой данных
db1
; - с пользователем
my-user
; - с хостами в публичном доступе.
- с базой данных
-
Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.
-
Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета:
-
Установите на локальный компьютер утилиту
kcat
(kafkacat
) и утилиту командной строки MySQL . Например, в Ubuntu 20.04 выполните команду:sudo apt update && sudo apt install kafkacat mysql-client --yes
Подготовьте кластер-источник
-
Чтобы сервис Data Transfer мог получать от кластера Managed Service for MySQL® уведомления об изменениях в данных, в кластере-источнике необходимо настроить внешнюю репликацию. Чтобы пользователь
my-user
мог выполнять репликацию, назначьте ему рольALL_PRIVILEGES
для базы данныхdb1
и выдайте глобальные привилегииREPLICATION CLIENT
иREPLICATION SLAVE
. -
Подключитесь к базе данных
db1
от имени пользователяmy-user
. -
Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию, поступающую от некоторых датчиков автомобиля.
Создайте таблицу:
CREATE TABLE db1.measurements ( device_id varchar(200) NOT NULL, datetime timestamp NOT NULL, latitude real NOT NULL, longitude real NOT NULL, altitude real NOT NULL, speed real NOT NULL, battery_voltage real, cabin_temperature real NOT NULL, fuel_level real, PRIMARY KEY (device_id) );
Наполните таблицу данными:
INSERT INTO db1.measurements VALUES ('iv9a94th6rzt********', '2022-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL), ('rhibbh3y08qm********', '2022-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32);
Подготовьте кластер-приемник
Настройки различаются в зависимости от используемого способа управления топиками. При этом имена топиков для данных формируются по следующему принципу — <префикс_топика>.<имя_схемы>.<имя_таблицы>
. В этом руководстве в качестве примера будет использоваться префикс cdc
.
Если управление топиками осуществляется с помощью стандартных интерфейсов Yandex Cloud (Консоль управления, YC CLI, Terraform, API):
-
Создайте топик с именем
cdc.db1.measurements
.Для отслеживания изменений в нескольких таблицах создайте для каждой из них отдельный топик.
-
Создайте пользователя с именем
kafka-user
и ролямиACCESS_ROLE_CONSUMER
иACCESS_ROLE_PRODUCER
, действующими для созданных топиков. Чтобы включить все такие топики, укажите в имени топикаcdc.*
.
Если для управления топиками используется Kafka Admin API:
-
Создайте пользователя-администратора с именем
kafka-user
. -
Помимо роли
ACCESS_ROLE_ADMIN
назначьте пользователю-администратору ролиACCESS_ROLE_CONSUMER
иACCESS_ROLE_PRODUCER
для топиков, имена которых начинаются с префиксаcdc
.Необходимые топики будут созданы автоматически при первом событии изменения в отслеживаемых таблицах кластера-источника. Такое решение может быть удобным для отслеживания изменений во множестве таблиц, однако, требует запас свободного места в хранилище кластера. Подробнее см. в разделе Хранилище в Managed Service for Apache Kafka®.
Подготовьте и активируйте трансфер
-
Создайте эндпоинт для источника MySQL® с настройками:
- Тип базы данных —
MySQL
. - Параметры эндпоинта:
- Настройки подключения —
Кластер Managed Service for MySQL
. - Кластер Managed Service for MySQL — выберите созданный ранее кластер Managed Service for MySQL®.
- База данных —
db1
. - Пользователь —
my-user
. - Пароль — укажите пароль пользователя
my-user
. - Список включённых таблиц —
db1.measurements
.
- Настройки подключения —
- Тип базы данных —
-
Создайте эндпоинт для приемника Apache Kafka® с настройками:
-
Тип базы данных —
Kafka
. -
Параметры эндпоинта:
-
Тип подключения —
Кластер Managed Service for Apache Kafka
.- Кластер Managed Service for Apache Kafka — выберите созданный ранее кластер Managed Service for Apache Kafka®.
- Аутентификация — укажите данные созданного ранее пользователя
kafka-user
.
-
Топик —
Полное имя топика
. -
Полное имя топика —
cdc.db1.measurements
.
Если необходимо отслеживать изменения в нескольких таблицах, заполните поля следующим образом:
- Топик —
Префикс топика
. - Префикс топика — укажите префикс
cdc
, использованный при формировании имен топиков.
-
-
-
Создайте трансфер типа Репликация с созданными эндпоинтами для источника и приемника.
-
Активируйте трансфер и дождитесь его перехода в статус Реплицируется.
Проверьте работоспособность трансфера
-
В отдельном терминале запустите утилиту
kafkacat
в режиме потребителя:kafkacat \ -C \ -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \ -t cdc.db1.measurements \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=kafka-user \ -X sasl.password=<пароль> \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -Z \ -K:
FQDN хостов-брокеров можно получить со списком хостов в кластере Managed Service for Apache Kafka®.
-
Подключитесь к кластеру-источнику и добавьте данные в таблицу
measurements
:INSERT INTO db1.measurements VALUES ('iv7b74th678t********', '2022-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL), ('iv9a94th678t********', '2022-06-07 15:00:10', 55.70985913, 37.62141918, 417.0, 15.7, 10.3, 17, NULL);
-
Убедитесь, что в терминале с запущенной утилитой
kafkacat
отобразились схема формата данных таблицыdb1.measurements
и сведения о добавленных строках.Пример фрагмента сообщения
{ "payload": { "device_id": "iv7b74th678t********" }, "schema": { "fields": [ { "field": "device_id", "optional": false, "type": "string" } ], "name": "cdc.db1.measurements.Key", "optional": false, "type": "struct" } }: { "payload": { "after": { "altitude": 378, "battery_voltage": 5.3, "cabin_temperature": 20, "datetime": "2020-06-08T17:45:00Z", "device_id": "iv7b74th678t********", "fuel_level": null, "latitude": 53.70987913, "longitude": 36.62549834, "speed": 20.5 }, "before": null, "op": "c", "source": { "connector": "mysql", "db": "db1", "file": "mysql-log.000016", "gtid": "1e46a80b-2e96-11ed-adf7-d00d183780**:*-*****", "name": "cdc", "pos": 1547357, "query": null, "row": 0, "server_id": 0, "snapshot": "false", "table": "measurements", "thread": null, "ts_ms": 1662632515000, "version": "1.1.2.Final" }, "transaction": null, "ts_ms": 1662632515000 }, "schema": { "fields": [ { "field": "before", "fields": [ { "field": "device_id", "optional": false, "type": "string" }, ... ], "name": "cdc.db1.measurements.Value", "optional": true, "type": "struct" }, { "field": "after", "fields": [ ... ], "name": "cdc.db1.measurements.Value", "optional": true, "type": "struct" }, { "field": "source", "fields": [ { "field": "version", "optional": false, "type": "string" }, ... ], "name": "io.debezium.connector.mysql.Source", "optional": false, "type": "struct" }, { "field": "op", "optional": false, "type": "string" }, ... ], "name": "cdc.db1.measurements.Envelope", "optional": false, "type": "struct" } }
Особенности поставки данных с помощью Data Transfer
-
При переносе данных из MySQL® в Apache Kafka® некоторые типы данных переносятся с изменениями:
- тип
tinyint(1)
переносится какboolean
; - тип
real
переносится какdouble
; - тип
bigint unsigned
переносится какint64
.
- тип
-
В блоке метаинформации об источнике
payload.source
параметрыserver_id
иthread
не заполняются.
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
-
Деактивируйте и удалите трансфер.
-
Удалите кластеры:
-
Если для доступа к хостам кластеров использовались статические публичные IP-адреса, освободите и удалите их.