Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Transfer
  • Доступные трансферы
  • Начало работы
    • Все руководства
      • MySQL® в Apache Kafka®
      • MySQL® в YDS
      • PostgreSQL в Apache Kafka®
      • PostgreSQL в YDS
      • YDB в Apache Kafka®
      • YDB и поставка в YDS
  • Решение проблем
  • Управление доступом
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте кластер-источник
  • Подготовьте кластер-приемник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Особенности поставки данных с помощью Data Transfer
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Захват изменений данных и поставка в очередь
  3. MySQL® в Apache Kafka®

Захват изменений из MySQL® и поставка в Apache Kafka®

Статья создана
Yandex Cloud
Обновлена 21 апреля 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте кластер-источник
  • Подготовьте кластер-приемник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
    • Особенности поставки данных с помощью Data Transfer
  • Удалите созданные ресурсы

Вы можете отслеживать изменения данных в кластере-источнике Managed Service for MySQL® и отправлять их в кластер-приемник Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).

Чтобы настроить CDC с использованием сервиса Data Transfer:

  1. Подготовьте кластер-источник.
  2. Подготовьте кластер-приемник.
  3. Подготовьте и активируйте трансфер.
  4. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for MySQL®: использование вычислительных ресурсов, выделенных хостам, и дискового пространства (см. тарифы Managed Service for MySQL®).
  • Плата за кластер Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за использование публичных IP-адресов для хостов кластеров (см. тарифы Virtual Private Cloud).
  • Плата за трансфер: использование вычислительных ресурсов и количество переданных строк данных (см. тарифы Data Transfer).

Перед началом работыПеред началом работы

  1. Создайте кластер-источник Managed Service for MySQL® любой подходящей конфигурации со следующими настройками:

    • с базой данных db1;
    • с пользователем my-user;
    • с хостами в публичном доступе.
  2. Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.

  3. Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета:

    • Инструкция для Managed Service for MySQL®.
    • Инструкция для Managed Service for Apache Kafka®.
  4. Установите на локальный компьютер утилиту kcat (kafkacat) и утилиту командной строки MySQL. Например, в Ubuntu 20.04 выполните команду:

    sudo apt update && sudo apt install kafkacat mysql-client --yes
    

    Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

Подготовьте кластер-источникПодготовьте кластер-источник

  1. Чтобы сервис Data Transfer мог получать от кластера Managed Service for MySQL® уведомления об изменениях в данных, в кластере-источнике необходимо настроить внешнюю репликацию. Чтобы пользователь my-user мог выполнять репликацию, назначьте ему роль ALL_PRIVILEGES для базы данных db1 и выдайте глобальные привилегии REPLICATION CLIENT и REPLICATION SLAVE.

  2. Подключитесь к базе данных db1 от имени пользователя my-user.

  3. Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию, поступающую от некоторых датчиков автомобиля.

    Создайте таблицу:

    CREATE TABLE db1.measurements (
        device_id varchar(200) NOT NULL,
        datetime timestamp NOT NULL,
        latitude real NOT NULL,
        longitude real NOT NULL,
        altitude real NOT NULL,
        speed real NOT NULL,
        battery_voltage real,
        cabin_temperature real NOT NULL,
        fuel_level real,
        PRIMARY KEY (device_id)
    );
    

    Наполните таблицу данными:

    INSERT INTO db1.measurements VALUES
        ('iv9a94th6rzt********', '2022-06-05 17:27:00', 55.70329032, 37.65472196,  427.5,    0, 23.5, 17, NULL),
        ('rhibbh3y08qm********', '2022-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32);
    

Подготовьте кластер-приемникПодготовьте кластер-приемник

Настройки различаются в зависимости от используемого способа управления топиками. При этом имена топиков для данных формируются по следующему принципу — <префикс_топика>.<имя_схемы>.<имя_таблицы>. В этом руководстве в качестве примера будет использоваться префикс cdc.

Интерфейсы Yandex Cloud
Admin API

Если управление топиками осуществляется с помощью стандартных интерфейсов Yandex Cloud (Консоль управления, CLI, Terraform, API):

  1. Создайте топик с именем cdc.db1.measurements.

    Для отслеживания изменений в нескольких таблицах создайте для каждой из них отдельный топик.

  2. Создайте пользователя с именем kafka-user и ролями ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER, действующими для созданных топиков. Чтобы включить все такие топики, укажите в имени топика cdc.*.

Если для управления топиками используется Kafka Admin API:

  1. Создайте пользователя-администратора с именем kafka-user.

  2. Помимо роли ACCESS_ROLE_ADMIN назначьте пользователю-администратору роли ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER для топиков, имена которых начинаются с префикса cdc.

    Необходимые топики будут созданы автоматически при первом событии изменения в отслеживаемых таблицах кластера-источника. Такое решение может быть удобным для отслеживания изменений во множестве таблиц, однако, требует запас свободного места в хранилище кластера. Подробнее см. в разделе Хранилище в Managed Service for Apache Kafka®.

Подготовьте и активируйте трансферПодготовьте и активируйте трансфер

  1. Создайте эндпоинт для источника MySQL® с настройками:

    • Тип базы данных — MySQL.
    • Параметры эндпоинта:
      • Настройки подключения — Кластер Managed Service for MySQL.
      • Кластер Managed Service for MySQL — выберите созданный ранее кластер Managed Service for MySQL®.
      • База данных — db1.
      • Пользователь — my-user.
      • Пароль — укажите пароль пользователя my-user.
      • Список включённых таблиц — db1.measurements.
  2. Создайте эндпоинт для приемника Apache Kafka® с настройками:

    • Тип базы данных — Kafka.

    • Параметры эндпоинта:

      • Тип подключения — Кластер Managed Service for Apache Kafka.

        • Кластер Managed Service for Apache Kafka — выберите созданный ранее кластер Managed Service for Apache Kafka®.
        • Аутентификация — укажите данные созданного ранее пользователя kafka-user.
      • Топик — Полное имя топика.

      • Полное имя топика — cdc.db1.measurements.

      Если необходимо отслеживать изменения в нескольких таблицах, заполните поля следующим образом:

      • Топик — Префикс топика.
      • Префикс топика — укажите префикс cdc, использованный при формировании имен топиков.
  3. Создайте трансфер типа Репликация с созданными эндпоинтами для источника и приемника.

  4. Активируйте трансфер и дождитесь его перехода в статус Реплицируется.

Проверьте работоспособность трансфераПроверьте работоспособность трансфера

  1. В отдельном терминале запустите утилиту kafkacat в режиме потребителя:

    kafkacat \
        -C \
        -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \
        -t cdc.db1.measurements \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=kafka-user \
        -X sasl.password=<пароль> \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
        -Z \
        -K:
    

    FQDN хостов-брокеров можно получить со списком хостов в кластере Managed Service for Apache Kafka®.

  2. Подключитесь к кластеру-источнику и добавьте данные в таблицу measurements:

    INSERT INTO db1.measurements VALUES
        ('iv7b74th678t********', '2022-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL),
        ('iv9a94th678t********', '2022-06-07 15:00:10', 55.70985913, 37.62141918,  417.0, 15.7, 10.3, 17, NULL);
    
  3. Убедитесь, что в терминале с запущенной утилитой kafkacat отобразились схема формата данных таблицы db1.measurements и сведения о добавленных строках.

    Пример фрагмента сообщения
    {
        "payload": {
            "device_id": "iv7b74th678t********"
        },
        "schema": {
            "fields": [
                {
                    "field": "device_id",
                    "optional": false,
                    "type": "string"
                }
            ],
            "name": "cdc.db1.measurements.Key",
            "optional": false,
            "type": "struct"
        }
    }: {
        "payload": {
            "after": {
                "altitude": 378,
                "battery_voltage": 5.3,
                "cabin_temperature": 20,
                "datetime": "2020-06-08T17:45:00Z",
                "device_id": "iv7b74th678t********",
                "fuel_level": null,
                "latitude": 53.70987913,
                "longitude": 36.62549834,
                "speed": 20.5
            },
            "before": null,
            "op": "c",
            "source": {
                "connector": "mysql",
                "db": "db1",
                "file": "mysql-log.000016",
                "gtid": "1e46a80b-2e96-11ed-adf7-d00d183780**:*-*****",
                "name": "cdc",
                "pos": 1547357,
                "query": null,
                "row": 0,
                "server_id": 0,
                "snapshot": "false",
                "table": "measurements",
                "thread": null,
                "ts_ms": 1662632515000,
                "version": "1.1.2.Final"
            },
            "transaction": null,
            "ts_ms": 1662632515000
        },
        "schema": {
            "fields": [
                {
                    "field": "before",
                    "fields": [
                        {
                            "field": "device_id",
                            "optional": false,
                            "type": "string"
                        },
                        ...
                    ],
                    "name": "cdc.db1.measurements.Value",
                    "optional": true,
                    "type": "struct"
                },
                {
                    "field": "after",
                    "fields": [
                        ...
                    ],
                    "name": "cdc.db1.measurements.Value",
                    "optional": true,
                    "type": "struct"
                },
                {
                    "field": "source",
                    "fields": [
                        {
                            "field": "version",
                            "optional": false,
                            "type": "string"
                        },
                        ...
                    ],
                    "name": "io.debezium.connector.mysql.Source",
                    "optional": false,
                    "type": "struct"
                },
                {
                    "field": "op",
                    "optional": false,
                    "type": "string"
                },
                ...
            ],
            "name": "cdc.db1.measurements.Envelope",
            "optional": false,
            "type": "struct"
        }
    }
    

Особенности поставки данных с помощью Data TransferОсобенности поставки данных с помощью Data Transfer

  • При переносе данных из MySQL® в Apache Kafka® некоторые типы данных переносятся с изменениями:

    • тип tinyint(1) переносится как boolean;
    • тип real переносится как double;
    • тип bigint unsigned переносится как int64.
  • В блоке метаинформации об источнике payload.source параметры server_id и thread не заполняются.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  1. Деактивируйте и удалите трансфер.

  2. Удалите эндпоинты.

  3. Удалите кластеры:

    • Managed Service for Apache Kafka®;
    • Managed Service for MySQL®.
  4. Если для доступа к хостам кластеров использовались статические публичные IP-адреса, освободите и удалите их.

Была ли статья полезна?

Предыдущая
PostgreSQL в OpenSearch
Следующая
MySQL® в YDS
Проект Яндекса
© 2025 ООО «Яндекс.Облако»