Yandex Cloud
Поиск
Связаться с намиПопробовать бесплатно
  • Истории успеха
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Доступны в регионе
    • Инфраструктура и сеть
    • Платформа данных
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • ИИ для бизнеса
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Партнёрская программа
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Истории успеха
  • Документация
  • Блог
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»
Yandex Managed Service for YDB
  • Начало работы
    • Все руководства
    • Развертывание веб-приложения
    • Подключение к YDB из функции Yandex Cloud Functions на Python
    • Подключение к базе данных YDB из функции Yandex Cloud Functions на Node.js
    • Конвертация видео в GIF на Python
    • Разработка навыка Алисы и сайта с авторизацией
    • Миграция базы данных из Yandex Managed Service for MySQL® с помощью Yandex Data Transfer
    • Поставка данных из YDB в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Поставка данных из YDB в Yandex Data Streams с помощью Yandex Data Transfer
    • Загрузка данных из Yandex Object Storage в YDB с помощью Yandex Data Transfer
    • Загрузка данных из YDB в Yandex Object Storage с помощью Yandex Data Transfer
    • Загрузка данных из YDB в Yandex Managed Service for ClickHouse® с помощью Yandex Data Transfer
    • Миграция данных из Yandex Managed Service for OpenSearch в YDB с помощью Yandex Data Transfer
    • Построение пайплайна CI/CD в GitLab с использованием serverless-продуктов
    • Поставка данных из Yandex Managed Service for PostgreSQL в Yandex Managed Service for YDB с помощью Yandex Data Transfer
    • Передача событий Yandex Cloud Postbox в Yandex Data Streams и их анализ с помощью Yandex DataLens
    • Поставка данных из Yandex Managed Service for Apache Kafka® в Data Streams с помощью Yandex Data Transfer
    • Поставка данных из Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse®
    • Поставка данных из очереди Data Streams в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из очереди Data Streams в Managed Service for YDB с помощью Data Transfer
    • Захват изменений MySQL® и поставка в YDS
    • Захват изменений PostgreSQL и поставка в YDS
    • Интерактивная отладка функций Cloud Functions
    • Блокировка состояний Terraform с помощью Managed Service for YDB
    • Разработка пользовательской интеграции в API Gateway
    • Разработка CRUD API для сервиса фильмов
    • Создание интерактивного serverless-приложения с использованием WebSocket
    • Сокращатель ссылок
    • Загрузка аудитных логов в MaxPatrol SIEM
    • Интеграция Yandex Cloud Postbox с внешними системами через вебхуки
  • Управление доступом
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Вопросы и ответы
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте источник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Поставка данных из YDB в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer

Поставка данных из YDB в Yandex Managed Service for Apache Kafka®

Статья создана
Yandex Cloud
Обновлена 15 декабря 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте источник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы

Важно

Этот документ не применим для пользователей Yandex Cloud в регионе Казахстан. См. полный перечень поддерживаемых эндпоинтов в Data Transfer.

Вы можете отслеживать изменения данных в источнике Managed Service for YDB и отправлять их в кластер-приемник Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC). Эти данные будут автоматически добавлены в топики Managed Service for Apache Kafka® с именами таблиц Managed Service for YDB.

Примечание

В YDB CDC-режим поддерживается, начиная с версии 22.5 и выше.

Чтобы запустить поставку данных:

  1. Подготовьте источник.
  2. Подготовьте и активируйте трансфер.
  3. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

  • База данных Managed Service for YDB (см. тарифы Managed Service for YDB). Стоимость зависит от режима использования:

    • Для бессерверного режима — оплачиваются операции с данными, объем хранимых данных и резервных копий.
    • Для режима с выделенными инстансами — оплачивается использование выделенных БД вычислительных ресурсов, объем хранилища и резервные копии.
  • Кластер Managed Service for Apache Kafka®: выделенные хостам вычислительные ресурсы, объем хранилища и резервных копий (см. тарифы Managed Service for Apache Kafka®).

  • Публичные IP-адреса, если для хостов кластера включен публичный доступ (см. тарифы Virtual Private Cloud).

  • Каждый трансфер: использование вычислительных ресурсов и количество переданных строк данных (см. тарифы Data Transfer).

