Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Transfer
  • Доступные трансферы
  • Начало работы
    • Все руководства
      • Apache Kafka® в ClickHouse®
      • Apache Kafka® в PostgreSQL
      • Apache Kafka® в Greenplum®
      • Apache Kafka® в MongoDB
      • Apache Kafka® в MySQL®
      • Apache Kafka® в OpenSearch
      • Apache Kafka® в YDB
      • Apache Kafka® в YDS
      • YDS в Apache Kafka®
      • YDS в ClickHouse®
      • YDS в Object Storage
      • YDS в YDB
      • Ввод данных в системы хранения
  • Решение проблем
  • Управление доступом
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте тестовые данные
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Поставка данных из очередей
  3. Apache Kafka® в MongoDB

Поставка данных из очереди Apache Kafka® в MongoDB

Статья создана
Yandex Cloud
Обновлена 8 апреля 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте тестовые данные
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы

В кластер Managed Service for MongoDB можно в реальном времени поставлять данные из топиков Apache Kafka®.

Чтобы запустить поставку данных:

  1. Подготовьте тестовые данные.
  2. Подготовьте и активируйте трансфер.
  3. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за кластер Managed Service for MongoDB: использование вычислительных ресурсов, выделенных хостам, и дискового пространства (см. тарифы MongoDB).
  • Плата за использование публичных IP-адресов, если для хостов кластеров включен публичный доступ (см. тарифы Virtual Private Cloud).
  • Плата за каждый трансфер: использование вычислительных ресурсов и количество переданных строк данных (см. тарифы Data Transfer).

Перед началом работыПеред началом работы

  1. Подготовьте инфраструктуру поставки данных:

    Вручную
    Terraform
    1. Создайте кластер-источник Managed Service for Apache Kafka® любой подходящей конфигурации. Для подключения к кластеру с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к кластеру при его создании.

    2. Создайте в кластере-источнике топик с именем sensors.

    3. Создайте в кластере-источнике пользователя с именем mkf-user и правами доступа ACCESS_ROLE_PRODUCER и ACCESS_ROLE_CONSUMER к созданному топику.

    4. Создайте кластер-приемник Managed Service for MongoDB любой подходящей конфигурации со следующими настройками:

      • Имя базы данных — db1.
      • Имя пользователя — mmg-user.
      • В той же зоне доступности, что и кластер-источник.
      • Для подключения к кластеру с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к хостам кластера.
    5. Для подключения к кластерам с локальной машины пользователя, настройте группы безопасности:

      • Managed Service for Apache Kafka®.
      • Managed Service for MongoDB.
    1. Если у вас еще нет Terraform, установите его.

    2. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

    3. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

    4. Скачайте в ту же рабочую директорию файл конфигурации data-transfer-mkf-mmg.tf.

      В этом файле описаны:

      • сеть;
      • подсеть;
      • группа безопасности и правила, необходимые для подключения к кластерам Managed Service for Apache Kafka® и Managed Service for MongoDB;
      • кластер-источник Managed Service for Apache Kafka®;
      • топик Apache Kafka® с именем sensors;
      • пользователь Apache Kafka® mkf-user с правами доступа ACCESS_ROLE_PRODUCER, ACCESS_ROLE_CONSUMER к топику sensors;
      • кластер-приемник Managed Service for MongoDB;
      • база данных MongoDB db1;
      • пользователь MongoDB mmg-user с правами доступа readWrite к базе данных db1;
      • трансфер.
    5. Укажите в файле data-transfer-mkf-mmg.tf переменные:

      • source_kf_version — версия Apache Kafka® в кластере-источнике;
      • source_user_password — пароль пользователя mkf-user в кластере-источнике;
      • target_mg_version — версия MongoDB в кластере-приемнике;
      • target_user_password — пароль пользователя mmg-user в кластере-приемнике;
      • transfer_enabled — значение 0, чтобы не создавать трансфер до создания эндпоинтов вручную.
    6. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    7. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

  2. Установите утилиты:

    • kafkacat — для чтения и записи данных в топики Apache Kafka®.

      sudo apt update && sudo apt install --yes kafkacat
      

      Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

    • jq — для потоковой обработки JSON-файлов.

      sudo apt update && sudo apt-get install --yes jq
      

Подготовьте тестовые данныеПодготовьте тестовые данные

Пусть в качестве сообщения в топик Apache Kafka® sensors кластера-источника поступают данные от сенсоров автомобиля в формате JSON.

Создайте локально файл sample.json с тестовыми данными:

