Поставка данных из очереди Apache Kafka® в OpenSearch
В кластер Managed Service for OpenSearch можно в реальном времени поставлять данные из топиков Apache Kafka®.
Чтобы запустить поставку данных:
- Подготовьте тестовые данные.
- Настройте кластер-приемник.
- Подготовьте и активируйте трансфер.
- Проверьте работоспособность трансфера.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Подготовьте инфраструктуру поставки данных:
ВручнуюTerraform-
Создайте кластер-источник Managed Service for Apache Kafka® любой подходящей конфигурации. Для подключения к кластеру с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к кластеру при его создании.
-
Создайте в кластере-источнике топик с именем
sensors
. -
Создайте в кластере-источнике пользователя с именем
mkf-user
и правами доступаACCESS_ROLE_PRODUCER
иACCESS_ROLE_CONSUMER
к созданному топику. -
Создайте кластер-приемник Managed Service for OpenSearch любой подходящей конфигурации со следующими настройками:
- В той же зоне доступности, что и кластер-источник.
- С публичным доступом к хостам с ролью
DATA
.
-
Для подключения к кластерам с локальной машины пользователя, настройте группы безопасности:
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации data-transfer-mkf-mos.tf
.В этом файле описаны:
- сеть;
- подсеть;
- группа безопасности и правила, необходимые для подключения к кластерам Managed Service for Apache Kafka® и Managed Service for OpenSearch;
- кластер-источник Managed Service for Apache Kafka®;
- топик Apache Kafka® с именем
sensors
; - пользователь Apache Kafka®
mkf-user
с правами доступаACCESS_ROLE_PRODUCER
,ACCESS_ROLE_CONSUMER
к топикуsensors
; - кластер-приемник Managed Service for OpenSearch;
- трансфер.
-
Укажите в файле
data-transfer-mkf-mos.tf
переменные:kf_version
— версия Apache Kafka® в кластере-источнике;kf_user_password
— пароль пользователяmkf-user
;os_version
— версия OpenSearch в кластере-приемнике;os_user_password
— пароль пользователяadmin
;transfer_enabled
— значение0
, чтобы не создавать трансфер до создания эндпоинтов вручную.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
-
-
Установите утилиты:
-
kafkacat
— для чтения и записи данных в топики Apache Kafka®.sudo apt update && sudo apt install --yes kafkacat
Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.
-
jq
— для потоковой обработки JSON-файлов.sudo apt update && sudo apt-get install --yes jq
-
Подготовьте тестовые данные
Пусть в качестве сообщения в топик Apache Kafka® sensors
кластера-источника поступают данные от сенсоров автомобиля в формате JSON.
Создайте локально файл sample.json
с тестовыми данными:
sample.json
{
"device_id": "iv9a94th6rzt********",
"datetime": "2020-06-05 17:27:00",
"latitude": 55.70329032,
"longitude": 37.65472196,
"altitude": 427.5,
"speed": 0,
"battery_voltage": 23.5,
"cabin_temperature": 17,
"fuel_level": null
}
{
"device_id": "rhibbh3y08qm********",
"datetime": "2020-06-06 09:49:54",
"latitude": 55.71294467,
"longitude": 37.66542005,
"altitude": 429.13,
"speed": 55.5,
"battery_voltage": null,
"cabin_temperature": 18,
"fuel_level": 32
}
{
"device_id": "iv9a94th6rzt********",
"datetime": "2020-06-07 15:00:10",
"latitude": 55.70985913,
"longitude": 37.62141918,
"altitude": 417.0,
"speed": 15.7,
"battery_voltage": 10.3,
"cabin_temperature": 17,
"fuel_level": null
}
Настройте кластер-приемник
Совет
Вы можете поставлять данные в кластер Managed Service for OpenSearch от имени пользователя admin
, имеющего роль superuser
, но безопаснее для каждой задачи создавать отдельных пользователей с ограниченными привилегиями. Подробнее см. в разделе Управление пользователями OpenSearch.
-
Создайте роль
с привилегиямиcreate_index
иwrite
для всех индексов (*
). -
Создайте пользователя и назначьте ему эту роль.
Подготовьте и активируйте трансфер
-
Создайте эндпоинт для источника
Apache Kafka®
:Параметры эндпоинта:
-
Настройки подключения:
-
Тип подключения —
Кластер Managed Service for Apache Kafka
.-
Кластер Managed Service for Apache Kafka — выберите кластер-источник из списка.
-
Аутентификация — SASL.
- Имя пользователя —
mkf-user
. - Пароль — укажите пароль пользователя.
- Имя пользователя —
-
-
Полное имя топика —
sensors
.
-
-
Расширенные настройки → Правила конвертации:
- Правила конвертации —
json
.-
Схема данных —
JSON-спецификация
.Вставьте схему данных в формате JSON:
json
[ { "name": "device_id", "type": "utf8", "key": true }, { "name": "datetime", "type": "utf8" }, { "name": "latitude", "type": "double" }, { "name": "longitude", "type": "double" }, { "name": "altitude", "type": "double" }, { "name": "speed", "type": "double" }, { "name": "battery_voltage", "type": "double" }, { "name": "cabin_temperature", "type": "uint16" }, { "name": "fuel_level", "type": "uint16" } ]
-
- Правила конвертации —
-
-
Создайте эндпоинт для приемника
OpenSearch
:Параметры эндпоинта → Настройки подключения:
-
Тип подключения —
Кластер Managed Service for OpenSearch
.- Кластер Managed Service for OpenSearch — выберите кластер-приемник из списка.
-
Пользователь — укажите имя пользователя.
-
Пароль — укажите пароль пользователя.
-
-
Создайте трансфер:
ВручнуюTerraform- Создайте трансфер типа Репликация, использующий созданные эндпоинты.
- Активируйте трансфер и дождитесь его перехода в статус Реплицируется.
-
Укажите в файле
data-transfer-mkf-mos.tf
переменные:source_endpoint_id
— идентификатор эндпоинта для источника;target_endpoint_id
— идентификатор эндпоинта для приемника;transfer_enabled
— значение1
для создания трансфера.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
-
Трансфер активируется автоматически. Дождитесь его перехода в статус Реплицируется.
Проверьте работоспособность трансфера
Убедитесь, что в кластер Managed Service for OpenSearch переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:
-
Отправьте данные из файла
sample.json
в топикsensors
Managed Service for Apache Kafka® с помощью утилитjq
иkafkacat
:jq -rc . sample.json | kafkacat -P \ -b <FQDN_хоста-брокера>:9091 \ -t sensors \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username="mkf-user" \ -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
Подробнее о настройке SSL-сертификата и работе с
kafkacat
см. в разделе Подключение к кластеру Apache Kafka® из приложений. -
Проверьте, что индекс
sensors
кластера Managed Service for OpenSearch содержит отправленные данные:BashOpenSearch DashboardsВыполните команду:
curl \ --user <имя_пользователя_в_кластере-приемнике>:<пароль_пользователя_в_кластере-приемнике> \ --cacert ~/.opensearch/root.crt \ --header 'Content-Type: application/json' \ --request GET 'https://<идентификатор_хоста_OpenSearch_с_ролью_DATA>.rw.mdb.yandexcloud.net:9200/sensors/_search?pretty'
- Подключитесь к кластеру-приемнику с помощью OpenSearch Dashboards.
- Выберите общий тенант
Global
. - Откройте панель управления, нажав на значок
. - В разделе OpenSearch Dashboards выберите Discover.
- В поле CHANGE INDEX PATTERN выберите индекс
sensors
.
Удалите созданные ресурсы
Примечание
Перед тем как удалить созданные ресурсы, деактивируйте трансфер.
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
- Удалите трансфер.
- Удалите эндпоинты для источника и приемника.
Остальные ресурсы удалите в зависимости от способа их создания:
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
data-transfer-mkf-mos.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле
data-transfer-mkf-mos.tf
, будут удалены. -