Подключение к кластеру Apache Kafka® из приложений
В этом разделе представлены настройки для подключения к хостам кластера Managed Service for Apache Kafka® с помощью инструментов командной строки и из Docker-контейнера. О подключении из кода вашего приложения см. Примеры кода.
Вы можете подключаться к хостам кластера Apache Kafka® в публичном доступе только с использованием SSL-сертификата. В примерах ниже предполагается, что сертификат YandexInternalRootCA.crt
расположен в директории:
/usr/local/share/ca-certificates/Yandex/
для Ubuntu;$HOME\.kafka\
для Windows.
Подключение без использования SSL-сертификата поддерживается только для хостов, находящихся не в публичном доступе. В этом случае трафик внутри виртуальной сети при подключении к БД шифроваться не будет.
При необходимости перед подключением настройте группы безопасности кластера.
Примеры для Linux проверялись в следующем окружении:
- Виртуальная машина в Yandex Cloud с Ubuntu 20.04 LTS.
- OpenJDK:
11.0.24
. - Bash:
5.0.16
.
Примеры для Windows проверялись в следующем окружении:
- Виртуальная машина в Yandex Cloud с Windows Server 2019 Datacenter.
- Microsoft OpenJDK:
11.0.11
. - PowerShell:
5.1.17763.1490 Desktop
.
Инструменты командной строки
Примеры кода с заполненным FQDN хоста доступны в консоли управления
kafkacat
Утилита kafkacatkcat
) — приложение с открытым исходным кодом, которое может работать как универсальный производитель или потребитель данных и не требует установки Java Runtime Environment.
Перед подключением установите зависимости:
sudo apt update && sudo apt install -y kafkacat
-
Запустите команду получения сообщений из топика:
kafkacat -C \ -b <FQDN_брокера>:9092 \ -t <имя_топика> \ -X security.protocol=SASL_PLAINTEXT \ -X sasl.mechanism=SCRAM-SHA-512 \ -X sasl.username="<логин_потребителя>" \ -X sasl.password="<пароль_потребителя>" -Z
Команда будет непрерывно считывать новые сообщения из топика.
-
В отдельном терминале запустите команду отправки сообщения в топик:
echo "test message" | kafkacat -P \ -b <FQDN_брокера>:9092 \ -t <имя_топика> \ -k key \ -X security.protocol=SASL_PLAINTEXT \ -X sasl.mechanism=SCRAM-SHA-512 \ -X sasl.username="<логин_производителя>" \ -X sasl.password="<пароль_производителя>" -Z
-
Запустите команду получения сообщений из топика:
kafkacat -C \ -b <FQDN_брокера>:9091 \ -t <имя_топика> \ -X security.protocol=SASL_SSL \ -X sasl.mechanism=SCRAM-SHA-512 \ -X sasl.username="<логин_потребителя>" \ -X sasl.password="<пароль_потребителя>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
Команда будет непрерывно считывать новые сообщения из топика.
-
В отдельном терминале запустите команду отправки сообщения в топик:
echo "test message" | kafkacat -P \ -b <FQDN_брокера>:9091 \ -t <имя_топика> \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanism=SCRAM-SHA-512 \ -X sasl.username="<логин_производителя>" \ -X sasl.password="<пароль_производителя>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
Как получить FQDN хоста-брокера, см. в инструкции.
Убедитесь, что в первом терминале отобразилось сообщение key:test message
, отправленное во втором.
Инструменты Apache Kafka® для Linux (Bash)/macOS (Zsh)
В состав архивов с бинарными файлами Apache Kafka®
- С помощью kafka-console-producer
будет отправлено сообщение в топик. - С помощью kafka-console-consumer
будет получено сообщение из топика.
Перед подключением:
-
Установите OpenJDK:
sudo apt update && sudo apt install --yes default-jdk
-
Загрузите архив с бинарными файлами
для версии Apache Kafka®, которая используется в кластере. Версия Scala неважна. -
Распакуйте архив.
-
Создайте файлы с параметрами для подключения к кластеру: файл для производителя и файл для потребителя.
Эти файлы имеют одинаковое содержимое, различаются только реквизиты пользователя:
sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="<логин_производителя_или_потребителя>" \ password="<пароль_производителя_или_потребителя>"; security.protocol=SASL_PLAINTEXT
-
Запустите команду получения сообщений из топика:
<путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-consumer.sh \ --consumer.config <путь_к_файлу_с_параметрами_для_потребителя> \ --bootstrap-server <FQDN_брокера>:9092 \ --topic <имя_топика> \ --property print.key=true \ --property key.separator=":"
Команда будет непрерывно считывать новые сообщения из топика.
-
В отдельном терминале запустите команду отправки сообщения в топик:
echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-producer.sh \ --producer.config <путь_к_файлу_с_параметрами_для_производителя> \ --bootstrap-server <FQDN_брокера>:9092 \ --topic <имя_топика> \ --property parse.key=true \ --property key.separator=":"
-
Перейдите в каталог, где будет располагаться хранилище сертификатов Java:
cd /etc/security
-
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль не короче 6 символов в параметре
-storepass
для дополнительной защиты хранилища:sudo keytool -importcert \ -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -keystore ssl -storepass <пароль_хранилища_сертификатов> \ --noprompt
-
Создайте файлы с параметрами для подключения к кластеру: файл для производителя и файл для потребителя.
Эти файлы имеют одинаковое содержимое, различаются только реквизиты пользователя:
sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="<логин_производителя_или_потребителя>" \ password="<пароль_производителя_или_потребителя>"; security.protocol=SASL_SSL ssl.truststore.location=/etc/security/ssl ssl.truststore.password=<пароль_хранилища_сертификатов>
-
Запустите команду получения сообщений из топика:
<путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-consumer.sh \ --consumer.config <путь_к_файлу_с_параметрами_для_потребителя> \ --bootstrap-server <FQDN_брокера>:9091 \ --topic <имя_топика> \ --property print.key=true \ --property key.separator=":"
Команда будет непрерывно считывать новые сообщения из топика.
-
В отдельном терминале запустите команду отправки сообщения в топик:
echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-producer.sh \ --producer.config <путь_к_файлу_с_параметрами_для_производителя> \ --bootstrap-server <FQDN_брокера>:9091 \ --topic <имя_топика> \ --property parse.key=true \ --property key.separator=":"
Как получить FQDN хоста-брокера, см. в инструкции.
Убедитесь, что в первом терминале отобразилось сообщение key:test message
, отправленное во втором.
Инструменты Apache Kafka® для Windows (PowerShell)
В состав архивов с бинарными файлами Apache Kafka®
- С помощью kafka-console-producer
будет отправлено сообщение в топик. - С помощью kafka-console-consumer
будет получено сообщение из топика.
Хотя документация по инструментам содержит упоминание скриптов .sh
, она актуальна и при работе в Windows. Сами инструменты одинаковы для любой платформы, различаются лишь скрипты, которые запускают их, например:
bin/kafka-console-producer.sh
для Linux (Bash)/macOS (Zsh).bin\windows\kafka-console-producer.bat
для Windows (PowerShell).
Перед подключением:
-
Установите последнюю доступную версию Microsoft OpenJDK
. -
Загрузите архив с бинарными файлами
для версии Apache Kafka®, которая используется в кластере. Версия Scala неважна. -
Распакуйте архив.
Совет
Распаковывайте файлы Apache Kafka® в корневой каталог диска, например,
C:\kafka_2.12-2.6.0\
.Если путь к исполняемым и пакетным файлам Apache Kafka® будет слишком длинным, то при попытке запустить их возникнет ошибка
The input line is too long
.
-
Создайте файлы с параметрами для подключения к кластеру: файл для производителя и файл для потребителя.
Эти файлы имеют одинаковое содержимое, различаются только реквизиты пользователя:
sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="<логин_производителя_или_потребителя>" \ password="<пароль_производителя_или_потребителя>"; security.protocol=SASL_PLAINTEXT
-
Запустите команду получения сообщений из топика:
<путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-consumer.bat ` --consumer.config <путь_к_файлу_с_параметрами_для_потребителя> ` --bootstrap-server <FQDN_брокера>:9092 ` --topic <имя_топика> ` --property print.key=true ` --property key.separator=":"
Команда будет непрерывно считывать новые сообщения из топика.
-
В отдельном терминале запустите команду отправки сообщения в топик:
echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-producer.bat ` --producer.config <путь_к_файлу_с_параметрами_для_производителя> ` --bootstrap-server <FQDN_брокера>:9092 ` --topic <имя_топика> ` --property parse.key=true ` --property key.separator=":"
-
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль в параметре
--storepass
для дополнительной защиты хранилища:keytool.exe -importcert -alias YandexCA ` --file $HOME\.kafka\YandexInternalRootCA.crt ` --keystore $HOME\.kafka\ssl ` --storepass <пароль_хранилища_сертификатов> ` --noprompt
-
Создайте файлы с параметрами для подключения к кластеру: файл для производителя и файл для потребителя.
Эти файлы имеют одинаковое содержимое, различаются только реквизиты пользователя:
sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="<логин_производителя_или_потребителя>" \ password="<пароль_производителя_или_потребителя>"; security.protocol=SASL_SSL ssl.truststore.location=<значение_переменной_$HOME>\\.kafka\\ssl ssl.truststore.password=<пароль_хранилища_сертификатов>
В качестве значения параметра
ssl.truststore.location
укажите полный путь к хранилищу сертификатов, например:ssl.truststore.location=C:\\Users\\Administrator\\.kafka\\ssl
Хранилище сертификатов расположено по пути
$HOME\.kafka\ssl
, но в значении нельзя использовать переменные среды окружения. Чтобы раскрыть переменную, выполните команду:echo $HOME
Важно
Используйте
\\
вместо\
при указании значения параметраssl.truststore.location
, иначе при запуске команд не удастся получить доступ к хранилищу сертификатов. -
Запустите команду получения сообщений из топика:
<путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-consumer.bat ` --consumer.config <путь_к_файлу_с_параметрами_для_потребителя> ` --bootstrap-server <FQDN_брокера>:9091 ` --topic <имя_топика> ` --property print.key=true ` --property key.separator=":"
Команда будет непрерывно считывать новые сообщения из топика.
-
В отдельном терминале запустите команду отправки сообщения в топик:
echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-producer.bat ` --producer.config <путь_к_файлу_с_параметрами_для_производителя> ` --bootstrap-server <FQDN_брокера>:9091 ` --topic <имя_топика> ` --property parse.key=true ` --property key.separator=":"
Как получить FQDN хоста-брокера, см. в инструкции.
Убедитесь, что в первом терминале отобразилось сообщение key:test message
, отправленное во втором.
Подготовка к подключению из Docker-контейнера
Чтобы подключаться к кластеру Managed Service for Apache Kafka® из Docker-контейнера, добавьте в Dockerfile строки:
RUN apt-get update && \
apt-get install kafkacat --yes
RUN apt-get update && \
apt-get install wget kafkacat --yes && \
mkdir --parents /usr/local/share/ca-certificates/Yandex/ && \
wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
--output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt