Поставка данных из очереди Apache Kafka® в PostgreSQL
Вы можете настроить перенос данных из топика Managed Service for Apache Kafka® в Managed Service for PostgreSQL с помощью сервиса Yandex Data Transfer. Для этого:
- Подготовьте тестовые данные.
- Подготовьте и активируйте трансфер.
- Проверьте работоспособность трансфера.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Подготовьте инфраструктуру:
ВручнуюTerraform-
Создайте кластер-источник Managed Service for Apache Kafka® в любой зоне доступности, любой подходящей конфигурации и с публичным доступом.
-
Создайте в кластере-источнике топик с именем
sensors
. -
Создайте в кластере-источнике пользователя с именем
mkf-user
и правами доступа к созданному топикуACCESS_ROLE_PRODUCER
иACCESS_ROLE_CONSUMER
. -
В той же зоне доступности создайте кластер-приемник Managed Service for PostgreSQL любой подходящей конфигурации с именем пользователя-администратора
pg-user
и хостами в публичном доступе. -
Убедитесь, что группы безопасности кластеров настроены правильно и допускают подключение к ним:
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации kafka-postgresql.tf
.В этом файле описаны:
- сети и подсети для размещения кластеров;
- группы безопасности для подключения к кластерам;
- кластер-источник Managed Service for Apache Kafka®;
- кластер-приемник Managed Service for PostgreSQL;
- эндпоинт для приемника;
- трансфер.
-
Укажите в файле
kafka-postgresql.tf
:- Версии Apache Kafka® и PostgreSQL.
- Пароли пользователей Apache Kafka® и PostgreSQL.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
-
-
Установите утилиты:
-
kafkacat
— для чтения и записи данных в топики Apache Kafka®.sudo apt update && sudo apt install --yes kafkacat
Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.
-
jq
— для потоковой обработки JSON-файлов.sudo apt update && sudo apt-get install --yes jq
-
Подготовьте тестовые данные
Пусть в качестве сообщения в топик Apache Kafka® sensors
кластера-источника поступают данные от сенсоров автомобиля в формате JSON.
Создайте локально файл sample.json
с тестовыми данными:
sample.json
{
"device_id": "iv9a94th6rzt********",
"datetime": "2020-06-05 17:27:00",
"latitude": 55.70329032,
"longitude": 37.65472196,
"altitude": 427.5,
"speed": 0,
"battery_voltage": 23.5,
"cabin_temperature": 17,
"fuel_level": null
}
{
"device_id": "rhibbh3y08qm********",
"datetime": "2020-06-06 09:49:54",
"latitude": 55.71294467,
"longitude": 37.66542005,
"altitude": 429.13,
"speed": 55.5,
"battery_voltage": null,
"cabin_temperature": 18,
"fuel_level": 32
}
{
"device_id": "iv9a94th6rzt********",
"datetime": "2020-06-07 15:00:10",
"latitude": 55.70985913,
"longitude": 37.62141918,
"altitude": 417.0,
"speed": 15.7,
"battery_voltage": 10.3,
"cabin_temperature": 17,
"fuel_level": null
}
Подготовьте и активируйте трансфер
-
Создайте эндпоинт-источник типа
Apache Kafka®
и задайте для него:- Полное имя топика —
sensors
. - Правила конвертации типа
json
. В поле Схема данных выберитеJSON-спецификация
и в открывшуюся форму скопируйте следующую спецификацию полей:
sensors-specification
[ { "name": "device_id", "type": "utf8", "key": true }, { "name": "datetime", "type": "utf8" }, { "name": "latitude", "type": "double" }, { "name": "longitude", "type": "double" }, { "name": "altitude", "type": "double" }, { "name": "speed", "type": "double" }, { "name": "battery_voltage", "type": "double" }, { "name": "cabin_temperature", "type": "uint16" }, { "name": "fuel_level", "type": "uint16" } ]
- Полное имя топика —
-
Создайте эндпоинт-приемник и трансфер:
ВручнуюTerraform-
Создайте эндпоинт-приемник типа
PostgreSQL
и укажите в нем параметры подключения к кластеру:- Тип инсталляции —
Кластер Managed Service for PostgreSQL
. - Кластер Managed Service for PostgreSQL —
<имя_кластера-приемника_PostgreSQL>
из выпадающего списка. - База данных —
db1
. - Пользователь —
pg-user
. - Пароль —
<пароль_пользователя>
.
- Тип инсталляции —
-
Создайте трансфер типа Репликация, использующий созданные эндпоинты.
-
Активируйте трансфер и дождитесь его перехода в статус Реплицируется.
-
Укажите в файле
kafka-postgresql.tf
переменные:kf_source_endpoint_id
— значение идентификатора эндпоинта для источника;transfer_enabled
— значение1
для создания эндпоинта-приемника и трансфера.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
-
Трансфер активируется автоматически. Дождитесь его перехода в статус Реплицируется.
-
Проверьте работоспособность трансфера
Убедитесь, что в базу данных Managed Service for PostgreSQL переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:
-
Отправьте данные из файла
sample.json
в топикsensors
Managed Service for Apache Kafka® с помощью утилитjq
иkafkacat
:jq -rc . sample.json | kafkacat -P \ -b <FQDN_хоста-брокера>:9091 \ -t sensors \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username="mkf-user" \ -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
Подробнее о настройке SSL-сертификата и работе с
kafkacat
см. в разделе Подключение к кластеру Apache Kafka® из приложений. -
Проверьте, что в базу данных Managed Service for PostgreSQL перенеслись данные из кластера-источника Managed Service for Apache Kafka®:
-
Проверьте, что таблица
sensors
содержит отправленные данные:SELECT * FROM sensors;
Удалите созданные ресурсы
Примечание
Перед тем как удалить созданные ресурсы, деактивируйте трансфер.
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
-
Остальные ресурсы удалите в зависимости от способа их создания:
ВручнуюTerraform-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
kafka-postgresql.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле
kafka-postgresql.tf
, будут удалены. -
-