Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Практические руководства
    • Все руководства
    • Развертывание веб-интерфейса Apache Kafka®
    • Миграция БД из стороннего кластера Apache Kafka® в Managed Service for Apache Kafka®
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for YDB в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for Greenplum® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MongoDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MySQL® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for OpenSearch с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for PostgreSQL с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Data Streams с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for Apache Kafka® с помощью Data Transfer
    • Захват изменений YDB и поставка в YDS
    • Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
    • Автоматизация задач Query с помощью Managed Service for Apache Airflow™
    • Отправка запросов к API Yandex Cloud через Yandex Cloud Python SDK
    • Настройка SMTP-сервера для отправки уведомлений по электронной почте
    • Добавление данных в БД ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® средствами ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® при помощи Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for ClickHouse® с помощью Data Transfer
    • Асинхронная репликация данных из PostgreSQL в ClickHouse®
    • Обмен данными между Managed Service for ClickHouse® и Yandex Data Processing
    • Настройка Managed Service for ClickHouse® для Graphite
    • Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®
    • Получение данных из Managed Service for Apache Kafka® в ksqlDB
    • Получение данных из RabbitMQ в Managed Service for ClickHouse®
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse®
    • Асинхронная репликация данных из Яндекс Метрика в ClickHouse® с помощью Data Transfer
    • Использование гибридного хранилища в Managed Service for ClickHouse®
    • Шардирование таблиц Managed Service for ClickHouse®
    • Перешардирование данных в кластере Managed Service for ClickHouse®
    • Загрузка данных из Яндекс Директ в витрину Managed Service for ClickHouse® с использованием Cloud Functions, Object Storage и Data Transfer
    • Загрузка данных из Object Storage в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция данных со сменой хранилища из Managed Service for OpenSearch в Managed Service for ClickHouse® с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция базы данных из Google BigQuery в Managed Service for ClickHouse®
    • Настройка Cloud DNS для доступа к кластеру Managed Service for ClickHouse® из других облачных сетей
    • Миграция кластера Yandex Data Processing с HDFS в другую зону доступности
    • Импорт данных из Managed Service for MySQL® в Yandex Data Processing с помощью Sqoop
    • Импорт данных из Managed Service for PostgreSQL в Yandex Data Processing с помощью Sqoop
    • Монтирование бакетов Object Storage к файловой системе хостов Yandex Data Processing
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
    • Автоматизация работы с Yandex Data Processing с помощью Managed Service for Apache Airflow™
    • Совместная работа с таблицами Yandex Data Processing с использованием Metastore
    • Перенос метаданных между кластерами Yandex Data Processing с помощью Metastore
    • Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
    • Миграция в Managed Service for Elasticsearch с помощью снапшотов
    • Миграция коллекций из стороннего кластера MongoDB в Managed Service for MongoDB
    • Миграция данных в Managed Service for MongoDB
    • Миграция кластера Managed Service for MongoDB с версии 4.4 на 6.0
    • Шардирование коллекций MongoDB
    • Анализ производительности и оптимизация MongoDB
    • Миграция БД из стороннего кластера MySQL® в кластер Managed Service for MySQL®
    • Анализ производительности и оптимизация Managed Service for MySQL®
    • Синхронизация данных из стороннего кластера MySQL® в Managed Service for MySQL® с помощью Data Transfer
    • Миграция БД из Managed Service for MySQL® в сторонний кластер MySQL®
    • Миграция БД из Managed Service for MySQL® в Object Storage с помощью Data Transfer
    • Перенос данных из Object Storage в Managed Service for MySQL® с использованием Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Миграция БД из Managed Service for MySQL® в Managed Service for YDB с помощью Data Transfer
    • Захват изменений MySQL® и поставка в YDS
    • Миграция данных из Managed Service for MySQL® в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из AWS RDS for PostgreSQL в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из Managed Service for MySQL® в Managed Service for Greenplum® с помощью Data Transfer
    • Настройка политики индексов в Managed Service for OpenSearch
    • Миграция данных из Elasticsearch в Managed Service for OpenSearch
    • Миграция данных в Managed Service for OpenSearch из стороннего кластера OpenSearch с помощью Data Transfer
    • Загрузка данных из Managed Service for OpenSearch в Object Storage с помощью Data Transfer
    • Миграция данных из Managed Service for OpenSearch в Managed Service for YDB с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Аутентификация в OpenSearch Dashboards кластера Managed Service for OpenSearch с помощью Keycloak
    • Использование плагина yandex-lemmer в Managed Service for OpenSearch
    • Создание кластера PostgreSQL для «1С:Предприятия»
    • Поиск проблем с производительностью кластера Managed Service for PostgreSQL
    • Анализ производительности и оптимизация Managed Service for PostgreSQL
    • Логическая репликация PostgreSQL
    • Миграция БД из стороннего кластера PostgreSQL в Managed Service for PostgreSQL
    • Миграция БД из Managed Service for PostgreSQL
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for YDB с помощью Data Transfer
    • Миграция БД из Managed Service for PostgreSQL в Object Storage
    • Перенос данных из Object Storage в Managed Service for PostgreSQL с использованием Data Transfer
    • Захват изменений PostgreSQL и поставка в YDS
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for MySQL® с помощью Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Решение проблем с сортировкой строк в PostgreSQL после обновления glibc
    • Миграция БД из Greenplum® в ClickHouse®
    • Миграция БД из Greenplum® в PostgreSQL
    • Выгрузка данных Greenplum® в холодное хранилище Object Storage
    • Загрузка данных из Object Storage в Managed Service for Greenplum® с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Создание внешней таблицы на базе таблицы из бакета Object Storage с помощью конфигурационного файла
    • Миграция БД из стороннего кластера Valkey™ в Yandex Managed Service for Valkey™
    • Использование кластера Yandex Managed Service for Valkey™ в качестве хранилища сессий PHP
    • Загрузка данных из Object Storage в Managed Service for YDB с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Object Storage с помощью Data Transfer
    • Обработка аудитных логов Audit Trails
    • Обработка логов Cloud Logging
    • Обработка потока изменений Debezium
    • Анализ данных с помощью Jupyter
    • Обработка файлов детализации в сервисе Yandex Cloud Billing
    • Ввод данных в системы хранения
    • Умная обработка логов
    • Передача данных в микросервисных архитектурах
    • Миграция данных в Object Storage с помощью Data Transfer
    • Миграция данных из стороннего кластера Greenplum® или PostgreSQL в Managed Service for Greenplum® с помощью Data Transfer
    • Миграция кластера Managed Service for MongoDB
    • Миграция кластера MySQL®
    • Миграция на сторонний кластер MySQL®
    • Миграция кластера PostgreSQL
    • Создание реестра схем для поставки данных в формате Debezium CDC из Apache Kafka®

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовка кластера-источника
  • Настройте коннектор Debezium
  • Подготовьте кластер-приемник
  • Запустите коннектор Debezium
  • Проверьте работоспособность Debezium
  • Удалите созданные ресурсы
  1. Построение Data Platform
  2. Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium

Поставка данных из Yandex Managed Service for MySQL® в Yandex Managed Service for Apache Kafka® с помощью Debezium

Статья создана
Yandex Cloud
Обновлена 8 апреля 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовка кластера-источника
  • Настройте коннектор Debezium
  • Подготовьте кластер-приемник
  • Запустите коннектор Debezium
  • Проверьте работоспособность Debezium
  • Удалите созданные ресурсы

Вы можете отслеживать изменения данных в Managed Service for MySQL® и отправлять их в Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).

Из этой статьи вы узнаете, как создать в Yandex Cloud виртуальную машину и настроить на ней Debezium — программное обеспечение, используемое для CDC.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за кластер Managed Service for MySQL®: использование вычислительных ресурсов, выделенных хостам, и дискового пространства (см. тарифы MySQL®).
  • Плата за использование публичных IP-адресов для хостов кластеров (см. тарифы Virtual Private Cloud).
  • Плата за ВМ: использование вычислительных ресурсов, хранилища и публичного IP-адреса (см. тарифы Compute Cloud).

Перед началом работыПеред началом работы

  1. Создайте кластер-источник со следующими настройками:

    • с хостами в публичном доступе;
    • с базой данных db1;
    • с пользователем user1.
  2. Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.

  3. Создайте виртуальную машину с Ubuntu 20.04 и публичным IP-адресом.

  4. Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета и созданной виртуальной машины, а к ней — из интернета по SSH:

    • Настройка групп безопасности кластера Managed Service for Apache Kafka®.
    • Настройка групп безопасности кластера Managed Service for MySQL®.
  5. Подключитесь к виртуальной машине по SSH и выполните ее предварительную настройку:

    1. Установите зависимости:

      sudo apt update && \
          sudo apt install kafkacat openjdk-17-jre mysql-client --yes
      

      Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

    2. Создайте директорию для Apache Kafka®:

      sudo mkdir -p /opt/kafka/
      
    3. Скачайте и распакуйте в эту директорию архив с исполняемыми файлами Apache Kafka®. Например, для загрузки и распаковки Apache Kafka® версии 3.0 выполните команду:

      wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz && \
      sudo tar xf kafka_2.13-3.0.0.tgz --strip 1 --directory /opt/kafka/
      

      Актуальную версию Apache Kafka® уточняйте на странице загрузок проекта.

    4. Установите на виртуальную машину сертификаты и убедитесь в доступности кластеров:

      • Managed Service for Apache Kafka® (используйте утилиту kafkacat).
      • Managed Service for MySQL® (используйте утилиту mysql).
    5. Создайте директорию, в которой будут храниться файлы, необходимые для работы коннектора Debezium:

      sudo mkdir -p /etc/debezium/plugins/
      
    6. Чтобы коннектор Debezium мог подключаться к хостам-брокерам Managed Service for Apache Kafka®, добавьте SSL-сертификат в защищенное хранилище сертификатов Java (Java Key Store). Для дополнительной защиты хранилища в параметре -storepass укажите пароль длиной не меньше 6 символов:

      sudo keytool \
          -importcert \
          -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
          -keystore /etc/debezium/keystore.jks \
          -storepass <пароль_JKS> \
          --noprompt
      

