Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Transfer
  • Доступные трансферы
  • Начало работы
    • Все руководства
      • Apache Kafka® в ClickHouse®
      • Apache Kafka® в PostgreSQL
      • Apache Kafka® в Greenplum®
      • Apache Kafka® в MongoDB
      • Apache Kafka® в MySQL®
      • Apache Kafka® в OpenSearch
      • Apache Kafka® в YDB
      • Apache Kafka® в YDS
      • YDS в Apache Kafka®
      • YDS в ClickHouse®
      • YDS в Object Storage
      • YDS в YDB
      • Ввод данных в системы хранения
  • Решение проблем
  • Управление доступом
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Поставка данных из очередей
  3. Apache Kafka® в YDB

Поставка данных из очереди Apache Kafka® в YDB

Статья создана
Yandex Cloud
Обновлена 17 марта 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы

В кластер Managed Service for YDB можно в реальном времени поставлять данные из топиков Apache Kafka®. Эти данные будут автоматически добавлены в таблицы YDB с именами топиков.

Чтобы запустить поставку данных:

  1. Подготовьте и активируйте трансфер.
  2. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).

  • Плата за использование публичных IP-адресов, если для хостов кластера включен публичный доступ (см. тарифы Virtual Private Cloud).

  • Плата за базу данных Managed Service for YDB. Тарификация зависит от режима использования:

    • Для бессерверного режима — оплачиваются операции с данными и объем хранимых данных.
    • Для режима с выделенными инстансами — оплачивается использование вычислительных ресурсов, выделенных БД, и дискового пространства.

    Подробнее о тарифах Managed Service for YDB.

  • Плата за трансфер: использование вычислительных ресурсов и количество переданных строк данных (см. тарифы Data Transfer).

Перед началом работыПеред началом работы

  1. Подготовьте инфраструктуру поставки данных:

    Вручную
    Terraform
    1. Создайте кластер-источник Managed Service for Apache Kafka® любой подходящей конфигурации.
    2. Создайте базу данных Managed Service for YDB любой подходящей конфигурации.
    3. Создайте в кластере-источнике топик с именем sensors.
    4. Создайте в кластере-источнике пользователя с правами доступа ACCESS_ROLE_PRODUCER, ACCESS_ROLE_CONSUMER к созданному топику.
    1. Если у вас еще нет Terraform, установите его.

    2. Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.

    3. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

    4. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

    5. Скачайте в ту же рабочую директорию файл конфигурации data-transfer-mkf-ydb.tf.

      В этом файле описаны:

      • сеть;
      • подсеть;
      • группа безопасности и правило, необходимое для подключения к кластеру Managed Service for Apache Kafka®;
      • кластер-источник Managed Service for Apache Kafka®;
      • топик Apache Kafka®;
      • пользователь Apache Kafka®;
      • база данных Managed Service for YDB;
      • трансфер.
    6. Укажите в файле data-transfer-mkf-ydb.tf переменные:

      • source_kf_version – версия Apache Kafka® в кластере-источнике;
      • source_user_name – имя пользователя для подключения к топику Apache Kafka®;
      • source_user_password – пароль пользователя;
      • target_db_name — имя базы данных Managed Service for YDB;
      • transfer_enabled – значение 0, чтобы не создавать трансфер до создания эндпоинта-приемника вручную.
    7. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    8. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

    В созданный топик Apache Kafka® sensors кластера-источника будут поступать тестовые данные от сенсоров автомобиля в формате JSON:

    {
        "device_id":"iv9a94th6rzt********",
        "datetime":"2020-06-05 17:27:00",
        "latitude":"55.70329032",
        "longitude":"37.65472196",
        "altitude":"427.5",
        "speed":"0",
        "battery_voltage":"23.5",
        "cabin_temperature":"17",
        "fuel_level":null
    }
    
  2. Установите утилиты:

    • kafkacat — для чтения и записи данных в топики Apache Kafka®.

      sudo apt update && sudo apt install --yes kafkacat
      

      Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

    • jq — для потоковой обработки JSON-файлов.

      sudo apt update && sudo apt-get install --yes jq
      

Подготовьте и активируйте трансферПодготовьте и активируйте трансфер

  1. Создайте эндпоинт для приемника:

    • Тип базы данных — YDB.

    • Параметры эндпоинта:

      • Настройки подключения:
        • База данных — выберите базу данных Managed Service for YDB из списка.
        • Идентификатор сервисного аккаунта — выберите или создайте сервисный аккаунт с ролью editor.
  2. Создайте эндпоинт для источника:

    • Тип базы данных — Kafka.
    • Параметры эндпоинта:
      • Тип подключения — Кластер Managed Service for Apache Kafka.

        Выберите кластер-источник из списка и укажите настройки подключения к нему.

      • Расширенные настройки → Правила конвертации.

        • Формат данных – JSON.
        • Схема данных – Вы можете задать схему двумя способами:
          • Список полей.

            Задайте список полей топика вручную:

            Имя Тип Ключ
            device_id STRING Да
            datetime STRING
            latitude DOUBLE
            longitude DOUBLE
            altitude DOUBLE
            speed DOUBLE
            battery_voltage DOUBLE
            cabin_temperature UINT16
            fuel_level UINT16
          • JSON спецификация.

            Создайте и загрузите файл схемы данных в формате JSON json_schema.json:

            json_schema.json
            [
                {
                    "name": "device_id",
                    "type": "string",
                    "key": true
                },
                {
                    "name": "datetime",
                    "type": "string"
                },
                {
                    "name": "latitude",
                    "type": "double"
                },
                {
                    "name": "longitude",
                    "type": "double"
                },
                {
                    "name": "altitude",
                    "type": "double"
                },
                {
                    "name": "speed",
                    "type": "double"
                },
                {
                    "name": "battery_voltage",
                    "type": "double"
                },
                {
                    "name": "cabin_temperature",
                    "type": "uint16"
                },
                {
                    "name": "fuel_level",
                    "type": "uint16"
                }
            ]
            
  3. Создайте трансфер:

    Вручную
    Terraform
    1. Создайте трансфер типа Репликация, использующий созданные эндпоинты.
    2. Активируйте его.
    1. Укажите в файле data-transfer-mkf-ydb.tf переменные:

      • source_endpoint_id — значение идентификатора эндпоинта для источника;
      • target_endpoint_id — значение идентификатора эндпоинта для приемника;
      • transfer_enabled – значение 1 для создания трансфера.
    2. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    3. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      Трансфер активируется автоматически после создания.

Проверьте работоспособность трансфераПроверьте работоспособность трансфера

  1. Дождитесь перехода трансфера в статус Реплицируется.

  2. Убедитесь, что в базу данных Managed Service for YDB переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:

    1. Создайте файл sample.json с тестовыми данными:

      {
          "device_id": "iv9a94th6rzt********",
          "datetime": "2020-06-05 17:27:00",
          "latitude": 55.70329032,
          "longitude": 37.65472196,
          "altitude": 427.5,
          "speed": 0,
          "battery_voltage": 23.5,
          "cabin_temperature": 17,
          "fuel_level": null
      }
      
      {
          "device_id": "rhibbh3y08qm********",
          "datetime": "2020-06-06 09:49:54",
          "latitude": 55.71294467,
          "longitude": 37.66542005,
          "altitude": 429.13,
          "speed": 55.5,
          "battery_voltage": null,
          "cabin_temperature": 18,
          "fuel_level": 32
      }
      
      {
          "device_id": "iv9a94th6rzt********",
          "datetime": "2020-06-07 15:00:10",
          "latitude": 55.70985913,
          "longitude": 37.62141918,
          "altitude": 417.0,
          "speed": 15.7,
          "battery_voltage": 10.3,
          "cabin_temperature": 17,
          "fuel_level": null
      }
      
    2. Отправьте данные из файла sample.json в топик sensors Managed Service for Apache Kafka® с помощью утилит jq и kafkacat:

      jq -rc . sample.json | kafkacat -P \
         -b <FQDN_хоста-брокера>:9091 \
         -t sensors \
         -k key \
         -X security.protocol=SASL_SSL \
         -X sasl.mechanisms=SCRAM-SHA-512 \
         -X sasl.username="<имя_пользователя_в_кластере-источнике>" \
         -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \
         -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
      

      Данные отправляются от имени созданного пользователя. Подробнее о настройке SSL-сертификата и работе с kafkacat см. в разделе Подключение к кластеру Apache Kafka® из приложений.

    3. Убедитесь, что в базу данных Managed Service for YDB перенеслись данные из кластера-источника Managed Service for Apache Kafka®:

      Консоль управления
      YDB CLI
      1. В консоли управления выберите каталог, в котором находится нужная база данных.
      2. В списке сервисов выберите Managed Service for YDB.
      3. Выберите базу из списка.
      4. Перейдите на вкладку Навигация.
      5. Проверьте, что база данных Managed Service for YDB содержит таблицу sensors с тестовыми данными из топика.
      1. Подключитесь к базе данных Managed Service for YDB.

      2. Проверьте, что база данных содержит таблицу sensors с тестовыми данными из топика:

        ydb table query execute \
          --query "SELECT * \
          FROM sensors"
        

Удалите созданные ресурсыУдалите созданные ресурсы

Примечание

Перед тем как удалить созданные ресурсы, деактивируйте трансфер.

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  1. Удалите трансфер.
  2. Удалите эндпоинты для источника и приемника.
  3. Если при создании эндпоинта для приемника вы создавали сервисный аккаунт, удалите его.

Остальные ресурсы удалите в зависимости от способа их создания:

Вручную
Terraform
  1. Удалите кластер Managed Service for Apache Kafka®.
  2. Удалите базу данных Managed Service for YDB.
  1. В терминале перейдите в директорию с планом инфраструктуры.

    Важно

    Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

  2. Удалите ресурсы:

    1. Выполните команду:

      terraform destroy
      
    2. Подтвердите удаление ресурсов и дождитесь завершения операции.

    Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

Была ли статья полезна?

Предыдущая
Apache Kafka® в OpenSearch
Следующая
Apache Kafka® в YDS
Проект Яндекса
© 2025 ООО «Яндекс.Облако»