Обработка потока изменений Debezium
Debezium
- отправить в Yandex Monitoring для построения графика и алертинга;
- записать в поток Data Streams и далее отправить на обработку в Yandex Cloud Functions;
- записать в поток Data Streams и далее передать в Yandex Data Transfer для отправки в различные системы хранения.
В этом сценарии вы отправите изменения базы данных PostgreSQL
Для выполнения сценария:
- Создайте поток данных Data Streams.
- Настройте реквизиты подключения к потоку.
- Настройте Debezium Server.
- Подключите Query к потоку данных.
- Выполните запрос к данным.
Перед началом работы
Зарегистрируйтесь в Yandex Cloud и создайте платежный аккаунт:
- Перейдите в консоль управления
, затем войдите в Yandex Cloud или зарегистрируйтесь. - На странице Yandex Cloud Billing
убедитесь, что у вас подключен платежный аккаунт, и он находится в статусеACTIVE
илиTRIAL_ACTIVE
. Если платежного аккаунта нет, создайте его и привяжите к нему облако.
Если у вас есть активный платежный аккаунт, вы можете создать или выбрать каталог, в котором будет работать ваша инфраструктура, на странице облака
Подробнее об облаках и каталогах.
Создайте поток данных Data Streams
Создайте поток данных c именем debezium
.
Настройте реквизиты подключения к потоку
- Создайте сервисный аккаунт и назначьте ему роль
editor
на ваш каталог. - Создайте статический ключ доступа.
- На сервере, где уже установлена и запущена PostgreSQL, настройте AWS CLI
:-
Установите AWS CLI
и выполните команду:aws configure
-
Последовательно введите:
AWS Access Key ID [None]:
— идентификатор ключа сервисного аккаунта.AWS Secret Access Key [None]:
— секретный ключ сервисного аккаунта.Default region name [None]:
— зону доступностиru-central1
.
-
Настройте Debezium Server
На сервере, где уже установлена и запущена PostgreSQL:
-
Установите Debezium Server по инструкции
. -
Перейдите в каталог
conf
и создайте файлapplication.properties
со следующим содержимым:debezium.sink.type=kinesis debezium.sink.kinesis.region=ru-central1 debezium.sink.kinesis.endpoint=<эндпоинт> debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=0 debezium.source.database.hostname=localhost debezium.source.database.port=5432 debezium.source.database.user=<имя_пользователя> debezium.source.database.password=<пароль_пользователя> debezium.source.database.dbname=<имя_БД> debezium.source.database.server.name=debezium debezium.source.plugin.name=pgoutput debezium.source.transforms=Reroute debezium.source.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter debezium.source.transforms.Reroute.topic.regex=(.*) debezium.source.transforms.Reroute.topic.replacement=<поток_данных>
Где:
<эндпоинт>
— эндпоинт потока данных Data Streams, например,https://yds.serverless.yandexcloud.net/ru-central1/b1g89ae43m6he********/etn01eg4rn1********
. Эндпоинт можно посмотреть на странице потока (см. Посмотреть список потоков).<поток_данных>
— название потока данных Data Streams.<имя_БД>
— название базы данных PostgreSQL.<имя_пользователя>
— имя пользователя для подключения к базе данных PostgreSQL.<пароль_пользователя>
— пароль пользователя для подключения к базе данных PostgreSQL.
-
Запустите Debezium следующей командой:
JAVA_OPTS=-Daws.cborEnabled=false ./run.sh
-
Выполните какие-либо изменения в базе данных PostgreSQL, например, вставьте данные в таблицу.
-
При правильной настройке в консоли Debezium появятся сообщения вида:
2022-02-11 07:31:12,850 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 1 records sent during previous 00:19:59.999, last recorded offset: {transaction_id=null, lsn_proc=23576408, lsn_commit=23576120, lsn=23576408, txId=580, ts_usec=1644564672582666}
Подключите Query к потоку данных
- Создайте соединение с именем
yds-connection
и типомData Streams
. - На странице создания привязки:
- Введите имя привязки
debezium
. - Укажите поток данных
cdebezium
. - Добавьте колонку
data
с типомJSON
.
- Введите имя привязки
- Нажмите кнопку Создать.
Выполните запрос к данным
В редакторе запросов в интерфейсе Query выполните следующий запрос:
$debezium_data =
SELECT
JSON_VALUE(data,"$.payload.source.table") AS table_name,
DateTime::FromMilliseconds(cast(JSON_VALUE(data,"$.payload.source.ts_ms") AS Uint64)) AS `timestamp`
FROM bindings.`debezium`;
SELECT
table_name,
HOP_END()
FROM
$debezium_data
GROUP BY
HOP(`timestamp`, "PT10S", "PT10S", "PT10S"),
table_name
LIMIT 2;
Примечание
Данные из потокового источника передаются в виде бесконечного потока. Чтобы остановить обработку и получить результат в консоли, данные в примере ограничены с помощь оператора LIMIT
, который задает количество строк результата.