Отслеживание потери сообщений в топике Apache Kafka®
- Необходимые платные ресурсы
- Перед началом работы
- Подготовьте инструменты для подключения к топику
- Подготовьте команды для отправки и получения сообщений
- Запустите команду получения сообщений
- Создайте графики мониторинга
- Проверьте отправку и получение сообщения
- Включите удаление сообщений
- Повторно проверьте отправку и получение сообщения
- Проанализируйте графики мониторинга
- Удалите созданные ресурсы
Потеря сообщений в топике Apache Kafka® группа потребителей возникает из-за сочетания двух факторов:
- В этом топике или во всем кластере включена политика очистки лога
Deleteи задано малое время жизни сегмента логаLog retention. - Одна или несколько групп потребителей недостаточно быстро вычитывают сообщения из топика. В результате могут быть удалены даже те сообщения, которые еще не были прочитаны.
Потерю сообщений можно отслеживать с помощью метрик сервиса Managed Service for Apache Kafka®, поставляемых в Monitoring. Если значение kafka_group_topic_partition_offset становится меньше kafka_log_Log_LogStartOffset, это указывает на потерю сообщений группой потребителей.
В этом руководстве вы:
- Смоделируете потерю сообщений в топике на тестовом кластере Managed Service for Apache Kafka®, используя инструменты подключения к топику
. - Построите график метрик
kafka_group_topic_partition_offset,kafka_log_Log_LogStartOffsetиkafka_log_Log_LogEndOffsetс помощью сервиса Yandex Monitoring, а также проследите закономерности, возникающие при потере сообщений.
Чтобы смоделировать и отследить потерю сообщений в топике Apache Kafka®:
- Подготовьте инструменты для подключения к топику.
- Подготовьте команды для отправки и получения сообщений.
- Запустите команду получения сообщений.
- Создайте графики мониторинга.
- Проверьте отправку и получение сообщения.
- Включите удаление сообщений.
- Повторно проверьте отправку и получение сообщения.
- Проанализируйте графики мониторинга.
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за кластер Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
- Плата за использование публичных IP-адресов для хостов кластеров (см. тарифы Virtual Private Cloud).
Перед началом работы
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации. При создании кластера включите опцию Публичный доступ.
-
Создайте топик для обмена сообщениями между производителем и потребителем со следующими параметрами:
- Имя —
messages; - Количество разделов —
1.
- Имя —
-
Создайте пользователя с именем
userи выдайте ему права на топикmessages:ACCESS_ROLE_CONSUMER,ACCESS_ROLE_PRODUCER.
-
Примечание
Подключение к хостам в публичном доступе возможно только с использованием SSL-сертификата.
-
Выберите один из хостов кластера с ролью
KAFKAи получите его FQDN. -
Выберите имя для группы потребителей, например
test-consumer-group.
Подготовьте инструменты для подключения к топику
-
Установите OpenJDK:
sudo apt update && sudo apt install --yes default-jdk -
Загрузите архив с бинарными файлами
для версии Apache Kafka®, которая используется в кластере. Версия Scala не важна. -
Распакуйте архив.
-
Перейдите в каталог, где будет располагаться хранилище сертификатов Java:
cd /etc/security -
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль не короче 6 символов в параметре
-storepassдля дополнительной защиты хранилища:sudo keytool -importcert \ -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -keystore ssl -storepass <пароль_хранилища_сертификатов> \ --noprompt -
Создайте файл с параметрами для подключения к кластеру.
Примечание
В этом руководстве для простоты используется единственный пользователь, являющийся и производителем, и потребителем. Поэтому достаточно создать только один файл параметров, который будет использоваться и при отправке, и при получении сообщений.
sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="user" \ password="<пароль_пользователя_user>"; security.protocol=SASL_SSL ssl.truststore.location=/etc/security/ssl ssl.truststore.password=<пароль_хранилища_сертификатов>
-
Установите последнюю доступную версию Microsoft OpenJDK
. -
Загрузите архив с бинарными файлами
для версии Apache Kafka®, которая используется в кластере. Версия Scala не важна. -
Распакуйте архив.
Совет
Распаковывайте файлы Apache Kafka® в корневой каталог диска, например,
C:\kafka_2.12-2.6.0\.Если путь к исполняемым и пакетным файлам Apache Kafka® будет слишком длинным, то при попытке запустить их возникнет ошибка
The input line is too long. -
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль в параметре
--storepassдля дополнительной защиты хранилища:keytool.exe -importcert -alias YandexCA ` --file $HOME\.kafka\YandexInternalRootCA.crt ` --keystore $HOME\.kafka\ssl ` --storepass <пароль_хранилища_сертификатов> ` --noprompt -
Создайте файл с параметрами для подключения к кластеру.
Примечание
В этом руководстве для простоты используется единственный пользователь, являющийся и производителем, и потребителем. Поэтому достаточно создать только один файл параметров, который будет использоваться и при отправке, и при получении сообщений.
sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="user" \ password="<пароль_пользователя_user>"; security.protocol=SASL_SSL ssl.truststore.location=<значение_переменной_$HOME>\\.kafka\\ssl ssl.truststore.password=<пароль_хранилища_сертификатов>В качестве значения параметра
ssl.truststore.locationукажите полный путь к хранилищу сертификатов, например:ssl.truststore.location=C:\\Users\\Administrator\\.kafka\\sslХранилище сертификатов расположено по пути
$HOME\.kafka\ssl, но в значении нельзя использовать переменные среды окружения. Чтобы раскрыть переменную, выполните команду:echo $HOMEВажно
Используйте
\\вместо\при указании значения параметраssl.truststore.location, иначе при запуске команд не удастся получить доступ к хранилищу сертификатов.
Подготовьте команды для отправки и получения сообщений
-
Команда отправки сообщения в топик
messages:echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-producer.sh \ --producer.config <путь_к_файлу_с_параметрами> \ --bootstrap-server <FQDN_хоста>:9091 \ --topic messages \ --property parse.key=true \ --property key.separator=":" -
Команда получения сообщений из топика
messages:<путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-consumer.sh \ --consumer.config <путь_к_файлу_с_параметрами> \ --bootstrap-server <FQDN_хоста>:9091 \ --group test-consumer-group \ --topic messages \ --property print.key=true \ --property key.separator=":"
-
Команда отправки сообщения в топик
messages:echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-producer.bat ` --producer.config <путь_к_файлу_с_параметрами> ` --bootstrap-server <FQDN_хоста>:9091 ` --topic messages ` --property parse.key=true ` --property key.separator=":" -
Команда получения сообщений из топика
messages:<путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-consumer.bat ` --consumer.config <путь_к_файлу_с_параметрами> ` --bootstrap-server <FQDN_хоста>:9091 ` --group test-consumer-group ` --topic messages ` --property print.key=true ` --property key.separator=":"
Запустите команду получения сообщений
-
Запустите команду получения сообщений.
-
Через 10–15 секунд прервите команду, нажав Ctrl + C. Проверьте, что в терминале появилось сообщение:
Processed a total of 0 messagesЭто сообщение означает, что потребитель успешно подключился к топику.
-
Подтвердите завершение команды.
Теперь группа потребителей test-consumer-group зарегистрирована и может быть использована в качестве метки для метрик.
Создайте графики мониторинга
Используя сервис Yandex Monitoringkafka_group_topic_partition_offset, kafka_log_Log_LogStartOffset и kafka_log_Log_LogEndOffset:
- Для
kafka_group_topic_partition_offsetукажите метки:service = managed-kafka,name = kafka_group_topic_partition_offset,host = <FQDN_хоста>,topic = messages,group = test-consumer-group.
- Для
kafka_log_Log_LogStartOffsetукажите метки:service = managed-kafka,name = kafka_log_Log_LogStartOffset,host = <FQDN_хоста>,topic = messages.
- Для
kafka_log_Log_LogEndOffsetукажите метки:service = managed-kafka,name = kafka_log_Log_LogEndOffset,host = <FQDN_хоста>,topic = messages.
Важно
Тестовый топик messages содержит только один раздел, поэтому метку partition можно не указывать. Если в вашем решении топик содержит несколько разделов, при построении графиков перечисленных метрик укажите метку partition.
Примечание
Для отслеживания потери сообщений достаточно только метрик kafka_group_topic_partition_offset и kafka_log_Log_LogStartOffset, но дополнительная метрика kafka_log_Log_LogEndOffset сделает график более наглядным.
Проверьте отправку и получение сообщения
-
Запустите команду отправки сообщения.
-
Спустя примерно 3 минуты запустите команду получения сообщений. Проверьте, что в терминале отобразилось сообщение
key:test message. -
Нажмите Ctrl + C, чтобы прервать команду получения сообщений из топика.
Поскольку удаление сообщений выключено, спустя три минуты сообщение все еще доступно для потребителей.
Включите удаление сообщений
Задайте следующие настройки топика messages:
- Политика очистки лога —
Delete; - Время жизни сегмента лога, мс —
60000.
Примечание
После изменения настроек топика кластер некоторое время будет обновляться.
Теперь сообщения будут автоматически удаляться через 60 секунд после их записи в топик.
Повторно проверьте отправку и получение сообщения
-
Запустите команду отправки сообщения.
-
Спустя примерно 3 минуты запустите команду получения сообщений. Проверьте, что на этот раз сообщение не было получено.
-
Нажмите Ctrl + C, чтобы прервать команду получения сообщений из топика.
Спустя 60 секунд после записи в топик все сообщения удаляются, поэтому для группы слишком «медленных» потребителей возникает ситуация потери сообщений.
Проанализируйте графики мониторинга
Перейдите в сервис Yandex Monitoring
kafka_log_Log_LogStartOffset— первое смещение в разделе. Увеличивается при записи сообщений в топик.kafka_log_Log_LogEndOffset— последнее смещение в разделе. Увеличивается при удалении сообщений из топика.kafka_group_topic_partition_offset— текущее смещение группы потребителей в разделе. Увеличивается при вычитывании сообщений из топика группой потребителей.
На графике видны следующие закономерности:
- В первый момент времени все три метрики имеют значение
0. - После отправки первого сообщения значение
kafka_log_Log_LogEndOffsetвырастает до1. - Через 3 минуты, в момент получения первого сообщения,
kafka_group_topic_partition_offsetтакже вырастает до1. Поскольку сообщения не удаляются,kafka_log_Log_LogStartOffsetостается равным0. - Через минуту после включения удаления сообщений сообщение удаляется из топика, и
kafka_log_Log_LogStartOffsetпринимает значение1. Теперь все три метрики имеют значение1. - После отправки второго сообщения значение
kafka_log_Log_LogEndOffsetвырастает до2. Через минуту сообщение удаляется из топика, иkafka_log_Log_LogStartOffsetтакже принимает значение2. При этомkafka_group_topic_partition_offsetвсе еще имеет значение1. - При втором запуске команды получения сообщений значение
kafka_group_topic_partition_offsetтакже вырастает до2, хотя сообщение не получено.
Выводы:
- В норме значение
kafka_group_topic_partition_offsetнаходится между значениямиkafka_log_Log_LogStartOffsetиkafka_log_Log_LogEndOffset. Еслиkafka_group_topic_partition_offsetстановится меньшеkafka_log_Log_LogStartOffset, это указывает на потерю сообщений этой группой потребителей. - Разница между значениями
kafka_log_Log_LogEndOffsetиkafka_group_topic_partition_offsetпоказывает, сколько новых сообщений пока не вычитано (т. е. насколько группа потребителей отстает от производителей).
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
- Удалите кластер Managed Service for Apache Kafka®.
- Освободите и удалите публичные статические IP-адреса.