Перед началом работыПеред началом работы

  1. Подготовьте инфраструктуру поставки данных:

    Вручную
    Terraform
    1. Создайте базу данных Managed Service for YDB любой подходящей конфигурации.

    2. Если вы выбрали режим БД Dedicated, создайте и настройте группу безопасности в сети, где находится БД.

    3. Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.

    4. Если вы используете группы безопасности, настройте их так, чтобы к кластеру можно было подключаться из интернета.

    5. Настройте в кластере-приемнике топики Apache Kafka®. Настройки различаются в зависимости от используемого способа управления топиками. Имена топиков для данных формируются как <префикс_топика>.<имя_таблицы_YDB>. В этом руководстве в качестве примера используется префикс cdc.

      • Если выбрано управление топиками через стандартные интерфейсы Yandex Cloud (Консоль управления, CLI, API):

        1. Создайте топик с именем cdc.sensors.

          Для отслеживания изменений в нескольких таблицах создайте для каждой из них отдельный топик с префиксом cdc.

        2. Создайте пользователя с ролями ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER для топика cdc.sensors. Чтобы включить все созданные топики, укажите в имени топика cdc.*.

      • Если для управления топиками используется Kafka Admin API:

        1. Создайте пользователя-администратора.

        2. Помимо роли ACCESS_ROLE_ADMIN назначьте пользователю-администратору роли ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER для топиков cdc.*, имена которых начинаются с префикса cdc.

          Необходимые топики будут созданы автоматически при первом изменении в отслеживаемых таблицах кластера-источника. Такое решение может быть удобным для отслеживания изменений во множестве таблиц, однако, требует запаса свободного места в хранилище кластера. Подробнее см. в разделе Хранилище в Managed Service for Apache Kafka®.

    1. Если у вас еще нет Terraform, установите его.

    2. Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.

    3. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

    4. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

    5. Скачайте в ту же рабочую директорию файл конфигурации data-transfer-ydb-mkf.tf.

      В этом файле описаны:

      • сеть;
      • подсеть;
      • группа безопасности и правило, необходимое для подключения к кластеру Managed Service for Apache Kafka®;
      • база данных Managed Service for YDB;
      • кластер-приемник Managed Service for Apache Kafka®;
      • топик Apache Kafka®;
      • пользователь Apache Kafka®;
      • трансфер.

      Выбор способа управления топиками определяется переменной Terraform kf_topics_management. Переменная задается при выполнении команд terraform plan и terraform apply (см. далее):

      • Если управление топиками осуществляется с помощью стандартных интерфейсов Yandex Cloud (Консоль управления, CLI, API):

        1. Для отслеживания изменений в нескольких таблицах добавьте в файл конфигурации для каждой из них описание отдельного топика с префиксом cdc.
        2. Задайте для переменной Terraform kf_topics_management значение false.
      • Если для управления топиками используется Kafka Admin API, задайте для переменной Terraform kf_topics_management значение true.

    6. Укажите в файле data-transfer-ydb-mkf.tf переменные:

      • source_db_name — имя базы данных Managed Service for YDB;
      • target_kf_version – версия Apache Kafka® в кластере-приемнике;
      • target_user_name – имя пользователя для подключения к топику Apache Kafka®;
      • target_user_password – пароль пользователя;
      • transfer_enabled – значение 0, чтобы не создавать трансфер до создания эндпоинтов вручную.
    7. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    8. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

  2. Установите утилиту kafkacat для чтения и записи данных в топики Apache Kafka®.

    sudo apt update && sudo apt install --yes kafkacat
    

    Убедитесь, что можете с ее помощью подключиться к кластеру-приемнику Managed Service for Apache Kafka® через SSL.

Подготовьте источникПодготовьте источник

  1. Подключитесь к базе данных Managed Service for YDB.

  2. Создайте YDB-таблицу. В качестве примера используется таблица sensors с информацией, поступающей от условных датчиков автомобиля.

    Добавьте в таблицу колонки вручную:

    Имя Тип Первичный ключ
    device_id String Да
    datetime String
    latitude Double
    longitude Double
    altitude Double
    speed Double
    battery_voltage Double
    cabin_temperature Uint8
    fuel_level Uint32

    Остальные настройки оставьте по умолчанию.

    Также создать таблицу можно YQL-командой:

    CREATE TABLE sensors (
        device_id String,
        datetime String,
        latitude Double,
        longitude Double,
        altitude Double,
        speed Double,
        battery_voltage Double,
        cabin_temperature Uint8,
        fuel_level Uint32,
        PRIMARY KEY (device_id)
    )
    

Подготовьте и активируйте трансферПодготовьте и активируйте трансфер

  1. Создайте эндпоинт для источника:

    • Тип базы данных — YDB.

    • Параметры эндпоинта:

      • Настройки подключения:

        • База данных — выберите базу данных Managed Service for YDB из списка.

        • Идентификатор сервисного аккаунта — выберите или создайте сервисный аккаунт с ролью editor.

      • Список включенных путей — укажите имена таблиц и директорий базы данных Managed Service for YDB для переноса.

        Важно

        Реплицируются только указанные таблицы и директории. Если не указать имен, то никакие таблицы не будут перенесены.

  2. Создайте эндпоинт для приемника:

    • Тип базы данных — Kafka.

    • Параметры эндпоинта:

      • Тип подключения — Кластер Managed Service for Apache Kafka.

        • Кластер Managed Service for Apache Kafka — выберите созданный ранее кластер-источник Managed Service for Apache Kafka®.
        • Аутентификация — укажите данные созданного ранее пользователя Apache Kafka®.
      • Топик — Полное имя топика.

      • Полное имя топика — cdc.sensors.

      Если необходимо отслеживать изменения в нескольких таблицах, заполните поля следующим образом:

      • Топик — Префикс топика.
      • Префикс топика — укажите префикс cdc, использованный при формировании имен топиков.
  3. Создайте трансфер:

    Вручную
    Terraform
    1. Создайте трансфер типа Репликация, использующий созданные эндпоинты.
    2. Активируйте его.
    1. Укажите в файле data-transfer-ydb-mkf.tf переменные:

      • source_endpoint_id — значение идентификатора эндпоинта для источника;
      • target_endpoint_id — значение идентификатора эндпоинта для приемника;
      • transfer_enabled – значение 1 для создания трансфера.
    2. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    3. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      Трансфер активируется автоматически после создания.

Проверьте работоспособность трансфераПроверьте работоспособность трансфера

  1. Дождитесь перехода трансфера в статус Реплицируется.

  2. В отдельном терминале запустите утилиту kafkacat в режиме потребителя:

    kafkacat \
        -C \
        -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \
        -t cdc.sensors \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=kafka-user \
        -X sasl.password=<пароль> \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
        -Z \
        -K:
    

    FQDN хостов-брокеров можно получить со списком хостов в кластере Managed Service for Apache Kafka®.

  3. Подключитесь к базе данных Managed Service for YDB и добавьте тестовые данные в таблицу sensors:

    REPLACE INTO sensors (device_id, datetime, latitude, longitude, altitude, speed, battery_voltage, cabin_temperature, fuel_level) VALUES 
        ('iv9a94th6rzt********', '2022-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL),
        ('rhibbh3y08qm********', '2022-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
        ('iv9a94th6rzt********', '2022-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, NULL, 20.5, 15, 20);
    
  4. Убедитесь, что в терминале с запущенной утилитой kafkacat отобразились схема формата данных таблицы sensors и сведения о добавленных строках.

    Пример фрагмента сообщения
    {
      "payload": {
          "device_id": "aXY5YTk0dGg2cnp0b294********"
        },
        "schema": {
          "fields": [
            {
                "field": "device_id",
                "optional": false,
                "type": "bytes"
            }
          ],
          "name": "cdc..sensors.Key",
          "optional": false,
          "type": "struct"
        }
    }: {
      "payload": {
        "after": {
            "altitude": 378,
            "battery_voltage": 20.5,
            "cabin_temperature": 15,
            "datetime": "MjAyMi0wNi0wOCAxNzo0********",
            "device_id": "aXY5YTk0dGg2cnp0b294********",
            "fuel_level": 20,
            "latitude": 53.70987913,
            "longitude": 36.62549834,
            "speed": null
        },
        "before": null,
        "op": "c",
        "source": {
            "db": "",
            "name": "cdc",
            "snapshot": "false",
            "table": "sensors",
            "ts_ms": 1678642104797,
            "version": "1.1.2.Final"
        },
        "transaction": null,
        "ts_ms": 1678642104797
      },
      "schema": {
        "fields": [
            {
                "field": "before",
                "fields": [
                    {
                        "field": "device_id",
                        "optional": false,
                        "type": "bytes"
                    },
                    ...
                ],
                "name": "cdc..sensors.Value",
                "optional": true,
                "type": "struct"
            },
            {
                "field": "after",
                "fields": [
                    {
                        "field": "device_id",
                        "optional": false,
                        "type": "bytes"
                    },
                    ...
                ],
                "name": "cdc..sensors.Value",
                "optional": true,
                "type": "struct"
            },
            {
                "field": "source",
                "fields": [
                    {
                        "field": "version",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "connector",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "name",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "ts_ms",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "default": "false",
                        "field": "snapshot",
                        "name": "io.debezium.data.Enum",
                        "optional": true,
                        "parameters": {
                            "allowed": "true,last,false"
                        },
                        "type": "string",
                        "version": 1
                    },
                    {
                        "field": "db",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "table",
                        "optional": false,
                        "type": "string"
                    }
                ],
                "optional": false,
                "type": "struct"
            },
            ...,
            {
                "field": "transaction",
                "fields": [
                    {
                        "field": "id",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "total_order",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "field": "data_collection_order",
                        "optional": false,
                        "type": "int64"
                    }
                ],
                "optional": true,
                "type": "struct"
            }
        ],
        "name": "cdc..sensors.Envelope",
        "optional": false,
        "type": "struct"
      }
    }
    

Удалите созданные ресурсыУдалите созданные ресурсы

Примечание

Перед тем как удалить созданные ресурсы, деактивируйте трансфер.

Чтобы снизить потребление ресурсов, которые вам не нужны, удалите их:

  1. Удалите трансфер.

  2. Удалите эндпоинты для источника и приемника.

  3. Если при создании эндпоинта для источника вы создавали сервисный аккаунт, удалите его.

  4. Остальные ресурсы удалите в зависимости от способа их создания:

    Вручную
    Terraform
    1. Удалите кластер Managed Service for Apache Kafka®.
    2. Удалите базу данных Managed Service for YDB.
    1. В терминале перейдите в директорию с планом инфраструктуры.

      Важно

      Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

    2. Удалите ресурсы:

      1. Выполните команду:

        terraform destroy
        
      2. Подтвердите удаление ресурсов и дождитесь завершения операции.

      Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

Была ли статья полезна?

Предыдущая
Миграция базы данных из Yandex Managed Service for MySQL® с помощью Yandex Data Transfer
Следующая
Поставка данных из YDB в Yandex Data Streams с помощью Yandex Data Transfer
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»