Yandex Cloud
Поиск
Связаться с намиПопробовать бесплатно
  • Истории успеха
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • ИИ для бизнеса
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Калькулятор цен
    • Тарифы
    • Промоакции и free tier
    • Правила тарификации
  • Истории успеха
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Transfer
  • Доступные трансферы
  • Начало работы
    • Все руководства
      • Apache Kafka® в ClickHouse®
      • Apache Kafka® в PostgreSQL
      • Apache Kafka® в Greenplum®
      • Apache Kafka® в Yandex StoreDoc
      • Apache Kafka® в MySQL®
      • Apache Kafka® в OpenSearch
      • Apache Kafka® в YDB
      • Apache Kafka® в YDS
      • YDS в Apache Kafka®
      • YDS в ClickHouse®
      • YDS в Object Storage
      • YDS в YDB
      • Ввод данных в системы хранения
  • Решение проблем
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Поставка данных из очередей
  3. Apache Kafka® в YDB

Поставка данных из очереди Apache Kafka® в YDB

Статья создана
Yandex Cloud
Обновлена 15 декабря 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы

В кластер Managed Service for YDB можно в реальном времени поставлять данные из топиков Apache Kafka®. Эти данные будут автоматически добавлены в таблицы YDB с именами топиков.

Чтобы запустить поставку данных:

  1. Подготовьте и активируйте трансфер.
  2. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

  • Кластер Managed Service for Apache Kafka®: выделенные хостам вычислительные ресурсы, объем хранилища и резервных копий (см. тарифы Managed Service for Apache Kafka®).

  • Публичные IP-адреса, если для хостов кластера включен публичный доступ (см. тарифы Virtual Private Cloud).

  • База данных Managed Service for YDB (см. тарифы Managed Service for YDB). Стоимость зависит от режима использования:

    • Для бессерверного режима — оплачиваются операции с данными, объем хранимых данных и резервных копий.
    • Для режима с выделенными инстансами — оплачивается использование выделенных БД вычислительных ресурсов, объем хранилища и резервные копии.
  • Каждый трансфер: использование вычислительных ресурсов и количество переданных строк данных (см. тарифы Data Transfer).

Перед началом работыПеред началом работы

  1. Подготовьте инфраструктуру поставки данных:

    Вручную
    Terraform
    1. Создайте кластер-источник Managed Service for Apache Kafka® любой подходящей конфигурации.
    2. Создайте базу данных Managed Service for YDB любой подходящей конфигурации.
    3. Создайте в кластере-источнике топик с именем sensors.
    4. Создайте в кластере-источнике пользователя с правами доступа ACCESS_ROLE_PRODUCER, ACCESS_ROLE_CONSUMER к созданному топику.
    1. Если у вас еще нет Terraform, установите его.

    2. Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.

    3. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

    4. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

    5. Скачайте в ту же рабочую директорию файл конфигурации data-transfer-mkf-ydb.tf.

      В этом файле описаны:

      • сеть;
      • подсеть;
      • группа безопасности и правило, необходимое для подключения к кластеру Managed Service for Apache Kafka®;
      • кластер-источник Managed Service for Apache Kafka®;
      • топик Apache Kafka®;
      • пользователь Apache Kafka®;
      • база данных Managed Service for YDB;
      • трансфер.
    6. Укажите в файле data-transfer-mkf-ydb.tf переменные:

      • source_kf_version – версия Apache Kafka® в кластере-источнике;
      • source_user_name – имя пользователя для подключения к топику Apache Kafka®;
      • source_user_password – пароль пользователя;
      • target_db_name — имя базы данных Managed Service for YDB;
      • transfer_enabled – значение 0, чтобы не создавать трансфер до создания эндпоинта-приемника вручную.
    7. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    8. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

    В созданный топик Apache Kafka® sensors кластера-источника будут поступать тестовые данные от сенсоров автомобиля в формате JSON:

    {
        "device_id":"iv9a94th6rzt********",
        "datetime":"2020-06-05 17:27:00",
        "latitude":"55.70329032",
        "longitude":"37.65472196",
        "altitude":"427.5",
        "speed":"0",
        "battery_voltage":"23.5",
        "cabin_temperature":"17",
        "fuel_level":null
    }
    
  2. Установите утилиты:

    • kafkacat — для чтения и записи данных в топики Apache Kafka®.

      sudo apt update && sudo apt install --yes kafkacat
      

      Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

    • jq — для потоковой обработки JSON-файлов.

      sudo apt update && sudo apt-get install --yes jq
      

Подготовьте и активируйте трансферПодготовьте и активируйте трансфер

  1. Создайте эндпоинт для приемника:

    • Тип базы данных — YDB.

    • Параметры эндпоинта:

      • Настройки подключения:
        • База данных — выберите базу данных Managed Service for YDB из списка.

        • Идентификатор сервисного аккаунта — выберите или создайте сервисный аккаунт с ролью editor.

  2. Создайте эндпоинт для источника:

    • Тип базы данных — Kafka.
    • Параметры эндпоинта:
      • Тип подключения — Кластер Managed Service for Apache Kafka.

        Выберите кластер-источник из списка и укажите настройки подключения к нему.

      • Расширенные настройки → Правила конвертации.

        • Формат данных – JSON.
        • Схема данных – Вы можете задать схему двумя способами:
          • Список полей.

            Задайте список полей топика вручную:

            Имя Тип Ключ
            device_id STRING Да
            datetime STRING
            latitude DOUBLE
            longitude DOUBLE
            altitude DOUBLE
            speed DOUBLE
            battery_voltage DOUBLE
            cabin_temperature UINT16
            fuel_level UINT16
          • JSON спецификация.

            Создайте и загрузите файл схемы данных в формате JSON json_schema.json:

            json_schema.json
            [
                {
                    "name": "device_id",
                    "type": "string",
                    "key": true
                },
                {
                    "name": "datetime",
                    "type": "string"
                },
                {
                    "name": "latitude",
                    "type": "double"
                },
                {
                    "name": "longitude",
                    "type": "double"
                },
                {
                    "name": "altitude",
                    "type": "double"
                },
                {
                    "name": "speed",
                    "type": "double"
                },
                {
                    "name": "battery_voltage",
                    "type": "double"
                },
                {
                    "name": "cabin_temperature",
                    "type": "uint16"
                },
                {
                    "name": "fuel_level",
                    "type": "uint16"
                }
            ]
            
  3. Создайте трансфер:

    Вручную
    Terraform
    1. Создайте трансфер типа Репликация, использующий созданные эндпоинты.
    2. Активируйте его.
    1. Укажите в файле data-transfer-mkf-ydb.tf переменные:

      • source_endpoint_id — значение идентификатора эндпоинта для источника;
      • target_endpoint_id — значение идентификатора эндпоинта для приемника;
      • transfer_enabled – значение 1 для создания трансфера.
    2. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    3. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      Трансфер активируется автоматически после создания.

Проверьте работоспособность трансфераПроверьте работоспособность трансфера

  1. Дождитесь перехода трансфера в статус Реплицируется.

  2. Убедитесь, что в базу данных Managed Service for YDB переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:

    1. Создайте файл sample.json с тестовыми данными:

      {
          "device_id": "iv9a94th6rzt********",
          "datetime": "2020-06-05 17:27:00",
          "latitude": 55.70329032,
          "longitude": 37.65472196,
          "altitude": 427.5,
          "speed": 0,
          "battery_voltage": 23.5,
          "cabin_temperature": 17,
          "fuel_level": null
      }
      
      {
          "device_id": "rhibbh3y08qm********",
          "datetime": "2020-06-06 09:49:54",
          "latitude": 55.71294467,
          "longitude": 37.66542005,
          "altitude": 429.13,
          "speed": 55.5,
          "battery_voltage": null,
          "cabin_temperature": 18,
          "fuel_level": 32
      }
      
      {
          "device_id": "iv9a94th6rzt********",
          "datetime": "2020-06-07 15:00:10",
          "latitude": 55.70985913,
          "longitude": 37.62141918,
          "altitude": 417.0,
          "speed": 15.7,
          "battery_voltage": 10.3,
          "cabin_temperature": 17,
          "fuel_level": null
      }
      
    2. Отправьте данные из файла sample.json в топик sensors Managed Service for Apache Kafka® с помощью утилит jq и kafkacat:

      jq -rc . sample.json | kafkacat -P \
         -b <FQDN_хоста-брокера>:9091 \
         -t sensors \
         -k key \
         -X security.protocol=SASL_SSL \
         -X sasl.mechanisms=SCRAM-SHA-512 \
         -X sasl.username="<имя_пользователя_в_кластере-источнике>" \
         -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \
         -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
      

      Данные отправляются от имени созданного пользователя. Подробнее о настройке SSL-сертификата и работе с kafkacat см. в разделе Подключение к кластеру Apache Kafka® из приложений.

    3. Убедитесь, что в базу данных Managed Service for YDB перенеслись данные из кластера-источника Managed Service for Apache Kafka®:

      Консоль управления
      YDB CLI
      1. В консоли управления выберите каталог, в котором находится нужная база данных.
      2. В списке сервисов выберите Managed Service for YDB.
      3. Выберите базу из списка.
      4. Перейдите на вкладку Навигация.
      5. Проверьте, что база данных Managed Service for YDB содержит таблицу sensors с тестовыми данными из топика.
      1. Подключитесь к базе данных Managed Service for YDB.

      2. Проверьте, что база данных содержит таблицу sensors с тестовыми данными из топика:

        ydb table query execute \
          --query "SELECT * \
          FROM sensors"
        

Удалите созданные ресурсыУдалите созданные ресурсы

Примечание

Перед тем как удалить созданные ресурсы, деактивируйте трансфер.

Чтобы снизить потребление ресурсов, которые вам не нужны, удалите их:

  1. Удалите трансфер.

  2. Удалите эндпоинты для источника и приемника.

  3. Если при создании эндпоинта для приемника вы создавали сервисный аккаунт, удалите его.

  4. Остальные ресурсы удалите в зависимости от способа их создания:

    Вручную
    Terraform
    1. Удалите кластер Managed Service for Apache Kafka®.
    2. Удалите базу данных Managed Service for YDB.
    1. В терминале перейдите в директорию с планом инфраструктуры.

      Важно

      Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

    2. Удалите ресурсы:

      1. Выполните команду:

        terraform destroy
        
      2. Подтвердите удаление ресурсов и дождитесь завершения операции.

      Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

Была ли статья полезна?

Предыдущая
Apache Kafka® в OpenSearch
Следующая
Apache Kafka® в YDS
Проект Яндекса
© 2025 ООО «Яндекс.Облако»