Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Практические руководства
    • Все руководства
    • Развертывание веб-интерфейса Apache Kafka®
    • Миграция БД из стороннего кластера Apache Kafka® в Managed Service for Apache Kafka®
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for YDB в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for Greenplum® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MongoDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MySQL® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for OpenSearch с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for PostgreSQL с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Data Streams с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for Apache Kafka® с помощью Data Transfer
    • Захват изменений YDB и поставка в YDS
    • Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
    • Автоматизация задач Query с помощью Managed Service for Apache Airflow™
    • Отправка запросов к API Yandex Cloud через Yandex Cloud Python SDK
    • Настройка SMTP-сервера для отправки уведомлений по электронной почте
    • Добавление данных в БД ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® средствами ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® при помощи Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for ClickHouse® с помощью Data Transfer
    • Асинхронная репликация данных из PostgreSQL в ClickHouse®
    • Обмен данными между Managed Service for ClickHouse® и Yandex Data Processing
    • Настройка Managed Service for ClickHouse® для Graphite
    • Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®
    • Получение данных из Managed Service for Apache Kafka® в ksqlDB
    • Получение данных из RabbitMQ в Managed Service for ClickHouse®
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse®
    • Асинхронная репликация данных из Яндекс Метрика в ClickHouse® с помощью Data Transfer
    • Использование гибридного хранилища в Managed Service for ClickHouse®
    • Шардирование таблиц Managed Service for ClickHouse®
    • Перешардирование данных в кластере Managed Service for ClickHouse®
    • Загрузка данных из Яндекс Директ в витрину Managed Service for ClickHouse® с использованием Cloud Functions, Object Storage и Data Transfer
    • Загрузка данных из Object Storage в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция данных со сменой хранилища из Managed Service for OpenSearch в Managed Service for ClickHouse® с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция базы данных из Google BigQuery в Managed Service for ClickHouse®
    • Настройка Cloud DNS для доступа к кластеру Managed Service for ClickHouse® из других облачных сетей
    • Миграция кластера Yandex Data Processing с HDFS в другую зону доступности
    • Импорт данных из Managed Service for MySQL® в Yandex Data Processing с помощью Sqoop
    • Импорт данных из Managed Service for PostgreSQL в Yandex Data Processing с помощью Sqoop
    • Монтирование бакетов Object Storage к файловой системе хостов Yandex Data Processing
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
    • Автоматизация работы с Yandex Data Processing с помощью Managed Service for Apache Airflow™
    • Совместная работа с таблицами Yandex Data Processing с использованием Metastore
    • Перенос метаданных между кластерами Yandex Data Processing с помощью Metastore
    • Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
    • Миграция в Managed Service for Elasticsearch с помощью снапшотов
    • Миграция коллекций из стороннего кластера MongoDB в Managed Service for MongoDB
    • Миграция данных в Managed Service for MongoDB
    • Миграция кластера Managed Service for MongoDB с версии 4.4 на 6.0
    • Шардирование коллекций MongoDB
    • Анализ производительности и оптимизация MongoDB
    • Миграция БД из стороннего кластера MySQL® в кластер Managed Service for MySQL®
    • Анализ производительности и оптимизация Managed Service for MySQL®
    • Синхронизация данных из стороннего кластера MySQL® в Managed Service for MySQL® с помощью Data Transfer
    • Миграция БД из Managed Service for MySQL® в сторонний кластер MySQL®
    • Миграция БД из Managed Service for MySQL® в Object Storage с помощью Data Transfer
    • Перенос данных из Object Storage в Managed Service for MySQL® с использованием Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Миграция БД из Managed Service for MySQL® в Managed Service for YDB с помощью Data Transfer
    • Захват изменений MySQL® и поставка в YDS
    • Миграция данных из Managed Service for MySQL® в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из AWS RDS for PostgreSQL в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из Managed Service for MySQL® в Managed Service for Greenplum® с помощью Data Transfer
    • Настройка политики индексов в Managed Service for OpenSearch
    • Миграция данных из Elasticsearch в Managed Service for OpenSearch
    • Миграция данных в Managed Service for OpenSearch из стороннего кластера OpenSearch с помощью Data Transfer
    • Загрузка данных из Managed Service for OpenSearch в Object Storage с помощью Data Transfer
    • Миграция данных из Managed Service for OpenSearch в Managed Service for YDB с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Аутентификация в OpenSearch Dashboards кластера Managed Service for OpenSearch с помощью Keycloak
    • Использование плагина yandex-lemmer в Managed Service for OpenSearch
    • Создание кластера PostgreSQL для «1С:Предприятия»
    • Поиск проблем с производительностью кластера Managed Service for PostgreSQL
    • Анализ производительности и оптимизация Managed Service for PostgreSQL
    • Логическая репликация PostgreSQL
    • Миграция БД из стороннего кластера PostgreSQL в Managed Service for PostgreSQL
    • Миграция БД из Managed Service for PostgreSQL
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for YDB с помощью Data Transfer
    • Миграция БД из Managed Service for PostgreSQL в Object Storage
    • Перенос данных из Object Storage в Managed Service for PostgreSQL с использованием Data Transfer
    • Захват изменений PostgreSQL и поставка в YDS
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for MySQL® с помощью Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Решение проблем с сортировкой строк в PostgreSQL после обновления glibc
    • Миграция БД из Greenplum® в ClickHouse®
    • Миграция БД из Greenplum® в PostgreSQL
    • Выгрузка данных Greenplum® в холодное хранилище Object Storage
    • Загрузка данных из Object Storage в Managed Service for Greenplum® с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Создание внешней таблицы на базе таблицы из бакета Object Storage с помощью конфигурационного файла
    • Миграция БД из стороннего кластера Valkey™ в Yandex Managed Service for Valkey™
    • Использование кластера Yandex Managed Service for Valkey™ в качестве хранилища сессий PHP
    • Загрузка данных из Object Storage в Managed Service for YDB с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Object Storage с помощью Data Transfer
    • Обработка аудитных логов Audit Trails
    • Обработка логов Cloud Logging
    • Обработка потока изменений Debezium
    • Анализ данных с помощью Jupyter
    • Обработка файлов детализации в сервисе Yandex Cloud Billing
    • Ввод данных в системы хранения
    • Умная обработка логов
    • Передача данных в микросервисных архитектурах
    • Миграция данных в Object Storage с помощью Data Transfer
    • Миграция данных из стороннего кластера Greenplum® или PostgreSQL в Managed Service for Greenplum® с помощью Data Transfer
    • Миграция кластера Managed Service for MongoDB
    • Миграция кластера MySQL®
    • Миграция на сторонний кластер MySQL®
    • Миграция кластера PostgreSQL
    • Создание реестра схем для поставки данных в формате Debezium CDC из Apache Kafka®

