Поставка данных из Yandex Managed Service for MySQL® в Yandex Managed Service for Apache Kafka® с помощью Debezium
Вы можете отслеживать изменения данных в Managed Service for MySQL® и отправлять их в Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).
Из этой статьи вы узнаете, как создать в Yandex Cloud виртуальную машину и настроить на ней Debezium
Перед началом работы
-
Создайте кластер-источник со следующими настройками:
- с хостами в публичном доступе;
- с базой данных
db1
; - с пользователем
user1
.
-
Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.
-
Создайте виртуальную машину с Ubuntu 20.04 и публичным IP-адресом.
-
Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета и созданной виртуальной машины, а к ней — из интернета по SSH:
-
Подключитесь к виртуальной машине по SSH и выполните ее предварительную настройку:
-
Установите зависимости:
sudo apt update && \ sudo apt install kafkacat openjdk-17-jre mysql-client --yes
-
Создайте директорию для Apache Kafka®:
sudo mkdir -p /opt/kafka/
-
Скачайте и распакуйте в эту директорию архив с исполняемыми файлами Apache Kafka®. Например, для загрузки и распаковки Apache Kafka® версии 3.0 выполните команду:
wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz && \ sudo tar xf kafka_2.13-3.0.0.tgz --strip 1 --directory /opt/kafka/
Актуальную версию Apache Kafka® уточняйте на странице загрузок проекта
. -
Установите на виртуальную машину сертификаты и убедитесь в доступности кластеров:
- Managed Service for Apache Kafka® (используйте утилиту
kafkacat
). - Managed Service for MySQL® (используйте утилиту
mysql
).
- Managed Service for Apache Kafka® (используйте утилиту
-
Создайте директорию, в которой будут храниться файлы, необходимые для работы коннектора Debezium:
sudo mkdir -p /etc/debezium/plugins/
-
Чтобы коннектор Debezium мог подключаться к хостам-брокерам Managed Service for Apache Kafka®, добавьте SSL-сертификат в защищенное хранилище сертификатов Java (Java Key Store). Для дополнительной защиты хранилища в параметре
-storepass
укажите пароль длиной не меньше 6 символов:sudo keytool \ -importcert \ -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -keystore /etc/debezium/keystore.jks \ -storepass <пароль_JKS> \ --noprompt
-
Подготовка кластера-источника
-
Назначьте пользователю
user1
глобальные привилегииREPLICATION CLIENT
иREPLICATION SLAVE
. -
Подключитесь к базе данных
db1
от имени пользователяuser1
. -
Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию с некоторых датчиков автомобиля.
-
Создайте таблицу:
CREATE TABLE measurements ( `device_id` VARCHAR(32) PRIMARY KEY NOT NULL, `datetime` TIMESTAMP NOT NULL, `latitude` REAL NOT NULL, `longitude` REAL NOT NULL, `altitude` REAL NOT NULL, `speed` REAL NOT NULL, `battery_voltage` REAL, `cabin_temperature` REAL NOT NULL, `fuel_level` REAL );
-
Наполните таблицу данными:
INSERT INTO measurements VALUES ('iv9a94th6rzt********', '2020-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL), ('rhibbh3y08qm********', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32), ('iv9a94th678t********', '2020-06-07 15:00:10', 55.70985913, 37.62141918, 417.0, 15.7, 10.3, 17, NULL);
-
Настройте коннектор Debezium
-
Подключитесь к виртуальной машине по SSH.
-
Скачайте и распакуйте актуальный Debezium-коннектор
в директорию/etc/debezium/plugins/
.Актуальную версию коннектора уточняйте на странице проекта
. Ниже приведены команды для версии1.9.4.Final
.VERSION="1.9.4.Final" wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/${VERSION}/debezium-connector-mysql-${VERSION}-plugin.tar.gz && \ sudo tar -xzvf debezium-connector-mysql-${VERSION}-plugin.tar.gz -C /etc/debezium/plugins/
-
Создайте файл
/etc/debezium/mdb-connector.conf
с настройками коннектора Debezium для подключения к кластеру-источнику:name=debezium-mmy connector.class=io.debezium.connector.mysql.MySqlConnector database.hostname=c-<идентификатор_кластера>.rw.mdb.yandexcloud.net database.port=3306 database.user=user1 database.password=<пароль_пользователя_user1> database.dbname=db1 database.server.name=mmy database.ssl.mode=required_identity table.include.list=db1.measurements heartbeat.interval.ms=15000 heartbeat.topics.prefix=__debezium-heartbeat snapshot.mode=never include.schema.changes=false database.history.kafka.topic=dbhistory.mmy database.history.kafka.bootstrap.servers=<FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 # Producer settings database.history.producer.security.protocol=SSL database.history.producer.ssl.truststore.location=/etc/debezium/keystore.jks database.history.producer.ssl.truststore.password=<пароль_JKS> database.history.producer.sasl.mechanism=SCRAM-SHA-512 database.history.producer.security.protocol=SASL_SSL database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль_пользователя_debezium>"; # Consumer settings database.history.consumer.security.protocol=SSL database.history.consumer.ssl.truststore.location=/etc/debezium/keystore.jks database.history.consumer.ssl.truststore.password=<пароль_JKS> database.history.consumer.sasl.mechanism=SCRAM-SHA-512 database.history.consumer.security.protocol=SASL_SSL database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль_пользователя_debezium>";
Где:
-
name
— логическое имя коннектора Debezium. Используется для внутренних нужд коннектора. -
database.hostname
— особый FQDN для подключения к хосту-мастеру кластера-источника.Идентификатор кластера можно получить со списком кластеров в каталоге.
-
database.user
— имя пользователя MySQL®. -
database.dbname
— имя базы данных MySQL®. -
database.server.name
— имя сервера баз данных, которое Debezium будет использовать при выборе топика для отправки сообщений. -
table.include.list
— имена таблиц, для которых Debezium должен отслеживать изменения. Укажите полные имена, включающие в себя имя базы данных (db1
). Debezium будет использовать значения настроек из этого поля при выборе топика для отправки сообщений. -
heartbeat.interval.ms
иheartbeat.topics.prefix
— настройки heartbeat, необходимые для работы Debezium. -
database.history.kafka.topic
— имя служебного топика, используемого коннектором для отправки уведомлений об изменениях в схеме данных кластера-источника.
-
Подготовьте кластер-приемник
-
Создайте топик, в который будут помещаться данные, поступающие от кластера-источника:
-
Имя —
mmy.db1.measurements
.Имена топиков для данных конструируются
по принципу<имя_сервера>.<имя_БД>.<имя_таблицы>
.Согласно файлу настроек коннектора Debezium:
- Имя сервера
mmy
указано в параметреdatabase.server.name
. - Имя базы данных
db1
указано вместе с именем таблицыmeasurements
в параметреtable.include.list
.
- Имя сервера
Если необходимо отслеживать изменения в нескольких таблицах, создайте для каждой из них отдельный топик.
-
-
Создайте служебный топик для отслеживания состояния коннектора:
-
Имя —
__debezium-heartbeat.mmy
.Имена служебных топиков конструируются
по принципу<префикс_для_heartbeat>.<имя_сервера>
.Согласно файлу настроек коннектора Debezium:
- Префикс
__debezium-heartbeat
указан в параметреheartbeat.topics.prefix
. - Имя сервера
mmy
указано в параметреdatabase.server.name
.
- Префикс
-
Политика очистки лога —
Compact
.
Если необходимо получать данные из нескольких кластеров-источников, создайте для каждого из них отдельный служебный топик.
-
-
Создайте служебный топик для отслеживания изменений в схеме формата данных:
- Имя —
dbhistory.mmy
. - Политика очистки лога —
Delete
. - Количество разделов —
1
.
- Имя —
-
Создайте пользователя с именем
debezium
. -
Выдайте пользователю
debezium
праваACCESS_ROLE_CONSUMER
иACCESS_ROLE_PRODUCER
на созданные топики.
Запустите коннектор Debezium
-
Создайте файл с настройками воркера Debezium:
/etc/debezium/worker.conf
# AdminAPI connect properties bootstrap.servers=<FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 sasl.mechanism=SCRAM-SHA-512 security.protocol=SASL_SSL ssl.truststore.location=/etc/debezium/keystore.jks ssl.truststore.password=<пароль_JKS> sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль_пользователя_debezium>"; # Producer connect properties producer.sasl.mechanism=SCRAM-SHA-512 producer.security.protocol=SASL_SSL producer.ssl.truststore.location=/etc/debezium/keystore.jks producer.ssl.truststore.password=<пароль_JKS> producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль_пользователя_debezium>"; # Worker properties plugin.path=/etc/debezium/plugins/ key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.file.filename=/etc/debezium/worker.offset
-
В отдельном терминале запустите коннектор:
sudo /opt/kafka/bin/connect-standalone.sh \ /etc/debezium/worker.conf \ /etc/debezium/mdb-connector.properties
Проверьте работоспособность Debezium
-
В отдельном терминале запустите утилиту
kafkacat
в режиме потребителя:kafkacat \ -C \ -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \ -t mmy.db1.measurements \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=debezium \ -X sasl.password=<пароль> \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -Z \ -K:
Будет выведена схема формата данных таблицы
db1.measurements
и данные о добавленных в нее ранее строках.Пример фрагмента сообщения
{ "schema": { ... }, "payload": { "before": null, "after": { "device_id": "iv9a94th6rzt********", "datetime": 1591378020000000, "latitude": 55.70329, "longitude": 37.65472, "altitude": 427.5, "speed": 0.0, "battery_voltage": 23.5, "cabin_temperature": 17.0, "fuel_level": null }, "source": { "version": "1.8.1.Final", "connector": "mysql", "name": "mmy", "ts_ms": 1628245046882, "snapshot": "true", "db": "db1", "sequence": "[null,\"4328525512\"]", "table": "measurements", "txId": 8861, "lsn": 4328525328, "xmin": null }, "op": "r", "ts_ms": 1628245046893, "transaction": null } }
-
Подключитесь к кластеру-источнику и добавьте еще одну строку в таблицу
measurements
:INSERT INTO measurements VALUES ('iv7b74th678t********', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
-
Убедитесь, что в терминале с запущенной утилитой
kafkacat
отобразились сведения о добавленной строке.
Удалите созданные ресурсы
Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:
-
Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, освободите и удалите его.
-
Удалите кластеры: