Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
Вы можете отслеживать изменения данных в кластере-источнике Managed Service for MySQL® и отправлять их в кластер-приемник Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).
Чтобы настроить CDC с использованием сервиса Data Transfer:
- Подготовьте кластер-источник.
- Подготовьте кластер-приемник.
- Подготовьте и активируйте трансфер.
- Проверьте работоспособность трансфера.
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
- Кластер Managed Service for MySQL®: выделенные хостам вычислительные ресурсы, объем хранилища и резервных копий (см. тарифы Managed Service for MySQL®).
- Кластер Managed Service for Apache Kafka®: выделенные хостам вычислительные ресурсы, объем хранилища и резервных копий (см. тарифы Managed Service for Apache Kafka®).
- Публичные IP-адреса, если для хостов кластеров включен публичный доступ (см. тарифы Virtual Private Cloud).
- Каждый трансфер: использование вычислительных ресурсов и количество переданных строк данных (см. тарифы Data Transfer).
Перед началом работы
-
Создайте кластер-источник Managed Service for MySQL® любой подходящей конфигурации со следующими настройками:
- с базой данных
db1; - с пользователем
my-user; - с хостами в публичном доступе.
- с базой данных
-
Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.
-
Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета:
-
Установите на локальный компьютер утилиту
kcat(kafkacat) и утилиту командной строки MySQL . Например, в Ubuntu 20.04 выполните команду:sudo apt update && sudo apt install kafkacat mysql-client --yesУбедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.
Подготовьте кластер-источник
-
Чтобы сервис Data Transfer мог получать от кластера Managed Service for MySQL® уведомления об изменениях в данных, в кластере-источнике необходимо настроить внешнюю репликацию. Чтобы пользователь
my-userмог выполнять репликацию, назначьте ему рольALL_PRIVILEGESдля базы данныхdb1и выдайте глобальные привилегииREPLICATION CLIENTиREPLICATION SLAVE. -
Подключитесь к базе данных
db1от имени пользователяmy-user. -
Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию, поступающую от некоторых датчиков автомобиля.
Создайте таблицу:
CREATE TABLE db1.measurements ( device_id varchar(200) NOT NULL, datetime timestamp NOT NULL, latitude real NOT NULL, longitude real NOT NULL, altitude real NOT NULL, speed real NOT NULL, battery_voltage real, cabin_temperature real NOT NULL, fuel_level real, PRIMARY KEY (device_id) );Наполните таблицу данными:
INSERT INTO db1.measurements VALUES ('iv9a94th6rzt********', '2022-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL), ('rhibbh3y08qm********', '2022-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32);
Подготовьте кластер-приемник
Настройки различаются в зависимости от используемого способа управления топиками. При этом имена топиков для данных формируются по следующему принципу — <префикс_топика>.<имя_схемы>.<имя_таблицы>. В этом руководстве в качестве примера будет использоваться префикс cdc.
Если управление топиками осуществляется с помощью стандартных интерфейсов Yandex Cloud (Консоль управления, CLI, Terraform, API):
-
Создайте топик с именем
cdc.db1.measurements.Для отслеживания изменений в нескольких таблицах создайте для каждой из них отдельный топик.
-
Создайте пользователя с именем
kafka-userи ролямиACCESS_ROLE_CONSUMERиACCESS_ROLE_PRODUCER, действующими для созданных топиков. Чтобы включить все такие топики, укажите в имени топикаcdc.*.
Если для управления топиками используется Kafka Admin API:
-
Создайте пользователя-администратора с именем
kafka-user. -
Помимо роли
ACCESS_ROLE_ADMINназначьте пользователю-администратору ролиACCESS_ROLE_CONSUMERиACCESS_ROLE_PRODUCERдля топиков, имена которых начинаются с префиксаcdc.Необходимые топики будут созданы автоматически при первом событии изменения в отслеживаемых таблицах кластера-источника. Такое решение может быть удобным для отслеживания изменений во множестве таблиц, однако, требует запас свободного места в хранилище кластера. Подробнее см. в разделе Хранилище в Managed Service for Apache Kafka®.
Подготовьте и активируйте трансфер
-
Создайте эндпоинт для источника MySQL® с настройками:
- Тип базы данных —
MySQL. - Параметры эндпоинта:
- Настройки подключения —
Кластер Managed Service for MySQL. - Кластер Managed Service for MySQL — выберите созданный ранее кластер Managed Service for MySQL®.
- База данных —
db1. - Пользователь —
my-user. - Пароль — укажите пароль пользователя
my-user. - Список включённых таблиц —
db1.measurements.
- Настройки подключения —
- Тип базы данных —
-
Создайте эндпоинт для приемника Apache Kafka® с настройками:
-
Тип базы данных —
Kafka. -
Параметры эндпоинта:
-
Тип подключения —
Кластер Managed Service for Apache Kafka.- Кластер Managed Service for Apache Kafka — выберите созданный ранее кластер Managed Service for Apache Kafka®.
- Аутентификация — укажите данные созданного ранее пользователя
kafka-user.
-
Топик —
Полное имя топика. -
Полное имя топика —
cdc.db1.measurements.
Если необходимо отслеживать изменения в нескольких таблицах, заполните поля следующим образом:
- Топик —
Префикс топика. - Префикс топика — укажите префикс
cdc, использованный при формировании имен топиков.
-
-
-
Создайте трансфер типа Репликация с созданными эндпоинтами для источника и приемника.
-
Активируйте трансфер и дождитесь его перехода в статус Реплицируется.
Проверьте работоспособность трансфера
-
В отдельном терминале запустите утилиту
kafkacatв режиме потребителя:kafkacat \ -C \ -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \ -t cdc.db1.measurements \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=kafka-user \ -X sasl.password=<пароль> \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -Z \ -K:FQDN хостов-брокеров можно получить со списком хостов в кластере Managed Service for Apache Kafka®.
-
Подключитесь к кластеру-источнику и добавьте данные в таблицу
measurements:INSERT INTO db1.measurements VALUES ('iv7b74th678t********', '2022-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL), ('iv9a94th678t********', '2022-06-07 15:00:10', 55.70985913, 37.62141918, 417.0, 15.7, 10.3, 17, NULL); -
Убедитесь, что в терминале с запущенной утилитой
kafkacatотобразились схема формата данных таблицыdb1.measurementsи сведения о добавленных строках.Пример фрагмента сообщения
{ "payload": { "device_id": "iv7b74th678t********" }, "schema": { "fields": [ { "field": "device_id", "optional": false, "type": "string" } ], "name": "cdc.db1.measurements.Key", "optional": false, "type": "struct" } }: { "payload": { "after": { "altitude": 378, "battery_voltage": 5.3, "cabin_temperature": 20, "datetime": "2020-06-08T17:45:00Z", "device_id": "iv7b74th678t********", "fuel_level": null, "latitude": 53.70987913, "longitude": 36.62549834, "speed": 20.5 }, "before": null, "op": "c", "source": { "connector": "mysql", "db": "db1", "file": "mysql-log.000016", "gtid": "1e46a80b-2e96-11ed-adf7-d00d183780**:*-*****", "name": "cdc", "pos": 1547357, "query": null, "row": 0, "server_id": 0, "snapshot": "false", "table": "measurements", "thread": null, "ts_ms": 1662632515000, "version": "1.1.2.Final" }, "transaction": null, "ts_ms": 1662632515000 }, "schema": { "fields": [ { "field": "before", "fields": [ { "field": "device_id", "optional": false, "type": "string" }, ... ], "name": "cdc.db1.measurements.Value", "optional": true, "type": "struct" }, { "field": "after", "fields": [ ... ], "name": "cdc.db1.measurements.Value", "optional": true, "type": "struct" }, { "field": "source", "fields": [ { "field": "version", "optional": false, "type": "string" }, ... ], "name": "io.debezium.connector.mysql.Source", "optional": false, "type": "struct" }, { "field": "op", "optional": false, "type": "string" }, ... ], "name": "cdc.db1.measurements.Envelope", "optional": false, "type": "struct" } }
Особенности поставки данных с помощью Data Transfer
-
При переносе данных из MySQL® в Apache Kafka® некоторые типы данных переносятся с изменениями:
- тип
tinyint(1)переносится какboolean; - тип
realпереносится какdouble; - тип
bigint unsignedпереносится какint64.
- тип
-
В блоке метаинформации об источнике
payload.sourceпараметрыserver_idиthreadне заполняются.
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
-
Деактивируйте и удалите трансфер.
-
Удалите кластеры:
-
Если для доступа к хостам кластеров использовались статические публичные IP-адреса, освободите и удалите их.