Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for YDB
  • Начало работы
    • Все руководства
    • Развертывание веб-приложения
    • Разработка Slack-бота
    • Разработка Telegram-бота
    • Подключение к YDB из функции Yandex Cloud Functions на Python
    • Подключение к базе данных YDB из функции Yandex Cloud Functions на Node.js
    • Конвертация видео в GIF на Python
    • Разработка навыка Алисы и сайта с авторизацией
    • Миграция базы данных из Yandex Managed Service for MySQL® с помощью Yandex Data Transfer
    • Поставка данных из YDB в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Поставка данных из YDB в Yandex Data Streams с помощью Yandex Data Transfer
    • Загрузка данных из Yandex Object Storage в YDB с помощью Yandex Data Transfer
    • Загрузка данных из YDB в Yandex Object Storage с помощью Yandex Data Transfer
    • Загрузка данных из YDB в Yandex Managed Service for ClickHouse® с помощью Yandex Data Transfer
    • Миграция данных из Yandex Managed Service for OpenSearch в YDB с помощью Yandex Data Transfer
    • Построение пайплайна CI/CD в GitLab с использованием serverless-продуктов
    • Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for YDB с помощью Yandex Data Transfer
    • Передача событий Yandex Cloud Postbox в Yandex Data Streams и их анализ с помощью Yandex DataLens
    • Поставка данных из Yandex Managed Service for Apache Kafka® в Data Streams с помощью Yandex Data Transfer
    • Поставка данных из Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse®
    • Поставка данных из очереди Data Streams в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из очереди Data Streams в Managed Service for YDB с помощью Data Transfer
    • Захват изменений MySQL® и поставка в YDS
    • Захват изменений PostgreSQL и поставка в YDS
    • Интерактивная отладка функций Cloud Functions
    • Блокировка состояний Terraform с помощью Managed Service for YDB
    • Разработка пользовательской интеграции в API Gateway
    • Разработка CRUD API для сервиса фильмов
    • Создание интерактивного serverless-приложения с использованием WebSocket
    • Сокращатель ссылок
    • Загрузка аудитных логов в MaxPatrol SIEM
  • Управление доступом
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Вопросы и ответы
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте источник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Поставка данных из YDB в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer

Поставка данных из YDB в Yandex Managed Service for Apache Kafka®

Статья создана
Yandex Cloud
Обновлена 17 марта 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте источник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы

Вы можете отслеживать изменения данных в источнике Managed Service for YDB и отправлять их в кластер-приемник Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC). Эти данные будут автоматически добавлены в топики Managed Service for Apache Kafka® с именами таблиц Managed Service for YDB.

Примечание

В YDB CDC-режим поддерживается, начиная с версии 22.5 и выше.

Чтобы запустить поставку данных:

  1. Подготовьте источник.
  2. Подготовьте и активируйте трансфер.
  3. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за базу данных Managed Service for YDB. Тарификация зависит от режима использования:

    • Для бессерверного режима — оплачиваются операции с данными и объем хранимых данных.
    • Для режима с выделенными инстансами — оплачивается использование вычислительных ресурсов, выделенных БД, и дискового пространства.

    Подробнее о тарифах Managed Service for YDB.

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).

  • Плата за использование публичных IP-адресов для хостов кластера (см. тарифы Virtual Private Cloud).

Перед началом работыПеред началом работы

  1. Подготовьте инфраструктуру поставки данных:

    Вручную
    Terraform
    1. Создайте базу данных Managed Service for YDB любой подходящей конфигурации.

    2. Если вы выбрали режим БД Dedicated, создайте и настройте группу безопасности в сети, где находится БД.

    3. Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.

    4. Если вы используете группы безопасности, настройте их так, чтобы к кластеру можно было подключаться из интернета.

    5. Настройте в кластере-приемнике топики Apache Kafka®. Настройки различаются в зависимости от используемого способа управления топиками. Имена топиков для данных формируются как <префикс_топика>.<имя_таблицы_YDB>. В этом руководстве в качестве примера используется префикс cdc.

      • Если выбрано управление топиками через стандартные интерфейсы Yandex Cloud (Консоль управления, CLI, API):

        1. Создайте топик с именем cdc.sensors.

          Для отслеживания изменений в нескольких таблицах создайте для каждой из них отдельный топик с префиксом cdc.

        2. Создайте пользователя с ролями ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER для топика cdc.sensors. Чтобы включить все созданные топики, укажите в имени топика cdc.*.

      • Если для управления топиками используется Kafka Admin API:

        1. Создайте пользователя-администратора.

        2. Помимо роли ACCESS_ROLE_ADMIN назначьте пользователю-администратору роли ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER для топиков cdc.*, имена которых начинаются с префикса cdc.

          Необходимые топики будут созданы автоматически при первом изменении в отслеживаемых таблицах кластера-источника. Такое решение может быть удобным для отслеживания изменений во множестве таблиц, однако, требует запаса свободного места в хранилище кластера. Подробнее см. в разделе Хранилище в Managed Service for Apache Kafka®.

    1. Если у вас еще нет Terraform, установите его.

    2. Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.

    3. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

    4. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

    5. Скачайте в ту же рабочую директорию файл конфигурации data-transfer-ydb-mkf.tf.

      В этом файле описаны:

      • сеть;
      • подсеть;
      • группа безопасности и правило, необходимое для подключения к кластеру Managed Service for Apache Kafka®;
      • база данных Managed Service for YDB;
      • кластер-приемник Managed Service for Apache Kafka®;
      • топик Apache Kafka®;
      • пользователь Apache Kafka®;
      • трансфер.

      Выбор способа управления топиками определяется переменной Terraform kf_topics_management. Переменная задается при выполнении команд terraform plan и terraform apply (см. далее):

      • Если управление топиками осуществляется с помощью стандартных интерфейсов Yandex Cloud (Консоль управления, CLI, API):

        1. Для отслеживания изменений в нескольких таблицах добавьте в файл конфигурации для каждой из них описание отдельного топика с префиксом cdc.
        2. Задайте для переменной Terraform kf_topics_management значение false.
      • Если для управления топиками используется Kafka Admin API, задайте для переменной Terraform kf_topics_management значение true.

    6. Укажите в файле data-transfer-ydb-mkf.tf переменные:

      • source_db_name — имя базы данных Managed Service for YDB;
      • target_kf_version – версия Apache Kafka® в кластере-приемнике;
      • target_user_name – имя пользователя для подключения к топику Apache Kafka®;
      • target_user_password – пароль пользователя;
      • transfer_enabled – значение 0, чтобы не создавать трансфер до создания эндпоинтов вручную.
    7. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    8. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

  2. Установите утилиту kafkacat для чтения и записи данных в топики Apache Kafka®.

    sudo apt update && sudo apt install --yes kafkacat
    

    Убедитесь, что можете с ее помощью подключиться к кластеру-приемнику Managed Service for Apache Kafka® через SSL.

Подготовьте источникПодготовьте источник

  1. Подключитесь к базе данных Managed Service for YDB.

  2. Создайте YDB-таблицу. В качестве примера используется таблица sensors с информацией, поступающей от условных датчиков автомобиля.

    Добавьте в таблицу колонки вручную:

    Имя Тип Первичный ключ
    device_id String Да
    datetime String
    latitude Double
    longitude Double
    altitude Double
    speed Double
    battery_voltage Double
    cabin_temperature Uint8
    fuel_level Uint32

    Остальные настройки оставьте по умолчанию.

    Также создать таблицу можно YQL-командой:

    CREATE TABLE sensors (
        device_id String,
        datetime String,
        latitude Double,
        longitude Double,
        altitude Double,
        speed Double,
        battery_voltage Double,
        cabin_temperature Uint8,
        fuel_level Uint32,
        PRIMARY KEY (device_id)
    )
    

Подготовьте и активируйте трансферПодготовьте и активируйте трансфер

  1. Создайте эндпоинт для источника:

    • Тип базы данных — YDB.

    • Параметры эндпоинта:

      • Настройки подключения:

        • База данных — выберите базу данных Managed Service for YDB из списка.
        • Идентификатор сервисного аккаунта — выберите или создайте сервисный аккаунт с ролью editor.
      • Список включенных путей — укажите имена таблиц и директорий базы данных Managed Service for YDB для переноса.

        Важно

        Реплицируются только указанные таблицы и директории. Если не указать имен, то никакие таблицы не будут перенесены.

  2. Создайте эндпоинт для приемника:

    • Тип базы данных — Kafka.

    • Параметры эндпоинта:

      • Тип подключения — Кластер Managed Service for Apache Kafka.

        • Кластер Managed Service for Apache Kafka — выберите созданный ранее кластер-источник Managed Service for Apache Kafka®.
        • Аутентификация — укажите данные созданного ранее пользователя Apache Kafka®.
      • Топик — Полное имя топика.

      • Полное имя топика — cdc.sensors.

      Если необходимо отслеживать изменения в нескольких таблицах, заполните поля следующим образом:

      • Топик — Префикс топика.
      • Префикс топика — укажите префикс cdc, использованный при формировании имен топиков.
  3. Создайте трансфер:

    Вручную
    Terraform
    1. Создайте трансфер типа Репликация, использующий созданные эндпоинты.
    2. Активируйте его.
    1. Укажите в файле data-transfer-ydb-mkf.tf переменные:

      • source_endpoint_id — значение идентификатора эндпоинта для источника;
      • target_endpoint_id — значение идентификатора эндпоинта для приемника;
      • transfer_enabled – значение 1 для создания трансфера.
    2. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    3. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      Трансфер активируется автоматически после создания.

Проверьте работоспособность трансфераПроверьте работоспособность трансфера

  1. Дождитесь перехода трансфера в статус Реплицируется.

  2. В отдельном терминале запустите утилиту kafkacat в режиме потребителя:

    kafkacat \
        -C \
        -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \
        -t cdc.sensors \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=kafka-user \
        -X sasl.password=<пароль> \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
        -Z \
        -K:
    

    FQDN хостов-брокеров можно получить со списком хостов в кластере Managed Service for Apache Kafka®.

  3. Подключитесь к базе данных Managed Service for YDB и добавьте тестовые данные в таблицу sensors:

    REPLACE INTO sensors (device_id, datetime, latitude, longitude, altitude, speed, battery_voltage, cabin_temperature, fuel_level) VALUES 
        ('iv9a94th6rzt********', '2022-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL),
        ('rhibbh3y08qm********', '2022-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
        ('iv9a94th6rzt********', '2022-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, NULL, 20.5, 15, 20);
    
  4. Убедитесь, что в терминале с запущенной утилитой kafkacat отобразились схема формата данных таблицы sensors и сведения о добавленных строках.

    Пример фрагмента сообщения
    {
      "payload": {
          "device_id": "aXY5YTk0dGg2cnp0b294********"
        },
        "schema": {
          "fields": [
            {
                "field": "device_id",
                "optional": false,
                "type": "bytes"
            }
          ],
          "name": "cdc..sensors.Key",
          "optional": false,
          "type": "struct"
        }
    }: {
      "payload": {
        "after": {
            "altitude": 378,
            "battery_voltage": 20.5,
            "cabin_temperature": 15,
            "datetime": "MjAyMi0wNi0wOCAxNzo0********",
            "device_id": "aXY5YTk0dGg2cnp0b294********",
            "fuel_level": 20,
            "latitude": 53.70987913,
            "longitude": 36.62549834,
            "speed": null
        },
        "before": null,
        "op": "c",
        "source": {
            "db": "",
            "name": "cdc",
            "snapshot": "false",
            "table": "sensors",
            "ts_ms": 1678642104797,
            "version": "1.1.2.Final"
        },
        "transaction": null,
        "ts_ms": 1678642104797
      },
      "schema": {
        "fields": [
            {
                "field": "before",
                "fields": [
                    {
                        "field": "device_id",
                        "optional": false,
                        "type": "bytes"
                    },
                    ...
                ],
                "name": "cdc..sensors.Value",
                "optional": true,
                "type": "struct"
            },
            {
                "field": "after",
                "fields": [
                    {
                        "field": "device_id",
                        "optional": false,
                        "type": "bytes"
                    },
                    ...
                ],
                "name": "cdc..sensors.Value",
                "optional": true,
                "type": "struct"
            },
            {
                "field": "source",
                "fields": [
                    {
                        "field": "version",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "connector",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "name",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "ts_ms",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "default": "false",
                        "field": "snapshot",
                        "name": "io.debezium.data.Enum",
                        "optional": true,
                        "parameters": {
                            "allowed": "true,last,false"
                        },
                        "type": "string",
                        "version": 1
                    },
                    {
                        "field": "db",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "table",
                        "optional": false,
                        "type": "string"
                    }
                ],
                "optional": false,
                "type": "struct"
            },
            ...,
            {
                "field": "transaction",
                "fields": [
                    {
                        "field": "id",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "total_order",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "field": "data_collection_order",
                        "optional": false,
                        "type": "int64"
                    }
                ],
                "optional": true,
                "type": "struct"
            }
        ],
        "name": "cdc..sensors.Envelope",
        "optional": false,
        "type": "struct"
      }
    }
    

Удалите созданные ресурсыУдалите созданные ресурсы

Примечание

Перед тем как удалить созданные ресурсы, деактивируйте трансфер.

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  1. Удалите трансфер.
  2. Удалите эндпоинты для источника и приемника.
  3. Если при создании эндпоинта для источника вы создавали сервисный аккаунт, удалите его.

Остальные ресурсы удалите в зависимости от способа их создания:

Вручную
Terraform
  1. Удалите кластер Managed Service for Apache Kafka®.
  2. Удалите базу данных Managed Service for YDB.
  1. В терминале перейдите в директорию с планом инфраструктуры.

    Важно

    Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

  2. Удалите ресурсы:

    1. Выполните команду:

      terraform destroy
      
    2. Подтвердите удаление ресурсов и дождитесь завершения операции.

    Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

Была ли статья полезна?

Предыдущая
Миграция базы данных из Yandex Managed Service for MySQL® с помощью Yandex Data Transfer
Следующая
Поставка данных из YDB в Yandex Data Streams с помощью Yandex Data Transfer
Проект Яндекса
© 2025 ООО «Яндекс.Облако»