Подготовка кластера-источникаПодготовка кластера-источника

  1. Назначьте пользователю user1 глобальные привилегии REPLICATION CLIENT и REPLICATION SLAVE.

  2. Подключитесь к базе данных db1 от имени пользователя user1.

  3. Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию с некоторых датчиков автомобиля.

    1. Создайте таблицу:

      CREATE TABLE measurements (
        `device_id` VARCHAR(32) PRIMARY KEY NOT NULL,
        `datetime` TIMESTAMP NOT NULL,
        `latitude` REAL NOT NULL,
        `longitude` REAL NOT NULL,
        `altitude` REAL NOT NULL,
        `speed` REAL NOT NULL,
        `battery_voltage` REAL,
        `cabin_temperature` REAL NOT NULL,
        `fuel_level` REAL
      );
      
    2. Наполните таблицу данными:

      INSERT INTO measurements VALUES
        ('iv9a94th6rzt********', '2020-06-05 17:27:00', 55.70329032, 37.65472196,  427.5,    0, 23.5, 17, NULL),
        ('rhibbh3y08qm********', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
        ('iv9a94th678t********', '2020-06-07 15:00:10', 55.70985913, 37.62141918,  417.0, 15.7, 10.3, 17, NULL);
      

Настройте коннектор DebeziumНастройте коннектор Debezium

  1. Подключитесь к виртуальной машине по SSH.

  2. Скачайте и распакуйте актуальный Debezium-коннектор в директорию /etc/debezium/plugins/.

    Актуальную версию коннектора уточняйте на странице проекта. Ниже приведены команды для версии 1.9.4.Final.

    VERSION="1.9.4.Final"
    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/${VERSION}/debezium-connector-mysql-${VERSION}-plugin.tar.gz && \
    sudo tar -xzvf debezium-connector-mysql-${VERSION}-plugin.tar.gz -C /etc/debezium/plugins/
    
  3. Создайте файл /etc/debezium/mdb-connector.conf с настройками коннектора Debezium для подключения к кластеру-источнику:

    name=debezium-mmy
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.hostname=c-<идентификатор_кластера>.rw.mdb.yandexcloud.net
    database.port=3306
    database.user=user1
    database.password=<пароль_пользователя_user1>
    database.dbname=db1
    database.server.name=mmy
    database.ssl.mode=required_identity
    table.include.list=db1.measurements
    heartbeat.interval.ms=15000
    heartbeat.topics.prefix=__debezium-heartbeat
    
    snapshot.mode=never
    include.schema.changes=false
    database.history.kafka.topic=dbhistory.mmy
    database.history.kafka.bootstrap.servers=<FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091
    
    # Producer settings
    database.history.producer.security.protocol=SSL
    database.history.producer.ssl.truststore.location=/etc/debezium/keystore.jks
    database.history.producer.ssl.truststore.password=<пароль_JKS>
    database.history.producer.sasl.mechanism=SCRAM-SHA-512
    database.history.producer.security.protocol=SASL_SSL
    database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль_пользователя_debezium>";
    
    # Consumer settings
    database.history.consumer.security.protocol=SSL
    database.history.consumer.ssl.truststore.location=/etc/debezium/keystore.jks
    database.history.consumer.ssl.truststore.password=<пароль_JKS>
    database.history.consumer.sasl.mechanism=SCRAM-SHA-512
    database.history.consumer.security.protocol=SASL_SSL
    database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль_пользователя_debezium>";
    

    Где:

    • name — логическое имя коннектора Debezium. Используется для внутренних нужд коннектора.

    • database.hostname — особый FQDN для подключения к хосту-мастеру кластера-источника.

      Идентификатор кластера можно получить со списком кластеров в каталоге.

    • database.user — имя пользователя MySQL®.

    • database.dbname — имя базы данных MySQL®.

    • database.server.name — имя сервера баз данных, которое Debezium будет использовать при выборе топика для отправки сообщений.

    • table.include.list — имена таблиц, для которых Debezium должен отслеживать изменения. Укажите полные имена, включающие в себя имя базы данных (db1). Debezium будет использовать значения настроек из этого поля при выборе топика для отправки сообщений.

    • heartbeat.interval.ms и heartbeat.topics.prefix — настройки heartbeat, необходимые для работы Debezium.

    • database.history.kafka.topic — имя служебного топика, используемого коннектором для отправки уведомлений об изменениях в схеме данных кластера-источника.

Подготовьте кластер-приемникПодготовьте кластер-приемник

  1. Создайте топик, в который будут помещаться данные, поступающие от кластера-источника:

    • Имя — mmy.db1.measurements.

      Имена топиков для данных конструируются по принципу <имя_сервера>.<имя_БД>.<имя_таблицы>.

      Согласно файлу настроек коннектора Debezium:

      • Имя сервера mmy указано в параметре database.server.name.
      • Имя базы данных db1 указано вместе с именем таблицы measurements в параметре table.include.list.

    Если необходимо отслеживать изменения в нескольких таблицах, создайте для каждой из них отдельный топик.

  2. Создайте служебный топик для отслеживания состояния коннектора:

    • Имя — __debezium-heartbeat.mmy.

      Имена служебных топиков конструируются по принципу <префикс_для_heartbeat>.<имя_сервера>.

      Согласно файлу настроек коннектора Debezium:

      • Префикс __debezium-heartbeat указан в параметре heartbeat.topics.prefix.
      • Имя сервера mmy указано в параметре database.server.name.
    • Политика очистки лога — Compact.

    Если необходимо получать данные из нескольких кластеров-источников, создайте для каждого из них отдельный служебный топик.

  3. Создайте служебный топик для отслеживания изменений в схеме формата данных:

    • Имя — dbhistory.mmy.
    • Политика очистки лога — Delete.
    • Количество разделов — 1.
  4. Создайте пользователя с именем debezium.

  5. Выдайте пользователю debezium права ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER на созданные топики.

Запустите коннектор DebeziumЗапустите коннектор Debezium

  1. Создайте файл с настройками воркера Debezium:

    /etc/debezium/worker.conf

    # AdminAPI connect properties
    bootstrap.servers=<FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091
    sasl.mechanism=SCRAM-SHA-512
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/debezium/keystore.jks
    ssl.truststore.password=<пароль_JKS>
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль_пользователя_debezium>";
    
    # Producer connect properties
    producer.sasl.mechanism=SCRAM-SHA-512
    producer.security.protocol=SASL_SSL
    producer.ssl.truststore.location=/etc/debezium/keystore.jks
    producer.ssl.truststore.password=<пароль_JKS>
    producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<пароль_пользователя_debezium>";
    
    # Worker properties
    plugin.path=/etc/debezium/plugins/
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/etc/debezium/worker.offset
    
  2. В отдельном терминале запустите коннектор:

    sudo /opt/kafka/bin/connect-standalone.sh \
        /etc/debezium/worker.conf \
        /etc/debezium/mdb-connector.properties
    

Проверьте работоспособность DebeziumПроверьте работоспособность Debezium

  1. В отдельном терминале запустите утилиту kafkacat в режиме потребителя:

    kafkacat \
        -C \
        -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \
        -t mmy.db1.measurements \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=debezium \
        -X sasl.password=<пароль> \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
        -Z \
        -K:
    

    Будет выведена схема формата данных таблицы db1.measurements и данные о добавленных в нее ранее строках.

    Пример фрагмента сообщения
    {
    "schema": {
        ...
    },
    "payload": {
        "before": null,
        "after": {
            "device_id": "iv9a94th6rzt********",
            "datetime": 1591378020000000,
            "latitude": 55.70329,
            "longitude": 37.65472,
            "altitude": 427.5,
            "speed": 0.0,
            "battery_voltage": 23.5,
            "cabin_temperature": 17.0,
            "fuel_level": null
        },
        "source": {
            "version": "1.8.1.Final",
            "connector": "mysql",
            "name": "mmy",
            "ts_ms": 1628245046882,
            "snapshot": "true",
            "db": "db1",
            "sequence": "[null,\"4328525512\"]",
            "table": "measurements",
            "txId": 8861,
            "lsn": 4328525328,
            "xmin": null
        },
        "op": "r",
        "ts_ms": 1628245046893,
        "transaction": null
      }
    }
    
  2. Подключитесь к кластеру-источнику и добавьте еще одну строку в таблицу measurements:

    INSERT INTO measurements VALUES ('iv7b74th678t********', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
    
  3. Убедитесь, что в терминале с запущенной утилитой kafkacat отобразились сведения о добавленной строке.

Удалите созданные ресурсыУдалите созданные ресурсы

Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:

  1. Удалите виртуальную машину.

    Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, освободите и удалите его.

  2. Удалите кластеры:

    • Managed Service for Apache Kafka®.
    • Managed Service for MySQL®.

Была ли статья полезна?

Предыдущая
Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
Следующая
Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
Проект Яндекса
© 2025 ООО «Яндекс.Облако»