В этой статье:

  • Перед началом работы
  • Создайте поток данных Data Streams
  • Настройте реквизиты подключения к потоку
  • Настройте Debezium Server
  • Подключите Query к потоку данных
  • Выполните запрос к данным
  • См. также
  1. Построение Data Platform
  2. Обработка потока изменений Debezium

Обработка потока изменений Debezium

Статья создана
Yandex Cloud
Обновлена 7 марта 2025 г.
  • Перед началом работы
  • Создайте поток данных Data Streams
  • Настройте реквизиты подключения к потоку
  • Настройте Debezium Server
  • Подключите Query к потоку данных
  • Выполните запрос к данным
  • См. также

Debezium — это сервис для захвата изменений в базах данных и отправки их на обработку в другие системы. С помощью Yandex Data Streams можно захватывать эти изменения, а с помощью Yandex Query выполнять их обработку. Обработанные данные можно:

  • отправить в Yandex Monitoring для построения графика и алертинга;
  • записать в поток Data Streams и далее отправить на обработку в Yandex Cloud Functions;
  • записать в поток Data Streams и далее передать в Yandex Data Transfer для отправки в различные системы хранения.

debezium-architecture

В этом сценарии вы отправите изменения базы данных PostgreSQL в поток Data Streams с помощью Debezium, а затем выполните к ним запрос с помощью Query. В результате выполнения запроса вы получите количество изменений в таблицах БД, сгруппированное по интервалам продолжительностью 10 секунд. Рассматривается установка Debezium на сервер, где уже установлена и запущена PostgreSQL.

Для выполнения сценария:

  1. Создайте поток данных Data Streams.
  2. Настройте реквизиты подключения к потоку.
  3. Настройте Debezium Server.
  4. Подключите Query к потоку данных.
  5. Выполните запрос к данным.

Перед началом работыПеред началом работы

Зарегистрируйтесь в Yandex Cloud и создайте платежный аккаунт:

  1. Перейдите в консоль управления, затем войдите в Yandex Cloud или зарегистрируйтесь.
  2. На странице Yandex Cloud Billing убедитесь, что у вас подключен платежный аккаунт, и он находится в статусе ACTIVE или TRIAL_ACTIVE. Если платежного аккаунта нет, создайте его и привяжите к нему облако.

Если у вас есть активный платежный аккаунт, вы можете создать или выбрать каталог, в котором будет работать ваша инфраструктура, на странице облака.

