Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Query
    • Все руководства
    • Обработка логов Cloud Logging
    • Обработка потока изменений Debezium
    • Визуализация данных из Object Storage в DataLens
    • Обработка аудитных логов Audit Trails
    • Обработка файлов детализации в сервисе Yandex Cloud Billing
    • Анализ данных с помощью Jupyter
    • Автоматизация задач с помощью Managed Service for Apache Airflow™
    • Анализ данных с помощью Query
    • Работа с данными в Object Storage
    • Работа с данными в Managed Service for ClickHouse®
    • Работа с данными в Managed Service for PostgreSQL
    • Федеративные запросы к данным
    • Поиск событий Yandex Cloud в Query
  • Управление доступом
  • Правила тарификации
  • Интеграции
  • Аудитные логи Audit Trails
  • Вопросы и ответы
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Перед началом работы
  • Создайте поток данных Data Streams
  • Настройте реквизиты подключения к потоку
  • Настройте Debezium Server
  • Подключите Query к потоку данных
  • Выполните запрос к данным
  • См. также
  1. Практические руководства
  2. Обработка потока изменений Debezium

Обработка потока изменений Debezium

Статья создана
Yandex Cloud
Обновлена 7 марта 2025 г.
  • Перед началом работы
  • Создайте поток данных Data Streams
  • Настройте реквизиты подключения к потоку
  • Настройте Debezium Server
  • Подключите Query к потоку данных
  • Выполните запрос к данным
  • См. также

Debezium — это сервис для захвата изменений в базах данных и отправки их на обработку в другие системы. С помощью Yandex Data Streams можно захватывать эти изменения, а с помощью Yandex Query выполнять их обработку. Обработанные данные можно:

  • отправить в Yandex Monitoring для построения графика и алертинга;
  • записать в поток Data Streams и далее отправить на обработку в Yandex Cloud Functions;
  • записать в поток Data Streams и далее передать в Yandex Data Transfer для отправки в различные системы хранения.

debezium-architecture

В этом сценарии вы отправите изменения базы данных PostgreSQL в поток Data Streams с помощью Debezium, а затем выполните к ним запрос с помощью Query. В результате выполнения запроса вы получите количество изменений в таблицах БД, сгруппированное по интервалам продолжительностью 10 секунд. Рассматривается установка Debezium на сервер, где уже установлена и запущена PostgreSQL.

Для выполнения сценария:

  1. Создайте поток данных Data Streams.
  2. Настройте реквизиты подключения к потоку.
  3. Настройте Debezium Server.
  4. Подключите Query к потоку данных.
  5. Выполните запрос к данным.

Перед началом работы

Зарегистрируйтесь в Yandex Cloud и создайте платежный аккаунт:

  1. Перейдите в консоль управления, затем войдите в Yandex Cloud или зарегистрируйтесь.
  2. На странице Yandex Cloud Billing убедитесь, что у вас подключен платежный аккаунт, и он находится в статусе ACTIVE или TRIAL_ACTIVE. Если платежного аккаунта нет, создайте его и привяжите к нему облако.

Если у вас есть активный платежный аккаунт, вы можете создать или выбрать каталог, в котором будет работать ваша инфраструктура, на странице облака.

Подробнее об облаках и каталогах.

Создайте поток данных Data Streams

Создайте поток данных c именем debezium.

Настройте реквизиты подключения к потоку

  1. Создайте сервисный аккаунт и назначьте ему роль editor на ваш каталог.
  2. Создайте статический ключ доступа.
  3. На сервере, где уже установлена и запущена PostgreSQL, настройте AWS CLI:
    1. Установите AWS CLI и выполните команду:

      aws configure
      
    2. Последовательно введите:

      • AWS Access Key ID [None]: — идентификатор ключа сервисного аккаунта.
      • AWS Secret Access Key [None]: — секретный ключ сервисного аккаунта.
      • Default region name [None]: — зону доступности ru-central1.

Настройте Debezium Server

На сервере, где уже установлена и запущена PostgreSQL:

  1. Установите Debezium Server по инструкции.

  2. Перейдите в каталог conf и создайте файл application.properties со следующим содержимым:

    debezium.sink.type=kinesis
    debezium.sink.kinesis.region=ru-central1
    debezium.sink.kinesis.endpoint=<эндпоинт>
    debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
    debezium.source.offset.storage.file.filename=data/offsets.dat
    debezium.source.offset.flush.interval.ms=0
    debezium.source.database.hostname=localhost
    debezium.source.database.port=5432
    debezium.source.database.user=<имя_пользователя>
    debezium.source.database.password=<пароль_пользователя>
    debezium.source.database.dbname=<имя_БД>
    debezium.source.database.server.name=debezium
    debezium.source.plugin.name=pgoutput
    
    debezium.source.transforms=Reroute
    debezium.source.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
    debezium.source.transforms.Reroute.topic.regex=(.*)
    debezium.source.transforms.Reroute.topic.replacement=<поток_данных>
    

    Где:

    • <эндпоинт> — эндпоинт потока данных Data Streams, например, https://yds.serverless.yandexcloud.net/ru-central1/b1g89ae43m6he********/etn01eg4rn1********. Эндпоинт можно посмотреть на странице потока (см. Посмотреть список потоков).
    • <поток_данных> — название потока данных Data Streams.
    • <имя_БД> — название базы данных PostgreSQL.
    • <имя_пользователя> — имя пользователя для подключения к базе данных PostgreSQL.
    • <пароль_пользователя> — пароль пользователя для подключения к базе данных PostgreSQL.
  3. Запустите Debezium следующей командой:

    JAVA_OPTS=-Daws.cborEnabled=false ./run.sh
    
  4. Выполните какие-либо изменения в базе данных PostgreSQL, например, вставьте данные в таблицу.

  5. При правильной настройке в консоли Debezium появятся сообщения вида:

    2022-02-11 07:31:12,850 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 1 records sent during previous 00:19:59.999, last recorded offset: {transaction_id=null, lsn_proc=23576408, lsn_commit=23576120, lsn=23576408, txId=580, ts_usec=1644564672582666}
    

Подключите Query к потоку данных

  1. Создайте соединение с именем yds-connection и типом Data Streams.
  2. На странице создания привязки:
    • Введите имя привязки debezium.
    • Укажите поток данных cdebezium.
    • Добавьте колонку data с типом JSON.
  3. Нажмите кнопку Создать.

Выполните запрос к данным

В редакторе запросов в интерфейсе Query выполните следующий запрос:

$debezium_data = 
SELECT 
    JSON_VALUE(data,"$.payload.source.table") AS table_name, 
    DateTime::FromMilliseconds(cast(JSON_VALUE(data,"$.payload.source.ts_ms") AS Uint64)) AS `timestamp`
FROM bindings.`debezium`;

SELECT 
    table_name, 
    HOP_END() 
FROM 
    $debezium_data 
GROUP BY 
    HOP(`timestamp`, "PT10S", "PT10S", "PT10S"),
    table_name
LIMIT 2;

Примечание

Данные из потокового источника передаются в виде бесконечного потока. Чтобы остановить обработку и получить результат в консоли, данные в примере ограничены с помощь оператора LIMIT, который задает количество строк результата.

См. также

  • Чтение данных из Data Streams с помощью соединений в Query.

Была ли статья полезна?

Предыдущая
Обработка логов Cloud Logging
Следующая
Визуализация данных из Object Storage в DataLens
Проект Яндекса
© 2025 ООО «Яндекс.Облако»