Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for PostgreSQL
  • Начало работы
    • Все руководства
    • Создание кластера PostgreSQL для 1С
    • Создание кластера Linux-серверов «1С:Предприятия» с кластером Managed Service for PostgreSQL
    • Выгрузка базы данных в Yandex Data Processing
    • Поиск проблем с производительностью кластера
    • Анализ производительности и оптимизация
    • Настройка подключения из контейнера Serverless Containers
    • Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Поставка данных в Yandex Managed Service for YDB с помощью Yandex Data Transfer
    • Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Debezium
    • Захват изменений PostgreSQL и поставка в YDS
    • Поставка данных из Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Перенос данных из Yandex Object Storage с использованием Yandex Data Transfer
    • Настройка отказоустойчивой архитектуры в Yandex Cloud
    • Мониторинг состояния географически распределенных устройств
    • Запись логов балансировщика в PostgreSQL
    • Создание сервера MLFlow для логирования экспериментов и артефактов
    • Работа с данными с помощью Query
    • Федеративные запросы к данным с помощью Query
    • Решение проблем с сортировкой строк после обновления glibc
    • Запись данных с устройства в базу данных
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • История изменений
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовка кластера-источника
  • Настройте коннектор Debezium
  • Подготовьте кластер-приемник
  • Запустите коннектор Debezium
  • Проверьте работоспособность Debezium
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Debezium

Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Debezium

Статья создана
Yandex Cloud
Обновлена 18 марта 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовка кластера-источника
  • Настройте коннектор Debezium
  • Подготовьте кластер-приемник
  • Запустите коннектор Debezium
  • Проверьте работоспособность Debezium
  • Удалите созданные ресурсы

Вы можете отслеживать изменения данных в Managed Service for PostgreSQL и отправлять их в Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).

Из этой статьи вы узнаете, как создать в Yandex Cloud виртуальную машину и настроить на ней Debezium — программное обеспечение, используемое для CDC.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for PostgreSQL: использование вычислительных ресурсов, выделенных хостам, и дискового пространства (см. тарифы Managed Service for PostgreSQL).
  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за ВМ: использование вычислительных ресурсов, операционной системы и хранилища (см. тарифы Compute Cloud).
  • Плата за использование публичных IP-адресов для ВМ и хостов двух кластеров (см. тарифы Virtual Private Cloud).

Перед началом работыПеред началом работы

  1. Создайте кластер-источник со следующими настройками:

    • с хостами в публичном доступе;
    • с базой данных db1;
    • с пользователем user1.
  2. Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.

  3. Создайте виртуальную машину с Ubuntu 20.04 и публичным IP-адресом.

  4. Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета и созданной виртуальной машины, а к ней — из интернета по SSH:

    • Настройка групп безопасности кластера Managed Service for Apache Kafka®.
    • Настройка групп безопасности кластера Managed Service for PostgreSQL.
  5. Подключитесь к виртуальной машине по SSH и выполните ее предварительную настройку:

    1. Установите зависимости:

      sudo apt update && \
          sudo apt install kafkacat openjdk-17-jre postgresql-client --yes
      

      Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

    2. Создайте директорию для Apache Kafka®:

      sudo mkdir -p /opt/kafka/
      
    3. Скачайте и распакуйте в эту директорию архив с исполняемыми файлами Apache Kafka®. Например, для загрузки и распаковки Apache Kafka® версии 3.0 выполните команду:

      wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz && \
      sudo tar xf kafka_2.13-3.0.0.tgz --strip 1 --directory /opt/kafka/
      

      Актуальную версию Apache Kafka® уточняйте на странице загрузок проекта.

    4. Установите на виртуальную машину сертификаты и убедитесь в доступности кластеров:

      • Managed Service for Apache Kafka® (используйте утилиту kafkacat).
      • Managed Service for PostgreSQL (используйте утилиту psql).
    5. Создайте директорию, в которой будут храниться файлы, необходимые для работы коннектора Debezium:

      sudo mkdir -p /etc/debezium/plugins/
      
    6. Чтобы коннектор Debezium мог подключаться к хостам-брокерам Managed Service for Apache Kafka®, добавьте SSL-сертификат в защищенное хранилище сертификатов Java (Java Key Store). Для дополнительной защиты хранилища в параметре -storepass укажите пароль длиной не меньше 6 символов:

      sudo keytool \
          -importcert \
          -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
          -keystore /etc/debezium/keystore.jks \
          -storepass <пароль_JKS> \
          --noprompt
      