Подробнее об облаках и каталогах.

Создайте поток данных Data StreamsСоздайте поток данных Data Streams

Создайте поток данных c именем debezium.

Настройте реквизиты подключения к потокуНастройте реквизиты подключения к потоку

  1. Создайте сервисный аккаунт и назначьте ему роль editor на ваш каталог.
  2. Создайте статический ключ доступа.
  3. На сервере, где уже установлена и запущена PostgreSQL, настройте AWS CLI:
    1. Установите AWS CLI и выполните команду:

      aws configure
      
    2. Последовательно введите:

      • AWS Access Key ID [None]: — идентификатор ключа сервисного аккаунта.
      • AWS Secret Access Key [None]: — секретный ключ сервисного аккаунта.
      • Default region name [None]: — зону доступности ru-central1.

Настройте Debezium ServerНастройте Debezium Server

На сервере, где уже установлена и запущена PostgreSQL:

  1. Установите Debezium Server по инструкции.

  2. Перейдите в каталог conf и создайте файл application.properties со следующим содержимым:

    debezium.sink.type=kinesis
    debezium.sink.kinesis.region=ru-central1
    debezium.sink.kinesis.endpoint=<эндпоинт>
    debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
    debezium.source.offset.storage.file.filename=data/offsets.dat
    debezium.source.offset.flush.interval.ms=0
    debezium.source.database.hostname=localhost
    debezium.source.database.port=5432
    debezium.source.database.user=<имя_пользователя>
    debezium.source.database.password=<пароль_пользователя>
    debezium.source.database.dbname=<имя_БД>
    debezium.source.database.server.name=debezium
    debezium.source.plugin.name=pgoutput
    
    debezium.source.transforms=Reroute
    debezium.source.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
    debezium.source.transforms.Reroute.topic.regex=(.*)
    debezium.source.transforms.Reroute.topic.replacement=<поток_данных>
    

    Где:

    • <эндпоинт> — эндпоинт потока данных Data Streams, например, https://yds.serverless.yandexcloud.net/ru-central1/b1g89ae43m6he********/etn01eg4rn1********. Эндпоинт можно посмотреть на странице потока (см. Посмотреть список потоков).
    • <поток_данных> — название потока данных Data Streams.
    • <имя_БД> — название базы данных PostgreSQL.
    • <имя_пользователя> — имя пользователя для подключения к базе данных PostgreSQL.
    • <пароль_пользователя> — пароль пользователя для подключения к базе данных PostgreSQL.
  3. Запустите Debezium следующей командой:

    JAVA_OPTS=-Daws.cborEnabled=false ./run.sh
    
  4. Выполните какие-либо изменения в базе данных PostgreSQL, например, вставьте данные в таблицу.

  5. При правильной настройке в консоли Debezium появятся сообщения вида:

    2022-02-11 07:31:12,850 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 1 records sent during previous 00:19:59.999, last recorded offset: {transaction_id=null, lsn_proc=23576408, lsn_commit=23576120, lsn=23576408, txId=580, ts_usec=1644564672582666}
    

Подключите Query к потоку данныхПодключите Query к потоку данных

  1. Создайте соединение с именем yds-connection и типом Data Streams.
  2. На странице создания привязки:
    • Введите имя привязки debezium.
    • Укажите поток данных cdebezium.
    • Добавьте колонку data с типом JSON.
  3. Нажмите кнопку Создать.

Выполните запрос к даннымВыполните запрос к данным

В редакторе запросов в интерфейсе Query выполните следующий запрос:

$debezium_data = 
SELECT 
    JSON_VALUE(data,"$.payload.source.table") AS table_name, 
    DateTime::FromMilliseconds(cast(JSON_VALUE(data,"$.payload.source.ts_ms") AS Uint64)) AS `timestamp`
FROM bindings.`debezium`;

SELECT 
    table_name, 
    HOP_END() 
FROM 
    $debezium_data 
GROUP BY 
    HOP(`timestamp`, "PT10S", "PT10S", "PT10S"),
    table_name
LIMIT 2;

Примечание

Данные из потокового источника передаются в виде бесконечного потока. Чтобы остановить обработку и получить результат в консоли, данные в примере ограничены с помощь оператора LIMIT, который задает количество строк результата.

См. такжеСм. также

  • Чтение данных из Data Streams с помощью соединений в Query.

Была ли статья полезна?

Предыдущая
Обработка логов Cloud Logging
Следующая
Анализ данных с помощью Jupyter
Проект Яндекса
© 2025 ООО «Яндекс.Облако»