Отслеживание утери сообщений в топике Apache Kafka®
- Необходимые платные ресурсы
- Перед началом работы
- Подготовьте инструменты для подключения к топику
- Подготовьте команды для отправки и получения сообщений
- Запустите команду получения сообщений
- Создайте графики мониторинга
- Проверьте отправку и получение сообщения
- Включите удаление сообщений
- Повторно проверьте отправку и получение сообщения
- Проанализируйте графики мониторинга
- Удалите созданные ресурсы
Утеря сообщений в топике Apache Kafka® группой потребителей возникает из-за сочетания двух факторов:
- В этом топике или во всем кластере включена политика очистки лога
Delete
и задано малое время жизни сегмента логаLog retention
. - Одна или несколько групп потребителей недостаточно быстро вычитывают сообщения из топика. В результате могут быть удалены даже те сообщения, которые еще не были прочитаны.
Утерю сообщений можно отслеживать с помощью метрик сервиса Managed Service for Apache Kafka®, поставляемых в Monitoring. Если значение kafka_group_topic_partition_offset
становится меньше kafka_log_Log_LogStartOffset
, это указывает на утерю сообщений группой потребителей.
В этом руководстве вы:
- Смоделируете утерю сообщений в топике на тестовом кластере Managed Service for Apache Kafka®, используя инструменты подключения к топику
. - Построите график метрик
kafka_group_topic_partition_offset
,kafka_log_Log_LogStartOffset
иkafka_log_Log_LogEndOffset
с помощью сервиса Yandex Monitoring, а также проследите закономерности, возникающие при утере сообщений.
Чтобы смоделировать и отследить утерю сообщений в топике Apache Kafka®:
- Подготовьте инструменты для подключения к топику.
- Подготовьте команды для отправки и получения сообщений.
- Запустите команду получения сообщений.
- Создайте графики мониторинга.
- Проверьте отправку и получение сообщения.
- Включите удаление сообщений.
- Повторно проверьте отправку и получение сообщения.
- Проанализируйте графики мониторинга.
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за кластер Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
- Плата за использование публичных IP-адресов для хостов кластеров (см. тарифы Virtual Private Cloud).
Перед началом работы
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации. При создании кластера включите опцию Публичный доступ.
-
Создайте топик для обмена сообщениями между производителем и потребителем со следующими параметрами:
- Имя —
messages
; - Количество разделов —
1
.
- Имя —
-
Создайте пользователя с именем
user
и выдайте ему права на топикmessages
:ACCESS_ROLE_CONSUMER
,ACCESS_ROLE_PRODUCER
.
-
Примечание
Подключение к хостам в публичном доступе возможно только с использованием SSL-сертификата.
-
Выберите один из хостов кластера с ролью
KAFKA
и получите его FQDN. -
Выберите имя для группы потребителей, например
test-consumer-group
.
Подготовьте инструменты для подключения к топику
-
Установите OpenJDK:
sudo apt update && sudo apt install --yes default-jdk
-
Загрузите архив с бинарными файлами
для версии Apache Kafka®, которая используется в кластере. Версия Scala не важна. -
Распакуйте архив.
-
Перейдите в каталог, где будет располагаться хранилище сертификатов Java:
cd /etc/security
-
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль не короче 6 символов в параметре
-storepass
для дополнительной защиты хранилища:sudo keytool -importcert \ -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -keystore ssl -storepass <пароль_хранилища_сертификатов> \ --noprompt
-
Создайте файл с параметрами для подключения к кластеру.
Примечание
В этом руководстве для простоты используется единственный пользователь, являющийся и производителем, и потребителем. Поэтому достаточно создать только один файл параметров, который будет использоваться и при отправке, и при получении сообщений.
sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="user" \ password="<пароль_пользователя_user>"; security.protocol=SASL_SSL ssl.truststore.location=/etc/security/ssl ssl.truststore.password=<пароль_хранилища_сертификатов>
-
Установите последнюю доступную версию Microsoft OpenJDK
. -
Загрузите архив с бинарными файлами
для версии Apache Kafka®, которая используется в кластере. Версия Scala не важна. -
Распакуйте архив.
Совет
Распаковывайте файлы Apache Kafka® в корневой каталог диска, например,
C:\kafka_2.12-2.6.0\
.Если путь к исполняемым и пакетным файлам Apache Kafka® будет слишком длинным, то при попытке запустить их возникнет ошибка
The input line is too long
. -
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль в параметре
--storepass
для дополнительной защиты хранилища:keytool.exe -importcert -alias YandexCA ` --file $HOME\.kafka\YandexInternalRootCA.crt ` --keystore $HOME\.kafka\ssl ` --storepass <пароль_хранилища_сертификатов> ` --noprompt
-
Создайте файл с параметрами для подключения к кластеру.
Примечание
В этом руководстве для простоты используется единственный пользователь, являющийся и производителем, и потребителем. Поэтому достаточно создать только один файл параметров, который будет использоваться и при отправке, и при получении сообщений.
sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="user" \ password="<пароль_пользователя_user>"; security.protocol=SASL_SSL ssl.truststore.location=<значение_переменной_$HOME>\\.kafka\\ssl ssl.truststore.password=<пароль_хранилища_сертификатов>
В качестве значения параметра
ssl.truststore.location
укажите полный путь к хранилищу сертификатов, например:ssl.truststore.location=C:\\Users\\Administrator\\.kafka\\ssl
Хранилище сертификатов расположено по пути
$HOME\.kafka\ssl
, но в значении нельзя использовать переменные среды окружения. Чтобы раскрыть переменную, выполните команду:echo $HOME
Важно
Используйте
\\
вместо\
при указании значения параметраssl.truststore.location
, иначе при запуске команд не удастся получить доступ к хранилищу сертификатов.
Подготовьте команды для отправки и получения сообщений
-
Команда отправки сообщения в топик
messages
:echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-producer.sh \ --producer.config <путь_к_файлу_с_параметрами> \ --bootstrap-server <FQDN_хоста>:9091 \ --topic messages \ --property parse.key=true \ --property key.separator=":"
-
Команда получения сообщений из топика
messages
:<путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-consumer.sh \ --consumer.config <путь_к_файлу_с_параметрами> \ --bootstrap-server <FQDN_хоста>:9091 \ --group test-consumer-group \ --topic messages \ --property print.key=true \ --property key.separator=":"
-
Команда отправки сообщения в топик
messages
:echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-producer.bat ` --producer.config <путь_к_файлу_с_параметрами> ` --bootstrap-server <FQDN_хоста>:9091 ` --topic messages ` --property parse.key=true ` --property key.separator=":"
-
Команда получения сообщений из топика
messages
:<путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-consumer.bat ` --consumer.config <путь_к_файлу_с_параметрами> ` --bootstrap-server <FQDN_хоста>:9091 ` --group test-consumer-group ` --topic messages ` --property print.key=true ` --property key.separator=":"
Запустите команду получения сообщений
-
Запустите команду получения сообщений.
-
Через 10–15 секунд прервите команду, нажав Ctrl + C. Проверьте, что в терминале появилось сообщение:
Processed a total of 0 messages
Это сообщение означает, что потребитель успешно подключился к топику.
-
Подтвердите завершение команды.
Теперь группа потребителей test-consumer-group
зарегистрирована и может быть использована в качестве метки для метрик.
Создайте графики мониторинга
Используя сервис Yandex Monitoringkafka_group_topic_partition_offset
, kafka_log_Log_LogStartOffset
и kafka_log_Log_LogEndOffset
:
- Для
kafka_group_topic_partition_offset
укажите метки:service = managed-kafka
,name = kafka_group_topic_partition_offset
,host = <FQDN_хоста>
,topic = messages
,group = test-consumer-group
.
- Для
kafka_log_Log_LogStartOffset
укажите метки:service = managed-kafka
,name = kafka_log_Log_LogStartOffset
,host = <FQDN_хоста>
,topic = messages
.
- Для
kafka_log_Log_LogEndOffset
укажите метки:service = managed-kafka
,name = kafka_log_Log_LogEndOffset
,host = <FQDN_хоста>
,topic = messages
.
Важно
Тестовый топик messages
содержит только один раздел, поэтому метку partition
можно не указывать. Если в вашем решении топик содержит несколько разделов, при построении графиков перечисленных метрик укажите метку partition
.
Примечание
Для отслеживания утери сообщений достаточно только метрик kafka_group_topic_partition_offset
и kafka_log_Log_LogStartOffset
, но дополнительная метрика kafka_log_Log_LogEndOffset
сделает график более наглядным.
Проверьте отправку и получение сообщения
-
Запустите команду отправки сообщения.
-
Спустя примерно 3 минуты запустите команду получения сообщений. Проверьте, что в терминале отобразилось сообщение
key:test message
. -
Нажмите Ctrl + C, чтобы прервать команду получения сообщений из топика.
Поскольку удаление сообщений выключено, спустя три минуты сообщение все еще доступно для потребителей.
Включите удаление сообщений
Задайте следующие настройки топика messages
:
- Политика очистки лога —
Delete
; - Время жизни сегмента лога, мс —
60000
.
Примечание
После изменения настроек топика кластер некоторое время будет обновляться.
Теперь сообщения будут автоматически удаляться через 60 секунд после их записи в топик.
Повторно проверьте отправку и получение сообщения
-
Запустите команду отправки сообщения.
-
Спустя примерно 3 минуты запустите команду получения сообщений. Проверьте, что на этот раз сообщение не было получено.
-
Нажмите Ctrl + C, чтобы прервать команду получения сообщений из топика.
Спустя 60 секунд после записи в топик все сообщения удаляются, поэтому для группы слишком «медленных» потребителей возникает ситуация утери сообщений.
Проанализируйте графики мониторинга
Перейдите в сервис Yandex Monitoring
kafka_log_Log_LogStartOffset
— первое смещение в разделе. Увеличивается при записи сообщений в топик.kafka_log_Log_LogEndOffset
— последнее смещение в разделе. Увеличивается при удалении сообщений из топика.kafka_group_topic_partition_offset
— текущее смещение группы потребителей в разделе. Увеличивается при вычитывании сообщений из топика группой потребителей.
На графике видны следующие закономерности:
- В первый момент времени все три метрики имеют значение
0
. - После отправки первого сообщения значение
kafka_log_Log_LogEndOffset
вырастает до1
. - Через 3 минуты, в момент получения первого сообщения,
kafka_group_topic_partition_offset
также вырастает до1
. Поскольку сообщения не удаляются,kafka_log_Log_LogStartOffset
остается равным0
. - Через минуту после включения удаления сообщений сообщение удаляется из топика, и
kafka_log_Log_LogStartOffset
принимает значение1
. Теперь все три метрики имеют значение1
. - После отправки второго сообщения значение
kafka_log_Log_LogEndOffset
вырастает до2
. Через минуту сообщение удаляется из топика, иkafka_log_Log_LogStartOffset
также принимает значение2
. При этомkafka_group_topic_partition_offset
все еще имеет значение1
. - При втором запуске команды получения сообщений значение
kafka_group_topic_partition_offset
также вырастает до2
, хотя сообщение не получено.
Выводы:
- В норме значение
kafka_group_topic_partition_offset
находится между значениямиkafka_log_Log_LogStartOffset
иkafka_log_Log_LogEndOffset
. Еслиkafka_group_topic_partition_offset
становится меньшеkafka_log_Log_LogStartOffset
, это указывает на утерю сообщений этой группой потребителей. - Разница между значениями
kafka_log_Log_LogEndOffset
иkafka_group_topic_partition_offset
показывает, сколько новых сообщений пока не вычитано (т. е. насколько группа потребителей отстает от производителей).
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
- Удалите кластер Managed Service for Apache Kafka®.
- Освободите и удалите публичные статические IP-адреса.