Обработка потока изменений Debezium
Debezium
Ниже приведена архитектура решения:
Настройка
Для получения потока данных необходимо:
- Создать поток данных Yandex Data Streams.
- Настроить реквизиты подключения к Yandex Data Streams.
- Настроить и запустить Debezium Server.
- Настроить триггер в Cloud Functions для обработки данных.
Создание потока данных
Создайте поток данных Yandex Data Streams с именем debezium
. Процедура создания потока данных подробно описана в документации Yandex Data Streams
Настройка реквизитов подключения к Yandex Data Streams
- Войдите в консоль управления
. Если вы еще не зарегистрированы, перейдите в консоль управления и следуйте инструкциям. - На странице Биллинг
убедитесь, что у вас подключен платежный аккаунт, и он находится в статусеACTIVE
илиTRIAL_ACTIVE
. Если платежного аккаунта нет, создайте его. - Если у вас еще нет каталога, создайте его.
- Создайте сервисный аккаунт и назначьте ему роль
editor
на ваш каталог. - Создайте статический ключ доступа.
- Настройте AWS CLI:
-
Установите AWS CLI
и выполните команду:aws configure
-
Последовательно введите:
AWS Access Key ID [None]:
— идентификатор ключа сервисного аккаунта.AWS Secret Access Key [None]:
— секретный ключ сервисного аккаунта.Default region name [None]:
— зону доступностиru-central1
.
-
Настройка Debezium Server
В этом примере рассматривается взаимодействие Debezium и PostgreSQL. Далее будем считать, что Debezium будет устанавливаться на сервере, где запущен PostgreSQL.
-
Установите Debezium Server по инструкции
. -
Перейдите в каталог
conf
и создайте файлapplication.properties
со следующим содержимым:debezium.sink.type=kinesis debezium.sink.kinesis.region=ru-central1 debezium.sink.kinesis.endpoint=<эндпоинт> debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=0 debezium.source.database.hostname=localhost debezium.source.database.port=5432 debezium.source.database.user=<имя_пользователя> debezium.source.database.password=<пароль_пользователя> debezium.source.database.dbname=<имя_БД> debezium.source.database.server.name=debezium debezium.source.plugin.name=pgoutput debezium.source.transforms=Reroute debezium.source.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter debezium.source.transforms.Reroute.topic.regex=(.*) debezium.source.transforms.Reroute.topic.replacement=<поток_данных>
Где:
<эндпоинт>
— эндпоинт потока данных Data Streams, например,https://yds.serverless.yandexcloud.net/ru-central1/b1g89ae43m6he********/etn01eg4rn1********
. Эндпоинт можно посмотреть на странице потока (см. Посмотреть список потоков).<поток_данных>
— название потока данных Data Streams.<имя_БД>
— название базы данных PostgreSQL.<имя_пользователя>
— имя пользователя для подключения к базе данных PostgreSQL.<пароль_пользователя>
— пароль пользователя для подключения к базе данных PostgreSQL.
-
Запустите Debezium следующей командой:
JAVA_OPTS=-Daws.cborEnabled=false ./run.sh
-
Выполните какие-либо изменения в базе данных PostgreSQL, например, вставьте данные в таблицу.
-
При правильной настройке в консоли Debezium появятся сообщения вида:
2022-02-11 07:31:12,850 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 1 records sent during previous 00:19:59.999, last recorded offset: {transaction_id=null, lsn_proc=23576408, lsn_commit=23576120, lsn=23576408, txId=580, ts_usec=1644564672582666}
Настроить триггер в Cloud Functions
Создайте триггер в Cloud Functions к потоку данных Yandex Data Streams debezium
, который был создан выше.
Процедура создания триггера подробно описана в документации Cloud Functions.
В триггер Cloud Functions будут отправлять нотификации обо всех изменениях в базе данных. В коде триггера вы можете обработать эти изменения, реализовав любую необходимую программную обработку.