sample.json
{
    "device_id": "iv9a94th6rzt********",
    "datetime": "2020-06-05 17:27:00",
    "latitude": 55.70329032,
    "longitude": 37.65472196,
    "altitude": 427.5,
    "speed": 0,
    "battery_voltage": 23.5,
    "cabin_temperature": 17,
    "fuel_level": null
}
{
    "device_id": "rhibbh3y08qm********",
    "datetime": "2020-06-06 09:49:54",
    "latitude": 55.71294467,
    "longitude": 37.66542005,
    "altitude": 429.13,
    "speed": 55.5,
    "battery_voltage": null,
    "cabin_temperature": 18,
    "fuel_level": 32
}
{
    "device_id": "iv9a94th6rzt********",
    "datetime": "2020-06-07 15:00:10",
    "latitude": 55.70985913,
    "longitude": 37.62141918,
    "altitude": 417.0,
    "speed": 15.7,
    "battery_voltage": 10.3,
    "cabin_temperature": 17,
    "fuel_level": null
}

Подготовьте и активируйте трансферПодготовьте и активируйте трансфер

  1. Создайте эндпоинт для источника Apache Kafka®:

    Параметры эндпоинта:

    • Настройки подключения:

      • Тип подключения — Кластер Managed Service for Apache Kafka.

        • Кластер Managed Service for Apache Kafka — выберите кластер-источник из списка.

        • Аутентификация — SASL.

          • Имя пользователя — mkf-user.
          • Пароль — укажите пароль пользователя.
      • Полное имя топика — sensors.

    • Расширенные настройки → Правила конвертации:

      • Правила конвертации — json.
        • Схема данных — JSON-спецификация.

          Вставьте схему данных в формате JSON:

          json
          [
              {
                  "name": "device_id",
                  "type": "utf8",
                  "key": true
              },
              {
                  "name": "datetime",
                  "type": "utf8"
              },
              {
                  "name": "latitude",
                  "type": "double"
              },
              {
                  "name": "longitude",
                  "type": "double"
              },
              {
                  "name": "altitude",
                  "type": "double"
              },
              {
                  "name": "speed",
                  "type": "double"
              },
              {
                  "name": "battery_voltage",
                  "type": "double"
              },
              {
                  "name": "cabin_temperature",
                  "type": "uint16"
              },
              {
                  "name": "fuel_level",
                  "type": "uint16"
              }
          ]
          
  2. Создайте эндпоинт для приемника MongoDB:

    Параметры эндпоинта:

    • Настройки подключения:

      • Тип подключения — Кластер Managed Service for MongoDB.

        • Кластер Managed Service for MongoDB — выберите кластер-приемник из списка.
      • Источник аутентификации — db1.

      • Пользователь — mmg-user.

      • Пароль — укажите пароль пользователя.

    • База данных — db1.

  3. Создайте трансфер:

    Вручную
    Terraform
    1. Создайте трансфер типа Репликация, использующий созданные эндпоинты.
    2. Активируйте трансфер и дождитесь его перехода в статус Реплицируется.
    1. Укажите в файле data-transfer-mkf-mmg.tf переменные:

      • source_endpoint_id — значение идентификатора эндпоинта для источника;
      • target_endpoint_id — значение идентификатора эндпоинта для приемника;
      • transfer_enabled – значение 1 для создания трансфера.
    2. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    3. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

    4. Трансфер активируется автоматически. Дождитесь его перехода в статус Реплицируется.

Проверьте работоспособность трансфераПроверьте работоспособность трансфера

Убедитесь, что в кластер Managed Service for MongoDB переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:

  1. Отправьте данные из файла sample.json в топик sensors Managed Service for Apache Kafka® с помощью утилит jq и kafkacat:

    jq -rc . sample.json | kafkacat -P \
       -b <FQDN_хоста-брокера>:9091 \
       -t sensors \
       -k key \
       -X security.protocol=SASL_SSL \
       -X sasl.mechanisms=SCRAM-SHA-512 \
       -X sasl.username="mkf-user" \
       -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \
       -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
    

    Подробнее о настройке SSL-сертификата и работе с kafkacat см. в разделе Подключение к кластеру Apache Kafka® из приложений.

  2. Проверьте, что коллекция sensors кластера Managed Service for MongoDB содержит отправленные данные:

    1. Подключитесь к кластеру Managed Service for MongoDB.

    2. Получите содержимое коллекции sensors с помощью запроса:

      db.sensors.find()
      

Удалите созданные ресурсыУдалите созданные ресурсы

Примечание

Перед тем как удалить созданные ресурсы, деактивируйте трансфер.

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  1. Удалите трансфер.
  2. Удалите эндпоинты для источника и приемника.

Остальные ресурсы удалите в зависимости от способа их создания:

Вручную
Terraform
  • Удалите кластер Managed Service for Apache Kafka®.
  • Удалите кластер Managed Service for MongoDB.
  1. В терминале перейдите в директорию с планом инфраструктуры.

    Важно

    Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

  2. Удалите ресурсы:

    1. Выполните команду:

      terraform destroy
      
    2. Подтвердите удаление ресурсов и дождитесь завершения операции.

    Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

Была ли статья полезна?

Предыдущая
Apache Kafka® в Greenplum®
Следующая
Apache Kafka® в MySQL®
Проект Яндекса
© 2025 ООО «Яндекс.Облако»