Подготовка кластера-источникаПодготовка кластера-источника

  1. Назначьте пользователю user1 роль mdb_replication.

    Это нужно для создания публикации (publication), с помощью которой Debezium будет отслеживать изменения в кластере Managed Service for PostgreSQL.

  2. Подключитесь к базе данных db1 от имени пользователя user1.

  3. Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию с некоторых датчиков автомобиля.

    Создайте таблицу:

    CREATE TABLE public.measurements (
        "device_id" text PRIMARY KEY NOT NULL,
        "datetime" timestamp NOT NULL,
        "latitude" real NOT NULL,
        "longitude" real NOT NULL,
        "altitude" real NOT NULL,
        "speed" real NOT NULL,
        "battery_voltage" real,
        "cabin_temperature" real NOT NULL,
        "fuel_level" real
    );
    

    Наполните таблицу данными:

    INSERT INTO public.measurements VALUES
        ('iv9a94th6rzt********', '2020-06-05 17:27:00', 55.70329032, 37.65472196,  427.5,    0, 23.5, 17, NULL),
        ('rhibbh3y08qm********', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
        ('iv9a94th678t********', '2020-06-07 15:00:10', 55.70985913, 37.62141918,  417.0, 15.7, 10.3, 17, NULL);
    
  4. Создайте публикацию для добавленной таблицы:

    CREATE PUBLICATION mpg_publication FOR TABLE public.measurements;
    

Настройте коннектор DebeziumНастройте коннектор Debezium

  1. Подключитесь к виртуальной машине по SSH.

  2. Скачайте и распакуйте актуальный Debezium-коннектор в директорию /etc/debezium/plugins/.

    Актуальную версию коннектора уточняйте на странице проекта. Ниже приведены команды для версии 1.9.4.Final.

    VERSION="1.9.4.Final"
    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/${VERSION}/debezium-connector-postgres-${VERSION}-plugin.tar.gz && \
    sudo tar -xzvf debezium-connector-postgres-${VERSION}-plugin.tar.gz -C /etc/debezium/plugins/
    
  3. Создайте файл /etc/debezium/mdb-connector.conf с настройками коннектора Debezium для подключения к кластеру-источнику:

    name=debezium-mpg
    connector.class=io.debezium.connector.postgresql.PostgresConnector
    plugin.name=pgoutput
    database.hostname=c-<идентификатор_кластера>.rw.mdb.yandexcloud.net
    database.port=6432
    database.user=user1
    database.password=<пароль_пользователя_user1>
    database.dbname=db1
    database.server.name=mpg
    table.include.list=public.measurements
    publication.name=mpg_publication
    slot.name=debezium_slot
    heartbeat.interval.ms=15000
    heartbeat.topics.prefix=debezium-heartbeat
    snapshot.mode=always
    

    Где:

    • name — логическое имя коннектора Debezium. Используется для внутренних нужд коннектора.

    • database.hostname — особый FQDN для подключения к хосту-мастеру кластера-источника.

      Идентификатор кластера можно получить со списком кластеров в каталоге.

    • database.user — имя пользователя PostgreSQL.

    • database.dbname — имя базы данных PostgreSQL.

    • database.server.name — имя сервера баз данных, которое Debezium будет использовать при выборе топика для отправки сообщений.

    • table.include.list — имена таблиц, для которых Debezium должен отслеживать изменения. Укажите полные имена, включающие в себя имя схемы (по умолчанию public). Debezium будет использовать значения настроек из этого поля при выборе топика для отправки сообщений.

    • publication.name — имя публикации, созданной на кластере-источнике.

    • slot.name — имя слота репликации, который будет создан Debezium при работе с публикацией.

    • heartbeat.interval.ms и heartbeat.topics.prefix — настройки heartbeat, необходимые для работы Debezium.

    • snapshot.mode — тип создания снапшота при запуске коннектора. Для корректной работы коннектора рекомендуется использовать значение параметра always.

Подготовьте кластер-приемникПодготовьте кластер-приемник

  1. Создайте топик, в который будут помещаться данные, поступающие от кластера-источника:

    • Имя — mpg.public.measurements.

      Имена топиков для данных конструируются по принципу <имя_сервера>.<имя_схемы>.<имя_таблицы>.

      Согласно файлу настроек коннектора Debezium:

      • Имя сервера mpg указано в параметре database.server.name.
      • Имя схемы public указано вместе с именем таблицы measurements в параметре table.include.list.

    Если необходимо отслеживать изменения в нескольких таблицах, создайте для каждой из них отдельный топик.

  2. Создайте служебный топик для отслеживания состояния коннектора:

    • Имя — debezium-heartbeat.mpg.

      Имена служебных топиков конструируются по принципу <префикс_для_heartbeat>.<имя_сервера>.

      Согласно файлу настроек коннектора Debezium:

      • Префикс debezium-heartbeat указан в параметре heartbeat.topics.prefix.
      • Имя сервера mpg указано в параметре database.server.name.
    • Политика очистки лога — Compact.

    Если необходимо получать данные из нескольких кластеров-источников, создайте для каждого из них отдельный служебный топик.

  3. Создайте пользователя с именем debezium.

  4. Выдайте пользователю debezium права ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER на созданные топики.

Запустите коннектор DebeziumЗапустите коннектор Debezium

  1. Создайте файл с настройками воркера Debezium:

    /etc/debezium/worker.conf

    # AdminAPI connect properties
    bootstrap.servers=<FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091
    sasl.mechanism=SCRAM-SHA-512
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/debezium/keystore.jks
    ssl.truststore.password=<пароль_JKS>
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль_пользователя_debezium>";
    
    # Producer connect properties
    producer.sasl.mechanism=SCRAM-SHA-512
    producer.security.protocol=SASL_SSL
    producer.ssl.truststore.location=/etc/debezium/keystore.jks
    producer.ssl.truststore.password=<пароль_JKS>
    producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль_пользователя_debezium>";
    
    # Worker properties
    plugin.path=/etc/debezium/plugins/
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/etc/debezium/worker.offset
    
  2. В отдельном терминале запустите коннектор:

    sudo /opt/kafka/bin/connect-standalone.sh \
        /etc/debezium/worker.conf \
        /etc/debezium/mdb-connector.conf
    

Проверьте работоспособность DebeziumПроверьте работоспособность Debezium

  1. В отдельном терминале запустите утилиту kafkacat в режиме потребителя:

    kafkacat \
        -C \
        -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \
        -t mpg.public.measurements \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=debezium \
        -X sasl.password=<пароль> \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
        -Z \
        -K:
    

    Будет выведена схема формата данных таблицы db1.public.measurements и данные о добавленных в нее ранее строках.

    Пример фрагмента сообщения
    {
    "schema": {
        ...
    },
    "payload": {
        "before": null,
        "after": {
            "device_id": "iv9a94th6rzt********",
            "datetime": 1591378020000000,
            "latitude": 55.70329,
            "longitude": 37.65472,
            "altitude": 427.5,
            "speed": 0.0,
            "battery_voltage": 23.5,
            "cabin_temperature": 17.0,
            "fuel_level": null
        },
        "source": {
            "version": "1.8.1.Final",
            "connector": "postgresql",
            "name": "mpg",
            "ts_ms": 1628245046882,
            "snapshot": "true",
            "db": "db1",
            "sequence": "[null,\"4328525512\"]",
            "schema": "public",
            "table": "measurements",
            "txId": 8861,
            "lsn": 4328525328,
            "xmin": null
        },
        "op": "r",
        "ts_ms": 1628245046893,
        "transaction": null
      }
    }
    
  2. Подключитесь к кластеру-источнику.

    При подключении может возникнуть ошибка ERROR Postgres roles LOGIN and REPLICATION are not assigned to user. Она не влияет на работу Debezium, и ее можно игнорировать.

  3. Добавьте еще одну строку в таблицу measurements:

    INSERT INTO public.measurements VALUES ('iv7b74th678t********', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
    
  4. Убедитесь, что в терминале с запущенной утилитой kafkacat отобразились сведения о добавленной строке.

Удалите созданные ресурсыУдалите созданные ресурсы

Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:

  1. Удалите виртуальную машину.

    Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, освободите и удалите его.

  2. Удалите кластеры:

    • Managed Service for Apache Kafka®.
    • Managed Service for PostgreSQL.

Была ли статья полезна?

Предыдущая
Поставка данных в Yandex Managed Service for YDB с помощью Yandex Data Transfer
Следующая
Захват изменений PostgreSQL и поставка в YDS
Проект Яндекса
© 2025 ООО «Яндекс.Облако»