Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
Примечание
Managed Service for Apache Kafka® имеет встроенную поддержку некоторых коннекторов и позволяет управлять ими. Список доступных коннекторов приведен в разделе Коннекторы. Если вам нужны другие коннекторы, или вы хотите управлять работой Kafka Connect вручную, используйте информацию из этого руководства.
Инструмент Kafka Connect предназначен для перемещения данных между Apache Kafka® и другими хранилищами данных.
Работа с данными в Kafka Connect осуществляется с помощью процессов-исполнителей (workers). Инструмент может быть развернут как в виде распределенной системы (distributed mode) с несколькими процессами-исполнителями, так и в виде отдельной инсталляции из одного процесса-исполнителя (standalone mode).
Непосредственно перемещение данных выполняется с помощью коннекторов, которые запускаются в отдельных потоках процесса-исполнителя.
Подробнее о Kafka Connect см. в документации Apache Kafka®
Далее будет продемонстрировано, как настроить Kafka Connect для взаимодействия с кластером Managed Service for Apache Kafka®. Инструмент будет развернут на виртуальной машине Yandex Cloud в виде отдельной инсталляции. Для защиты подключения будет использоваться SSL-шифрование.
Также будет настроен простой коннектор FileStreamSource
Примечание
Вы можете использовать любой другой коннектор Kafka Connect для взаимодействия с кластером Managed Service for Apache Kafka®.
Чтобы настроить Kafka Connect для работы с кластером Managed Service for Apache Kafka®:
- Настройте виртуальную машину.
- Подготовьте тестовые данные.
- Настройте Kafka Connect.
- Запустите Kafka Connect и проверьте его работу.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации.
-
Создайте топик с именем
messages
для обмена сообщениями между Kafka Connect и кластером Managed Service for Apache Kafka®. -
Создайте пользователя с именем
user
и выдайте ему права на топикmessages
:ACCESS_ROLE_CONSUMER
,ACCESS_ROLE_PRODUCER
.
-
В той же сети, что и кластер Managed Service for Apache Kafka®, создайте виртуальную машину с Ubuntu 20.04 и публичным IP-адресом.
-
Если вы используете группы безопасности, настройте их так, чтобы был разрешен весь необходимый трафик между кластером Managed Service for Apache Kafka® и виртуальной машиной.
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации kafka-connect.tf
.В этом файле описаны:
-
сеть;
-
подсеть;
-
группа безопасности по умолчанию и правила, необходимые для подключения к кластеру и виртуальной машине из интернета;
-
виртуальная машина с Ubuntu 20.04;
-
кластер Managed Service for Apache Kafka® с необходимыми настройками.
-
-
Укажите в файле пароль для пользователя
user
, который будет использоваться для доступа к кластеру Managed Service for Apache Kafka®, а также имя пользователя и публичную часть SSH-ключа для виртуальной машины. Если на виртуальную машину будет установлена Ubuntu 20.04 из рекомендованного списка образов, то указанное здесь имя пользователя игнорируется. В таком случае при подключении используйте имя пользователяubuntu
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
Настройте виртуальную машину
-
Установите JDK и утилиту kcat
:sudo apt update && \ sudo apt install default-jdk --yes && \ sudo apt install kafkacat
-
Скачайте
и распакуйте архив с Apache Kafka®:wget https://downloads.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz && tar -xvf kafka_2.12-3.1.0.tgz --strip 1 --directory /opt/kafka/
В данном примере используется Apache Kafka® версии
3.1.0
. -
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль не короче 6 символов в параметре
-storepass
для дополнительной защиты хранилища:sudo keytool -importcert \ -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -keystore ssl -storepass <пароль_хранилища_сертификатов> \ --noprompt
-
Создайте каталог с настройками процесса-исполнителя и скопируйте туда хранилище:
sudo mkdir --parents /etc/kafka-connect-worker && \ sudo cp ssl /etc/kafka-connect-worker/client.truststore.jks
Подготовьте тестовые данные
Создайте файл /var/log/sample.json
с тестовыми данными. В этом файле приведены данные от сенсоров нескольких автомобилей в формате JSON:
sample.json
{"device_id":"iv9a94th6rzt********","datetime":"2020-06-05 17:27:00","latitude":55.70329032,"longitude":37.65472196,"altitude":427.5,"speed":0,"battery_voltage":23.5,"cabin_temperature":17,"fuel_level":null}
{"device_id":"rhibbh3y08qm********","datetime":"2020-06-06 09:49:54","latitude":55.71294467,"longitude":37.66542005,"altitude":429.13,"speed":55.5,"battery_voltage":null,"cabin_temperature":18,"fuel_level":32}
{"device_id":"iv9a94th6rzt********","datetime":"2020-06-07 15:00:10","latitude":55.70985913,"longitude":37.62141918,"altitude":417,"speed":15.7,"battery_voltage":10.3,"cabin_temperature":17,"fuel_level":null}
Настройте Kafka Connect
-
Создайте файл настроек процесса-исполнителя
/etc/kafka-connect-worker/worker.properties
:# AdminAPI connect properties bootstrap.servers=<FQDN_хоста-брокера>:9091 sasl.mechanism=SCRAM-SHA-512 security.protocol=SASL_SSL ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks ssl.truststore.password=<пароль_к_хранилищу_сертификата> sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<пароль_пользователя_user>"; # Producer connect properties producer.sasl.mechanism=SCRAM-SHA-512 producer.security.protocol=SASL_SSL producer.ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks producer.ssl.truststore.password=<пароль_к_хранилищу_сертификата> producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<пароль_пользователя_user>"; # Worker properties plugin.path=/etc/kafka-connect-worker/plugins key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.file.filename=/etc/kafka-connect-worker/worker.offset
Kafka Connect будет подключаться к кластеру Managed Service for Apache Kafka® от имени пользователя
user
, созданного ранее.FQDN хостов-брокеров можно запросить со списком хостов в кластере.
-
Создайте файл настроек коннектора
/etc/kafka-connect-worker/file-connector.properties
:name=local-file-source connector.class=FileStreamSource tasks.max=1 file=/var/log/sample.json topic=messages
Где:
file
— имя файла, из которого коннектор будет читать данные.topic
— имя топика в кластере Managed Service for Apache Kafka®, куда коннектор будет передавать данные.
Запустите Kafka Connect и проверьте его работу
-
Чтобы отправить тестовые данные в кластер, запустите процесс-исполнитель на виртуальной машине:
cd ~/opt/kafka/bin/ && \ sudo ./connect-standalone.sh \ /etc/kafka-connect-worker/worker.properties \ /etc/kafka-connect-worker/file-connector.properties
-
Подключитесь к кластеру с помощью kcat и получите данные из топика кластера:
kafkacat -C \ -b <FQDN_хоста-брокера>:9091 \ -t messages \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=user \ -X sasl.password="<пароль_учетной_записи_user>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
FQDN хостов-брокеров можно запросить со списком хостов в кластере.
В выводе команды вы увидите содержимое тестового файла
/var/log/sample.json
, переданное на предыдущем шаге.
Удалите созданные ресурсы
Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:
- Удалите виртуальную машину.
- Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.
- Удалите кластер Managed Service for Apache Kafka®.
Чтобы удалить инфраструктуру, созданную с помощью Terraform:
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
kafka-connect.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле, будут удалены.
-