Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Transfer
  • Доступные трансферы
  • Начало работы
    • Все руководства
      • Apache Kafka® в ClickHouse®
      • Apache Kafka® в PostgreSQL
      • Apache Kafka® в Greenplum®
      • Apache Kafka® в MongoDB
      • Apache Kafka® в MySQL®
      • Apache Kafka® в OpenSearch
      • Apache Kafka® в YDB
      • Apache Kafka® в YDS
      • YDS в Apache Kafka®
      • YDS в ClickHouse®
      • YDS в Object Storage
      • YDS в YDB
      • Ввод данных в системы хранения
  • Решение проблем
  • Управление доступом
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте тестовые данные
  • Настройте кластер-приемник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Поставка данных из очередей
  3. Apache Kafka® в OpenSearch

Поставка данных из очереди Apache Kafka® в OpenSearch

Статья создана
Yandex Cloud
Обновлена 8 апреля 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте тестовые данные
  • Настройте кластер-приемник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы

В кластер Managed Service for OpenSearch можно в реальном времени поставлять данные из топиков Apache Kafka®.

Чтобы запустить поставку данных:

  1. Подготовьте тестовые данные.
  2. Настройте кластер-приемник.
  3. Подготовьте и активируйте трансфер.
  4. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).

  • Плата за каждый трансфер: использование вычислительных ресурсов и количество переданных строк данных (см. тарифы Data Transfer).

  • Плата за кластер Managed Service for OpenSearch: использование вычислительных ресурсов, выделенных хостам (в том числе хостам с ролью MANAGER), и дискового пространства (см. тарифы Managed Service for OpenSearch).

  • Плата за использование публичных IP-адресов:

    • для хостов кластера Managed Service for OpenSearch;
    • для хостов кластера Managed Service for Apache Kafka®, если для них включен публичный доступ.

    Подробнее о тарифах Virtual Private Cloud.

Перед началом работыПеред началом работы

  1. Подготовьте инфраструктуру поставки данных:

    Вручную
    Terraform
    1. Создайте кластер-источник Managed Service for Apache Kafka® любой подходящей конфигурации. Для подключения к кластеру с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к кластеру при его создании.

    2. Создайте в кластере-источнике топик с именем sensors.

    3. Создайте в кластере-источнике пользователя с именем mkf-user и правами доступа ACCESS_ROLE_PRODUCER и ACCESS_ROLE_CONSUMER к созданному топику.

    4. Создайте кластер-приемник Managed Service for OpenSearch любой подходящей конфигурации со следующими настройками:

      • В той же зоне доступности, что и кластер-источник.
      • С публичным доступом к хостам с ролью DATA.
    5. Для подключения к кластерам с локальной машины пользователя, настройте группы безопасности:

      • Managed Service for Apache Kafka®.
      • Managed Service for OpenSearch.
    1. Если у вас еще нет Terraform, установите его.

    2. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

    3. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

    4. Скачайте в ту же рабочую директорию файл конфигурации data-transfer-mkf-mos.tf.

      В этом файле описаны:

      • сеть;
      • подсеть;
      • группа безопасности и правила, необходимые для подключения к кластерам Managed Service for Apache Kafka® и Managed Service for OpenSearch;
      • кластер-источник Managed Service for Apache Kafka®;
      • топик Apache Kafka® с именем sensors;
      • пользователь Apache Kafka® mkf-user с правами доступа ACCESS_ROLE_PRODUCER, ACCESS_ROLE_CONSUMER к топику sensors;
      • кластер-приемник Managed Service for OpenSearch;
      • трансфер.
    5. Укажите в файле data-transfer-mkf-mos.tf переменные:

      • kf_version — версия Apache Kafka® в кластере-источнике;
      • kf_user_password — пароль пользователя mkf-user;
      • os_version — версия OpenSearch в кластере-приемнике;
      • os_user_password — пароль пользователя admin;
      • transfer_enabled — значение 0, чтобы не создавать трансфер до создания эндпоинтов вручную.
    6. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    7. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

  2. Установите утилиты:

    • kafkacat — для чтения и записи данных в топики Apache Kafka®.

      sudo apt update && sudo apt install --yes kafkacat
      

      Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

    • jq — для потоковой обработки JSON-файлов.

      sudo apt update && sudo apt-get install --yes jq
      

Подготовьте тестовые данныеПодготовьте тестовые данные

Пусть в качестве сообщения в топик Apache Kafka® sensors кластера-источника поступают данные от сенсоров автомобиля в формате JSON.

Создайте локально файл sample.json с тестовыми данными:

sample.json
{
    "device_id": "iv9a94th6rzt********",
    "datetime": "2020-06-05 17:27:00",
    "latitude": 55.70329032,
    "longitude": 37.65472196,
    "altitude": 427.5,
    "speed": 0,
    "battery_voltage": 23.5,
    "cabin_temperature": 17,
    "fuel_level": null
}
{
    "device_id": "rhibbh3y08qm********",
    "datetime": "2020-06-06 09:49:54",
    "latitude": 55.71294467,
    "longitude": 37.66542005,
    "altitude": 429.13,
    "speed": 55.5,
    "battery_voltage": null,
    "cabin_temperature": 18,
    "fuel_level": 32
}
{
    "device_id": "iv9a94th6rzt********",
    "datetime": "2020-06-07 15:00:10",
    "latitude": 55.70985913,
    "longitude": 37.62141918,
    "altitude": 417.0,
    "speed": 15.7,
    "battery_voltage": 10.3,
    "cabin_temperature": 17,
    "fuel_level": null
}

