Захват изменений из PostgreSQL и поставка в Apache Kafka®
Вы можете отслеживать изменения данных в кластере-источнике Managed Service for PostgreSQL и отправлять их в кластер-приемник Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).
Чтобы настроить CDC с использованием сервиса Data Transfer:
- Подготовьте кластер-источник.
- Подготовьте кластер-приемник.
- Подготовьте и активируйте трансфер.
- Проверьте работоспособность трансфера.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Создайте кластер-источник Managed Service for PostgreSQL любой подходящей конфигурации со следующими настройками:
- с базой данных
db1
; - с пользователем
pg-user
; - с хостами в публичном доступе.
- с базой данных
-
Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.
-
Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета:
-
Установите на локальный компьютер утилиту
kcat
(kafkacat
) и клиент командной строки PostgreSQL . Например, в Ubuntu 20.04 выполните команду:sudo apt update && sudo apt install kafkacat postgresql-client --yes
Подготовьте кластер-источник
-
Чтобы сервис Data Transfer мог получать от кластера Managed Service for PostgreSQL уведомления об изменениях в данных, в кластере-источнике необходимо создать публикацию (publication). Чтобы пользователь
pg-user
мог создать публикацию, назначьте ему рольmdb_replication
. -
Подключитесь к базе данных
db1
от имени пользователяpg-user
. -
Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию с некоторых датчиков автомобиля.
Создайте таблицу:
CREATE TABLE public.measurements ( "device_id" text PRIMARY KEY NOT NULL, "datetime" timestamp NOT NULL, "latitude" real NOT NULL, "longitude" real NOT NULL, "altitude" real NOT NULL, "speed" real NOT NULL, "battery_voltage" real, "cabin_temperature" real NOT NULL, "fuel_level" real );
Наполните таблицу данными:
INSERT INTO public.measurements VALUES ('iv9a94th6rzt********', '2020-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL), ('rhibbh3y08qm********', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32), ('iv9a94th678t********', '2020-06-07 15:00:10', 55.70985913, 37.62141918, 417.0, 15.7, 10.3, 17, NULL);
Подготовьте кластер-приемник
Настройки различаются в зависимости от используемого способа управления топиками. При этом имена топиков для данных конструируются по тому же принципу, что и в Debezium<префикс_топика>.<имя_схемы>.<имя_таблицы>
. В этом руководстве в качестве примера будет использоваться префикс cdc
.
Если управление топиками осуществляется с помощью стандартных интерфейсов Yandex Cloud (Консоль управления, YC CLI, Terraform, API):
-
Создайте топик с именем
cdc.public.measurements
.Если необходимо отслеживать изменения в нескольких таблицах, создайте для каждой из них отдельный топик.
-
Создайте пользователя с именем
kafka-user
и ролямиACCESS_ROLE_CONSUMER
иACCESS_ROLE_PRODUCER
, действующими на созданные топики.
Если для управления топиками используется Kafka Admin API:
-
Создайте пользователя-администратора с именем
kafka-user
. -
Помимо роли
ACCESS_ROLE_ADMIN
назначьте пользователю-администратору ролиACCESS_ROLE_CONSUMER
иACCESS_ROLE_PRODUCER
на топики, имена которых начинаются с префиксаcdc
.Необходимые топики будут созданы автоматически при первом событии изменения в отслеживаемых таблицах кластера-источника. Такое решение может быть удобным для отслеживания изменений во множестве таблиц, однако, требует запас свободного места в хранилище кластера. Подробнее см. в разделе Хранилище в Managed Service for Apache Kafka®.
Подготовьте и активируйте трансфер
-
-
Эндпоинт для источника:
- Тип базы данных —
PostgreSQL
. - Параметры эндпоинта:
- Настройки подключения —
Кластер Managed Service for PostgreSQL
. - Кластер Managed Service for PostgreSQL — выберите созданный ранее кластер Managed Service for PostgreSQL.
- База данных —
db1
. - Пользователь —
pg-user
. - Пароль — укажите пароль пользователя
pg-user
. - Список включённых таблиц —
public.measurements
.
- Настройки подключения —
- Тип базы данных —
-
Эндпоинт для приемника:
-
Тип базы данных —
Kafka
. -
Параметры эндпоинта:
-
Тип подключения —
Кластер Managed Service for Apache Kafka
.- Кластер Managed Service for Apache Kafka — выберите кластер-приемник.
- Аутентификация — укажите данные созданного ранее пользователя
kafka-user
.
-
Топик —
Полное имя топика
. -
Полное имя топика —
cdc.public.measurements
.
Если необходимо отслеживать изменения в нескольких таблицах, заполните поля следующим образом:
- Топик —
Префикс топика
. - Префикс топика — укажите префикс
cdc
, использованный при формировании имен топиков.
-
-
-
-
Создайте трансфер со следующими настройками:
- Эндпоинты:
- Источник — созданный ранее эндпоинт для источника.
- Приёмник — созданный ранее эндпоинт для приемника.
- Тип трансфера — Репликация.
- Эндпоинты:
-
Активируйте трансфер и дождитесь его перехода в статус Реплицируется.
Проверьте работоспособность трансфера
-
В отдельном терминале запустите утилиту
kafkacat
в режиме потребителя:kafkacat \ -C \ -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \ -t cdc.public.measurements \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=kafka-user \ -X sasl.password=<пароль> \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -Z \ -K:
FQDN хостов-брокеров можно получить со списком хостов в кластере Managed Service for Apache Kafka®.
Будет выведена схема формата данных таблицы
public.measurements
и данные о добавленных в нее ранее строках.Пример фрагмента сообщения
{ "payload": { "consumer":"dttuhfpp97l3********" }, "schema": { "fields": [ { "field": "consumer", "optional":false, "type":"string" } ], "name": "__data_transfer_stub.public.__consumer_keeper.Key", "optional":false, "type":"struct" } }:{ "payload": { "after": { "consumer":"dttuhfpp97l3********l", "locked_by":"dttuhfpp97l3********-1", "locked_till":"2022-05-15T09:55:18Z" }, "before": null, "op":"u", "source": { "connector":"postgresql", "db":"db1", "lsn":85865797008, "name":"__data_transfer_stub", "schema":"public", "snapshot":"false", "table":"__consumer_keeper", "ts_ms":1652608518883, "txId":245165, "version":"1.1.2.Final", "xmin":null }, ...
-
Подключитесь к кластеру-источнику и добавьте данные в таблицу
measurements
:INSERT INTO public.measurements VALUES ('iv7b74th678t********', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
-
Убедитесь, что в терминале с запущенной утилитой
kafkacat
отобразились сведения о добавленной строке.
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
-
Деактивируйте и удалите трансфер.
-
Удалите кластеры:
-
Если для доступа к хостам кластеров использовались статические публичные IP-адреса, освободите и удалите их.
Дополнительные материалы
Больше информации о сценариях поставок данных в вебинаре Yandex Cloud: