Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Истории успеха
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Доступны в регионе
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • AI Studio
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Партнёрская программа
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Истории успеха
  • Документация
  • Блог
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»
Страница сервиса
Yandex Managed Service for Apache Kafka®
Документация
Yandex Managed Service for Apache Kafka®
  • Начало работы
    • Все руководства
    • Развертывание веб-интерфейса Apache Kafka®
    • Обновление кластера Managed Service for Apache Kafka® с ZooKeeper на KRaft
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
    • Отслеживание утери сообщений в топике Apache Kafka®
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Yandex Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • История изменений
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте инструменты для подключения к топику
  • Подготовьте команды для отправки и получения сообщений
  • Запустите команду получения сообщений
  • Создайте графики мониторинга
  • Проверьте отправку и получение сообщения
  • Включите удаление сообщений
  • Повторно проверьте отправку и получение сообщения
  • Проанализируйте графики мониторинга
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Отслеживание утери сообщений в топике Apache Kafka®

Отслеживание утери сообщений в топике Apache Kafka®

Статья создана
Yandex Cloud
Обновлена 22 августа 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте инструменты для подключения к топику
  • Подготовьте команды для отправки и получения сообщений
  • Запустите команду получения сообщений
  • Создайте графики мониторинга
  • Проверьте отправку и получение сообщения
  • Включите удаление сообщений
  • Повторно проверьте отправку и получение сообщения
  • Проанализируйте графики мониторинга
  • Удалите созданные ресурсы

Утеря сообщений в топике Apache Kafka® группой потребителей возникает из-за сочетания двух факторов:

  1. В этом топике или во всем кластере включена политика очистки лога Delete и задано малое время жизни сегмента лога Log retention.
  2. Одна или несколько групп потребителей недостаточно быстро вычитывают сообщения из топика. В результате могут быть удалены даже те сообщения, которые еще не были прочитаны.

Утерю сообщений можно отслеживать с помощью метрик сервиса Managed Service for Apache Kafka®, поставляемых в Monitoring. Если значение kafka_group_topic_partition_offset становится меньше kafka_log_Log_LogStartOffset, это указывает на утерю сообщений группой потребителей.

В этом руководстве вы:

  • Смоделируете утерю сообщений в топике на тестовом кластере Managed Service for Apache Kafka®, используя инструменты подключения к топику.
  • Построите график метрик kafka_group_topic_partition_offset, kafka_log_Log_LogStartOffset и kafka_log_Log_LogEndOffset с помощью сервиса Yandex Monitoring, а также проследите закономерности, возникающие при утере сообщений.

Чтобы смоделировать и отследить утерю сообщений в топике Apache Kafka®:

  1. Подготовьте инструменты для подключения к топику.
  2. Подготовьте команды для отправки и получения сообщений.
  3. Запустите команду получения сообщений.
  4. Создайте графики мониторинга.
  5. Проверьте отправку и получение сообщения.
  6. Включите удаление сообщений.
  7. Повторно проверьте отправку и получение сообщения.
  8. Проанализируйте графики мониторинга.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за использование публичных IP-адресов для хостов кластеров (см. тарифы Virtual Private Cloud).

Перед началом работыПеред началом работы

  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации. При создании кластера включите опцию Публичный доступ.

  2. Создайте топик для обмена сообщениями между производителем и потребителем со следующими параметрами:

    • Имя — messages;
    • Количество разделов — 1.
  3. Создайте пользователя с именем user и выдайте ему права на топик messages:

    • ACCESS_ROLE_CONSUMER,
    • ACCESS_ROLE_PRODUCER.
  4. Настройте группы безопасности.

  5. Получите SSL-сертификат.

    Примечание

    Подключение к хостам в публичном доступе возможно только с использованием SSL-сертификата.

  6. Выберите один из хостов кластера с ролью KAFKA и получите его FQDN.

  7. Выберите имя для группы потребителей, например test-consumer-group.

Подготовьте инструменты для подключения к топикуПодготовьте инструменты для подключения к топику

CLI для Bash
CLI для PowerShell
  1. Установите OpenJDK:

    sudo apt update && sudo apt install --yes default-jdk
    
  2. Загрузите архив с бинарными файлами для версии Apache Kafka®, которая используется в кластере. Версия Scala не важна.

  3. Распакуйте архив.

  4. Перейдите в каталог, где будет располагаться хранилище сертификатов Java:

    cd /etc/security
    
  5. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль не короче 6 символов в параметре -storepass для дополнительной защиты хранилища:

    sudo keytool -importcert \
                 -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
                 -keystore ssl -storepass <пароль_хранилища_сертификатов> \
                 --noprompt
    
  6. Создайте файл с параметрами для подключения к кластеру.

    Примечание

    В этом руководстве для простоты используется единственный пользователь, являющийся и производителем, и потребителем. Поэтому достаточно создать только один файл параметров, который будет использоваться и при отправке, и при получении сообщений.

    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="user" \
      password="<пароль_пользователя_user>";
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/security/ssl
    ssl.truststore.password=<пароль_хранилища_сертификатов>
    
  1. Установите последнюю доступную версию Microsoft OpenJDK.

  2. Загрузите архив с бинарными файлами для версии Apache Kafka®, которая используется в кластере. Версия Scala не важна.

  3. Распакуйте архив.

    Совет

    Распаковывайте файлы Apache Kafka® в корневой каталог диска, например, C:\kafka_2.12-2.6.0\.

    Если путь к исполняемым и пакетным файлам Apache Kafka® будет слишком длинным, то при попытке запустить их возникнет ошибка The input line is too long.

  4. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль в параметре --storepass для дополнительной защиты хранилища:

    keytool.exe -importcert -alias YandexCA `
    --file $HOME\.kafka\YandexInternalRootCA.crt `
    --keystore $HOME\.kafka\ssl `
    --storepass <пароль_хранилища_сертификатов> `
    --noprompt
    
  5. Создайте файл с параметрами для подключения к кластеру.

    Примечание

    В этом руководстве для простоты используется единственный пользователь, являющийся и производителем, и потребителем. Поэтому достаточно создать только один файл параметров, который будет использоваться и при отправке, и при получении сообщений.

    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="user" \
      password="<пароль_пользователя_user>";
    security.protocol=SASL_SSL
    ssl.truststore.location=<значение_переменной_$HOME>\\.kafka\\ssl
    ssl.truststore.password=<пароль_хранилища_сертификатов>
    

    В качестве значения параметра ssl.truststore.location укажите полный путь к хранилищу сертификатов, например:

    ssl.truststore.location=C:\\Users\\Administrator\\.kafka\\ssl
    

    Хранилище сертификатов расположено по пути $HOME\.kafka\ssl, но в значении нельзя использовать переменные среды окружения. Чтобы раскрыть переменную, выполните команду:

    echo $HOME
    

    Важно

    Используйте \\ вместо \ при указании значения параметра ssl.truststore.location, иначе при запуске команд не удастся получить доступ к хранилищу сертификатов.

Подготовьте команды для отправки и получения сообщенийПодготовьте команды для отправки и получения сообщений

CLI для Bash
CLI для PowerShell
  • Команда отправки сообщения в топик messages:

    echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-producer.sh \
      --producer.config <путь_к_файлу_с_параметрами> \
      --bootstrap-server <FQDN_хоста>:9091 \
      --topic messages \
      --property parse.key=true \
      --property key.separator=":"
    
  • Команда получения сообщений из топика messages:

    <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-consumer.sh \
      --consumer.config <путь_к_файлу_с_параметрами> \
      --bootstrap-server <FQDN_хоста>:9091 \
      --group test-consumer-group \
      --topic messages \
      --property print.key=true \
      --property key.separator=":"
    
  • Команда отправки сообщения в топик messages:

    echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-producer.bat `
        --producer.config <путь_к_файлу_с_параметрами> `
        --bootstrap-server <FQDN_хоста>:9091 `
        --topic messages `
        --property parse.key=true `
        --property key.separator=":"
    
  • Команда получения сообщений из топика messages:

    <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-consumer.bat `
        --consumer.config <путь_к_файлу_с_параметрами> `
        --bootstrap-server <FQDN_хоста>:9091 `
        --group test-consumer-group `    
        --topic messages `
        --property print.key=true `
        --property key.separator=":"
    

Запустите команду получения сообщенийЗапустите команду получения сообщений

  1. Запустите команду получения сообщений.

  2. Через 10–15 секунд прервите команду, нажав Ctrl + C. Проверьте, что в терминале появилось сообщение:

    Processed a total of 0 messages
    

    Это сообщение означает, что потребитель успешно подключился к топику.

  3. Подтвердите завершение команды.

Теперь группа потребителей test-consumer-group зарегистрирована и может быть использована в качестве метки для метрик.

Создайте графики мониторингаСоздайте графики мониторинга

Используя сервис Yandex Monitoring, отобразите на одном графике метрики kafka_group_topic_partition_offset, kafka_log_Log_LogStartOffset и kafka_log_Log_LogEndOffset:

  • Для kafka_group_topic_partition_offset укажите метки:
    • service = managed-kafka,
    • name = kafka_group_topic_partition_offset,
    • host = <FQDN_хоста>,
    • topic = messages,
    • group = test-consumer-group.
  • Для kafka_log_Log_LogStartOffset укажите метки:
    • service = managed-kafka,
    • name = kafka_log_Log_LogStartOffset,
    • host = <FQDN_хоста>,
    • topic = messages.
  • Для kafka_log_Log_LogEndOffset укажите метки:
    • service = managed-kafka,
    • name = kafka_log_Log_LogEndOffset,
    • host = <FQDN_хоста>,
    • topic = messages.

Важно

Тестовый топик messages содержит только один раздел, поэтому метку partition можно не указывать. Если в вашем решении топик содержит несколько разделов, при построении графиков перечисленных метрик укажите метку partition.

Примечание

Для отслеживания утери сообщений достаточно только метрик kafka_group_topic_partition_offset и kafka_log_Log_LogStartOffset, но дополнительная метрика kafka_log_Log_LogEndOffset сделает график более наглядным.

Проверьте отправку и получение сообщенияПроверьте отправку и получение сообщения

  1. Запустите команду отправки сообщения.

  2. Спустя примерно 3 минуты запустите команду получения сообщений. Проверьте, что в терминале отобразилось сообщение key:test message.

  3. Нажмите Ctrl + C, чтобы прервать команду получения сообщений из топика.

Поскольку удаление сообщений выключено, спустя три минуты сообщение все еще доступно для потребителей.

Включите удаление сообщенийВключите удаление сообщений

Задайте следующие настройки топика messages:

  • Политика очистки лога — Delete;
  • Время жизни сегмента лога, мс — 60000.

Примечание

После изменения настроек топика кластер некоторое время будет обновляться.

Теперь сообщения будут автоматически удаляться через 60 секунд после их записи в топик.

Повторно проверьте отправку и получение сообщенияПовторно проверьте отправку и получение сообщения

  1. Запустите команду отправки сообщения.

  2. Спустя примерно 3 минуты запустите команду получения сообщений. Проверьте, что на этот раз сообщение не было получено.

  3. Нажмите Ctrl + C, чтобы прервать команду получения сообщений из топика.

Спустя 60 секунд после записи в топик все сообщения удаляются, поэтому для группы слишком «медленных» потребителей возникает ситуация утери сообщений.

Проанализируйте графики мониторингаПроанализируйте графики мониторинга

Перейдите в сервис Yandex Monitoring и проанализируйте поведение созданных ранее метрик:

  • kafka_log_Log_LogStartOffset — первое смещение в разделе. Увеличивается при записи сообщений в топик.
  • kafka_log_Log_LogEndOffset — последнее смещение в разделе. Увеличивается при удалении сообщений из топика.
  • kafka_group_topic_partition_offset — текущее смещение группы потребителей в разделе. Увеличивается при вычитывании сообщений из топика группой потребителей.

На графике видны следующие закономерности:

  1. В первый момент времени все три метрики имеют значение 0.
  2. После отправки первого сообщения значение kafka_log_Log_LogEndOffset вырастает до 1.
  3. Через 3 минуты, в момент получения первого сообщения, kafka_group_topic_partition_offset также вырастает до 1. Поскольку сообщения не удаляются, kafka_log_Log_LogStartOffset остается равным 0.
  4. Через минуту после включения удаления сообщений сообщение удаляется из топика, и kafka_log_Log_LogStartOffset принимает значение 1. Теперь все три метрики имеют значение 1.
  5. После отправки второго сообщения значение kafka_log_Log_LogEndOffset вырастает до 2. Через минуту сообщение удаляется из топика, и kafka_log_Log_LogStartOffset также принимает значение 2. При этом kafka_group_topic_partition_offset все еще имеет значение 1.
  6. При втором запуске команды получения сообщений значение kafka_group_topic_partition_offset также вырастает до 2, хотя сообщение не получено.

Выводы:

  1. В норме значение kafka_group_topic_partition_offset находится между значениями kafka_log_Log_LogStartOffset и kafka_log_Log_LogEndOffset. Если kafka_group_topic_partition_offset становится меньше kafka_log_Log_LogStartOffset, это указывает на утерю сообщений этой группой потребителей.
  2. Разница между значениями kafka_log_Log_LogEndOffset и kafka_group_topic_partition_offset показывает, сколько новых сообщений пока не вычитано (т. е. насколько группа потребителей отстает от производителей).

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:

  • Удалите кластер Managed Service for Apache Kafka®.
  • Освободите и удалите публичные статические IP-адреса.

Была ли статья полезна?

Предыдущая
Работа с топиками Apache Kafka® с помощью Yandex Data Processing
Следующая
Взаимосвязь ресурсов сервиса
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»