Настройте кластер-приемникНастройте кластер-приемник

Совет

Вы можете поставлять данные в кластер Managed Service for OpenSearch от имени пользователя admin, имеющего роль superuser, но безопаснее для каждой задачи создавать отдельных пользователей с ограниченными привилегиями. Подробнее см. в разделе Управление пользователями OpenSearch.

  1. Создайте роль с привилегиями create_index и write для всех индексов (*).

  2. Создайте пользователя и назначьте ему эту роль.

Подготовьте и активируйте трансферПодготовьте и активируйте трансфер

  1. Создайте эндпоинт для источника Apache Kafka®:

    Параметры эндпоинта:

    • Настройки подключения:

      • Тип подключения — Кластер Managed Service for Apache Kafka.

        • Кластер Managed Service for Apache Kafka — выберите кластер-источник из списка.

        • Аутентификация — SASL.

          • Имя пользователя — mkf-user.
          • Пароль — укажите пароль пользователя.
      • Полное имя топика — sensors.

    • Расширенные настройки → Правила конвертации:

      • Правила конвертации — json.
        • Схема данных — JSON-спецификация.

          Вставьте схему данных в формате JSON:

          json
          [
              {
                  "name": "device_id",
                  "type": "utf8",
                  "key": true
              },
              {
                  "name": "datetime",
                  "type": "utf8"
              },
              {
                  "name": "latitude",
                  "type": "double"
              },
              {
                  "name": "longitude",
                  "type": "double"
              },
              {
                  "name": "altitude",
                  "type": "double"
              },
              {
                  "name": "speed",
                  "type": "double"
              },
              {
                  "name": "battery_voltage",
                  "type": "double"
              },
              {
                  "name": "cabin_temperature",
                  "type": "uint16"
              },
              {
                  "name": "fuel_level",
                  "type": "uint16"
              }
          ]
          
  2. Создайте эндпоинт для приемника OpenSearch:

    Параметры эндпоинта → Настройки подключения:

    • Тип подключения — Кластер Managed Service for OpenSearch.

      • Кластер Managed Service for OpenSearch — выберите кластер-приемник из списка.
    • Пользователь — укажите имя пользователя.

    • Пароль — укажите пароль пользователя.

  3. Создайте трансфер:

    Вручную
    Terraform
    1. Создайте трансфер типа Репликация, использующий созданные эндпоинты.
    2. Активируйте трансфер и дождитесь его перехода в статус Реплицируется.
    1. Укажите в файле data-transfer-mkf-mos.tf переменные:

      • source_endpoint_id — идентификатор эндпоинта для источника;
      • target_endpoint_id — идентификатор эндпоинта для приемника;
      • transfer_enabled — значение 1 для создания трансфера.
    2. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    3. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

    4. Трансфер активируется автоматически. Дождитесь его перехода в статус Реплицируется.

Проверьте работоспособность трансфераПроверьте работоспособность трансфера

Убедитесь, что в кластер Managed Service for OpenSearch переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:

  1. Отправьте данные из файла sample.json в топик sensors Managed Service for Apache Kafka® с помощью утилит jq и kafkacat:

    jq -rc . sample.json | kafkacat -P \
       -b <FQDN_хоста-брокера>:9091 \
       -t sensors \
       -k key \
       -X security.protocol=SASL_SSL \
       -X sasl.mechanisms=SCRAM-SHA-512 \
       -X sasl.username="mkf-user" \
       -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \
       -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
    

    Подробнее о настройке SSL-сертификата и работе с kafkacat см. в разделе Подключение к кластеру Apache Kafka® из приложений.

  2. Проверьте, что индекс sensors кластера Managed Service for OpenSearch содержит отправленные данные:

    Bash
    OpenSearch Dashboards

    Выполните команду:

    curl \
        --user <имя_пользователя_в_кластере-приемнике>:<пароль_пользователя_в_кластере-приемнике> \
        --cacert ~/.opensearch/root.crt \
        --header 'Content-Type: application/json' \
        --request GET 'https://<идентификатор_хоста_OpenSearch_с_ролью_DATA>.rw.mdb.yandexcloud.net:9200/sensors/_search?pretty'
    
    1. Подключитесь к кластеру-приемнику с помощью OpenSearch Dashboards.
    2. Выберите общий тенант Global.
    3. Откройте панель управления, нажав на значок .
    4. В разделе OpenSearch Dashboards выберите Discover.
    5. В поле CHANGE INDEX PATTERN выберите индекс sensors.

Удалите созданные ресурсыУдалите созданные ресурсы

Примечание

Перед тем как удалить созданные ресурсы, деактивируйте трансфер.

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  1. Удалите трансфер.
  2. Удалите эндпоинты для источника и приемника.

Остальные ресурсы удалите в зависимости от способа их создания:

Вручную
С помощью Terraform
  1. Удалите кластер Managed Service for OpenSearch.
  2. Удалите кластер Managed Service for Apache Kafka®.
  1. В терминале перейдите в директорию с планом инфраструктуры.

    Важно

    Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

  2. Удалите ресурсы:

    1. Выполните команду:

      terraform destroy
      
    2. Подтвердите удаление ресурсов и дождитесь завершения операции.

    Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

Была ли статья полезна?

Предыдущая
Apache Kafka® в MySQL®
Следующая
Apache Kafka® в YDB
Проект Яндекса
© 2025 ООО «Яндекс.Облако»