Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Истории успеха
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Доступны в регионе
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • ИИ для бизнеса
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Партнёрская программа
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Истории успеха
  • Документация
  • Блог
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»
Практические руководства
    • Все руководства
    • Самостоятельное развертывание веб-интерфейса Apache Kafka®
    • Обновление кластера Managed Service for Apache Kafka® с ZooKeeper на KRaft
    • Миграция БД из стороннего кластера Apache Kafka® в Managed Service for Apache Kafka®
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for YDB в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Yandex MPP Analytics for PostgreSQL с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Yandex StoreDoc с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MySQL® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for OpenSearch с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for PostgreSQL с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Data Streams с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for Apache Kafka® с помощью Data Transfer
    • Захват изменений YDB и поставка в YDS
    • Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
    • Синхронизация топиков Apache Kafka® в Object Storage без использования интернета
    • Отслеживание потери сообщений в топике Apache Kafka®
    • Автоматизация задач Query с помощью Managed Service for Apache Airflow™
    • Отправка запросов к API Yandex Cloud через Yandex Cloud Python SDK
    • Настройка SMTP-сервера для отправки уведомлений по электронной почте
    • Добавление данных в БД ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® средствами ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® при помощи Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for ClickHouse® с помощью Data Transfer
    • Асинхронная репликация данных из PostgreSQL в ClickHouse®
    • Обмен данными между Managed Service for ClickHouse® и Yandex Data Processing
    • Настройка Managed Service for ClickHouse® для Graphite
    • Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®
    • Получение данных из Managed Service for Apache Kafka® в ksqlDB
    • Получение данных из RabbitMQ в Managed Service for ClickHouse®
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse®
    • Асинхронная репликация данных из Яндекс Метрика в ClickHouse® с помощью Data Transfer
    • Использование гибридного хранилища в Managed Service for ClickHouse®
    • Шардирование таблиц Managed Service for ClickHouse®
    • Загрузка данных из Яндекс Директ в витрину Managed Service for ClickHouse® с использованием Cloud Functions, Object Storage и Data Transfer
    • Загрузка данных из Object Storage в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция данных со сменой хранилища из Managed Service for OpenSearch в Managed Service for ClickHouse® с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Managed Service for ClickHouse® с помощью Data Transfer
    • Интеграция Yandex Managed Service for ClickHouse® с Microsoft SQL Server через ClickHouse® JDBC Bridge
    • Миграция базы данных из Google BigQuery в Managed Service for ClickHouse®
    • Интеграция Yandex Managed Service for ClickHouse® с Oracle через ClickHouse® JDBC Bridge
    • Настройка Cloud DNS для доступа к кластеру Managed Service for ClickHouse® из других облачных сетей
    • Миграция кластера Yandex Data Processing с HDFS в другую зону доступности
    • Импорт данных из Managed Service for MySQL® в Yandex Data Processing с помощью Sqoop
    • Импорт данных из Managed Service for PostgreSQL в Yandex Data Processing с помощью Sqoop
    • Монтирование бакетов Object Storage к файловой системе хостов Yandex Data Processing
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
    • Автоматизация работы с Yandex Data Processing с помощью Managed Service for Apache Airflow™
    • Совместная работа с таблицами Yandex Data Processing с использованием Apache Hive™ Metastore
    • Перенос метаданных между кластерами Yandex Data Processing с помощью Apache Hive™ Metastore
    • Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
    • Миграция коллекций из стороннего кластера MongoDB в Yandex StoreDoc
    • Миграция данных в Yandex StoreDoc
    • Миграция кластера Yandex StoreDoc с версии 4.4 на 6.0
    • Шардирование коллекций Yandex StoreDoc
    • Анализ производительности и оптимизация Yandex StoreDoc
    • Анализ производительности и оптимизация Managed Service for MySQL®
    • Синхронизация данных из стороннего кластера MySQL® в Managed Service for MySQL® с помощью Data Transfer
    • Миграция БД из Managed Service for MySQL® в сторонний кластер MySQL®
    • Миграция БД из Managed Service for MySQL® в Object Storage с помощью Data Transfer
    • Перенос данных из Object Storage в Managed Service for MySQL® с использованием Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Миграция БД из Managed Service for MySQL® в Managed Service for YDB с помощью Data Transfer
    • Захват изменений MySQL® и поставка в YDS
    • Миграция данных из Managed Service for MySQL® в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из AWS RDS for PostgreSQL в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из Managed Service for MySQL® в Yandex MPP Analytics for PostgreSQL с помощью Data Transfer
    • Настройка политики индексов в Managed Service for OpenSearch
    • Миграция данных в Managed Service for OpenSearch из стороннего кластера OpenSearch с помощью Data Transfer
    • Загрузка данных из Managed Service for OpenSearch в Object Storage с помощью Data Transfer
    • Миграция данных из Managed Service for OpenSearch в Managed Service for YDB с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Yandex MPP Analytics for PostgreSQL с помощью Yandex Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Аутентификация в OpenSearch Dashboards кластера Managed Service for OpenSearch с помощью Keycloak
    • Использование плагина yandex-lemmer в Managed Service for OpenSearch
    • Создание кластера PostgreSQL для «1С:Предприятия»
    • Логическая репликация PostgreSQL
    • Миграция БД из стороннего кластера PostgreSQL в Managed Service for PostgreSQL
    • Миграция БД из Managed Service for PostgreSQL
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for YDB с помощью Data Transfer
    • Миграция БД из Managed Service for PostgreSQL в Object Storage
    • Перенос данных из Object Storage в Managed Service for PostgreSQL с использованием Data Transfer
    • Захват изменений PostgreSQL и поставка в YDS
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for MySQL® с помощью Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Решение проблем с сортировкой строк в PostgreSQL после обновления glibc
    • Миграция БД из Greenplum® в ClickHouse®
    • Миграция БД из Greenplum® в PostgreSQL
    • Выгрузка данных Greenplum® в холодное хранилище Object Storage
    • Загрузка данных из Object Storage в Yandex MPP Analytics for PostgreSQL с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Yandex MPP Analytics for PostgreSQL с помощью Yandex Data Transfer
    • Создание внешней таблицы на базе таблицы из бакета Object Storage с помощью конфигурационного файла
    • Получение данных из внешних источников с помощью именованных запросов в Greenplum®
    • Миграция БД из стороннего кластера Valkey™ в Yandex Managed Service for Valkey™
    • Использование кластера Yandex Managed Service for Valkey™ в качестве хранилища сессий PHP
    • Загрузка данных из Object Storage в Managed Service for YDB с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Object Storage с помощью Data Transfer
    • Обработка аудитных логов Audit Trails
    • Обработка логов Cloud Logging
    • Обработка потока изменений Debezium
    • Анализ данных с помощью Jupyter
    • Обработка файлов детализации в сервисе Yandex Cloud Billing
    • Ввод данных в системы хранения
    • Умная обработка логов
    • Передача данных в микросервисных архитектурах
    • Миграция данных в Object Storage с помощью Data Transfer
    • Миграция данных из стороннего кластера Greenplum® или PostgreSQL в Yandex MPP Analytics for PostgreSQL с помощью Data Transfer
    • Миграция кластера Yandex StoreDoc
    • Миграция кластера MySQL®
    • Миграция на сторонний кластер MySQL®
    • Миграция кластера PostgreSQL
    • Создание реестра схем для поставки данных в формате Debezium CDC из Apache Kafka®
    • Автоматизация работы с помощью Yandex Managed Service for Apache Airflow™
    • Работа с таблицей в Object Storage из PySpark-задания
    • Интеграция Yandex Managed Service for Apache Spark™ и Apache Hive™ Metastore
    • Запуск PySpark-задания с помощью Yandex Managed Service for Apache Airflow™
    • Использование Yandex Object Storage в Yandex Managed Service for Apache Spark™

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте инструменты для подключения к топику
  • Подготовьте команды для отправки и получения сообщений
  • Запустите команду получения сообщений
  • Создайте графики мониторинга
  • Проверьте отправку и получение сообщения
  • Включите удаление сообщений
  • Повторно проверьте отправку и получение сообщения
  • Проанализируйте графики мониторинга
  • Удалите созданные ресурсы
  1. Построение Data Platform
  2. Отслеживание потери сообщений в топике Apache Kafka®

Отслеживание потери сообщений в топике Apache Kafka®

Статья создана
Yandex Cloud
Обновлена 7 ноября 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте инструменты для подключения к топику
  • Подготовьте команды для отправки и получения сообщений
  • Запустите команду получения сообщений
  • Создайте графики мониторинга
  • Проверьте отправку и получение сообщения
  • Включите удаление сообщений
  • Повторно проверьте отправку и получение сообщения
  • Проанализируйте графики мониторинга
  • Удалите созданные ресурсы

Потеря сообщений в топике Apache Kafka® группа потребителей возникает из-за сочетания двух факторов:

  1. В этом топике или во всем кластере включена политика очистки лога Delete и задано малое время жизни сегмента лога Log retention.
  2. Одна или несколько групп потребителей недостаточно быстро вычитывают сообщения из топика. В результате могут быть удалены даже те сообщения, которые еще не были прочитаны.

Потерю сообщений можно отслеживать с помощью метрик сервиса Managed Service for Apache Kafka®, поставляемых в Monitoring. Если значение kafka_group_topic_partition_offset становится меньше kafka_log_Log_LogStartOffset, это указывает на потерю сообщений группой потребителей.

В этом руководстве вы:

  • Смоделируете потерю сообщений в топике на тестовом кластере Managed Service for Apache Kafka®, используя инструменты подключения к топику.
  • Построите график метрик kafka_group_topic_partition_offset, kafka_log_Log_LogStartOffset и kafka_log_Log_LogEndOffset с помощью сервиса Yandex Monitoring, а также проследите закономерности, возникающие при потере сообщений.

Чтобы смоделировать и отследить потерю сообщений в топике Apache Kafka®:

  1. Подготовьте инструменты для подключения к топику.
  2. Подготовьте команды для отправки и получения сообщений.
  3. Запустите команду получения сообщений.
  4. Создайте графики мониторинга.
  5. Проверьте отправку и получение сообщения.
  6. Включите удаление сообщений.
  7. Повторно проверьте отправку и получение сообщения.
  8. Проанализируйте графики мониторинга.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за использование публичных IP-адресов для хостов кластеров (см. тарифы Virtual Private Cloud).

Перед началом работыПеред началом работы

  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации. При создании кластера включите опцию Публичный доступ.

  2. Создайте топик для обмена сообщениями между производителем и потребителем со следующими параметрами:

    • Имя — messages;
    • Количество разделов — 1.
  3. Создайте пользователя с именем user и выдайте ему права на топик messages:

    • ACCESS_ROLE_CONSUMER,
    • ACCESS_ROLE_PRODUCER.
  4. Настройте группы безопасности.

  5. Получите SSL-сертификат.

    Примечание

    Подключение к хостам в публичном доступе возможно только с использованием SSL-сертификата.

  6. Выберите один из хостов кластера с ролью KAFKA и получите его FQDN.

  7. Выберите имя для группы потребителей, например test-consumer-group.

Подготовьте инструменты для подключения к топикуПодготовьте инструменты для подключения к топику

CLI для Bash
CLI для PowerShell
  1. Установите OpenJDK:

    sudo apt update && sudo apt install --yes default-jdk
    
  2. Загрузите архив с бинарными файлами для версии Apache Kafka®, которая используется в кластере. Версия Scala не важна.

  3. Распакуйте архив.

  4. Перейдите в каталог, где будет располагаться хранилище сертификатов Java:

    cd /etc/security
    
  5. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль не короче 6 символов в параметре -storepass для дополнительной защиты хранилища:

    sudo keytool -importcert \
                 -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
                 -keystore ssl -storepass <пароль_хранилища_сертификатов> \
                 --noprompt
    
  6. Создайте файл с параметрами для подключения к кластеру.

    Примечание

    В этом руководстве для простоты используется единственный пользователь, являющийся и производителем, и потребителем. Поэтому достаточно создать только один файл параметров, который будет использоваться и при отправке, и при получении сообщений.

    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="user" \
      password="<пароль_пользователя_user>";
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/security/ssl
    ssl.truststore.password=<пароль_хранилища_сертификатов>
    
  1. Установите последнюю доступную версию Microsoft OpenJDK.

  2. Загрузите архив с бинарными файлами для версии Apache Kafka®, которая используется в кластере. Версия Scala не важна.

  3. Распакуйте архив.

    Совет

    Распаковывайте файлы Apache Kafka® в корневой каталог диска, например, C:\kafka_2.12-2.6.0\.

    Если путь к исполняемым и пакетным файлам Apache Kafka® будет слишком длинным, то при попытке запустить их возникнет ошибка The input line is too long.

  4. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль в параметре --storepass для дополнительной защиты хранилища:

    keytool.exe -importcert -alias YandexCA `
    --file $HOME\.kafka\YandexInternalRootCA.crt `
    --keystore $HOME\.kafka\ssl `
    --storepass <пароль_хранилища_сертификатов> `
    --noprompt
    
  5. Создайте файл с параметрами для подключения к кластеру.

    Примечание

    В этом руководстве для простоты используется единственный пользователь, являющийся и производителем, и потребителем. Поэтому достаточно создать только один файл параметров, который будет использоваться и при отправке, и при получении сообщений.

    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="user" \
      password="<пароль_пользователя_user>";
    security.protocol=SASL_SSL
    ssl.truststore.location=<значение_переменной_$HOME>\\.kafka\\ssl
    ssl.truststore.password=<пароль_хранилища_сертификатов>
    

    В качестве значения параметра ssl.truststore.location укажите полный путь к хранилищу сертификатов, например:

    ssl.truststore.location=C:\\Users\\Administrator\\.kafka\\ssl
    

    Хранилище сертификатов расположено по пути $HOME\.kafka\ssl, но в значении нельзя использовать переменные среды окружения. Чтобы раскрыть переменную, выполните команду:

    echo $HOME
    

    Важно

    Используйте \\ вместо \ при указании значения параметра ssl.truststore.location, иначе при запуске команд не удастся получить доступ к хранилищу сертификатов.

Подготовьте команды для отправки и получения сообщенийПодготовьте команды для отправки и получения сообщений

CLI для Bash
CLI для PowerShell
  • Команда отправки сообщения в топик messages:

    echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-producer.sh \
      --producer.config <путь_к_файлу_с_параметрами> \
      --bootstrap-server <FQDN_хоста>:9091 \
      --topic messages \
      --property parse.key=true \
      --property key.separator=":"
    
  • Команда получения сообщений из топика messages:

    <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-consumer.sh \
      --consumer.config <путь_к_файлу_с_параметрами> \
      --bootstrap-server <FQDN_хоста>:9091 \
      --group test-consumer-group \
      --topic messages \
      --property print.key=true \
      --property key.separator=":"
    
  • Команда отправки сообщения в топик messages:

    echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-producer.bat `
        --producer.config <путь_к_файлу_с_параметрами> `
        --bootstrap-server <FQDN_хоста>:9091 `
        --topic messages `
        --property parse.key=true `
        --property key.separator=":"
    
  • Команда получения сообщений из топика messages:

    <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-consumer.bat `
        --consumer.config <путь_к_файлу_с_параметрами> `
        --bootstrap-server <FQDN_хоста>:9091 `
        --group test-consumer-group `    
        --topic messages `
        --property print.key=true `
        --property key.separator=":"
    

Запустите команду получения сообщенийЗапустите команду получения сообщений

  1. Запустите команду получения сообщений.

  2. Через 10–15 секунд прервите команду, нажав Ctrl + C. Проверьте, что в терминале появилось сообщение:

    Processed a total of 0 messages
    

    Это сообщение означает, что потребитель успешно подключился к топику.

  3. Подтвердите завершение команды.

Теперь группа потребителей test-consumer-group зарегистрирована и может быть использована в качестве метки для метрик.

Создайте графики мониторингаСоздайте графики мониторинга

Используя сервис Yandex Monitoring, отобразите на одном графике метрики kafka_group_topic_partition_offset, kafka_log_Log_LogStartOffset и kafka_log_Log_LogEndOffset:

  • Для kafka_group_topic_partition_offset укажите метки:
    • service = managed-kafka,
    • name = kafka_group_topic_partition_offset,
    • host = <FQDN_хоста>,
    • topic = messages,
    • group = test-consumer-group.
  • Для kafka_log_Log_LogStartOffset укажите метки:
    • service = managed-kafka,
    • name = kafka_log_Log_LogStartOffset,
    • host = <FQDN_хоста>,
    • topic = messages.
  • Для kafka_log_Log_LogEndOffset укажите метки:
    • service = managed-kafka,
    • name = kafka_log_Log_LogEndOffset,
    • host = <FQDN_хоста>,
    • topic = messages.

Важно

Тестовый топик messages содержит только один раздел, поэтому метку partition можно не указывать. Если в вашем решении топик содержит несколько разделов, при построении графиков перечисленных метрик укажите метку partition.

Примечание

Для отслеживания потери сообщений достаточно только метрик kafka_group_topic_partition_offset и kafka_log_Log_LogStartOffset, но дополнительная метрика kafka_log_Log_LogEndOffset сделает график более наглядным.

Проверьте отправку и получение сообщенияПроверьте отправку и получение сообщения

  1. Запустите команду отправки сообщения.

  2. Спустя примерно 3 минуты запустите команду получения сообщений. Проверьте, что в терминале отобразилось сообщение key:test message.

  3. Нажмите Ctrl + C, чтобы прервать команду получения сообщений из топика.

Поскольку удаление сообщений выключено, спустя три минуты сообщение все еще доступно для потребителей.

Включите удаление сообщенийВключите удаление сообщений

Задайте следующие настройки топика messages:

  • Политика очистки лога — Delete;
  • Время жизни сегмента лога, мс — 60000.

Примечание

После изменения настроек топика кластер некоторое время будет обновляться.

Теперь сообщения будут автоматически удаляться через 60 секунд после их записи в топик.

Повторно проверьте отправку и получение сообщенияПовторно проверьте отправку и получение сообщения

  1. Запустите команду отправки сообщения.

  2. Спустя примерно 3 минуты запустите команду получения сообщений. Проверьте, что на этот раз сообщение не было получено.

  3. Нажмите Ctrl + C, чтобы прервать команду получения сообщений из топика.

Спустя 60 секунд после записи в топик все сообщения удаляются, поэтому для группы слишком «медленных» потребителей возникает ситуация потери сообщений.

Проанализируйте графики мониторингаПроанализируйте графики мониторинга

Перейдите в сервис Yandex Monitoring и проанализируйте поведение созданных ранее метрик:

  • kafka_log_Log_LogStartOffset — первое смещение в разделе. Увеличивается при записи сообщений в топик.
  • kafka_log_Log_LogEndOffset — последнее смещение в разделе. Увеличивается при удалении сообщений из топика.
  • kafka_group_topic_partition_offset — текущее смещение группы потребителей в разделе. Увеличивается при вычитывании сообщений из топика группой потребителей.

На графике видны следующие закономерности:

  1. В первый момент времени все три метрики имеют значение 0.
  2. После отправки первого сообщения значение kafka_log_Log_LogEndOffset вырастает до 1.
  3. Через 3 минуты, в момент получения первого сообщения, kafka_group_topic_partition_offset также вырастает до 1. Поскольку сообщения не удаляются, kafka_log_Log_LogStartOffset остается равным 0.
  4. Через минуту после включения удаления сообщений сообщение удаляется из топика, и kafka_log_Log_LogStartOffset принимает значение 1. Теперь все три метрики имеют значение 1.
  5. После отправки второго сообщения значение kafka_log_Log_LogEndOffset вырастает до 2. Через минуту сообщение удаляется из топика, и kafka_log_Log_LogStartOffset также принимает значение 2. При этом kafka_group_topic_partition_offset все еще имеет значение 1.
  6. При втором запуске команды получения сообщений значение kafka_group_topic_partition_offset также вырастает до 2, хотя сообщение не получено.

Выводы:

  1. В норме значение kafka_group_topic_partition_offset находится между значениями kafka_log_Log_LogStartOffset и kafka_log_Log_LogEndOffset. Если kafka_group_topic_partition_offset становится меньше kafka_log_Log_LogStartOffset, это указывает на потерю сообщений этой группой потребителей.
  2. Разница между значениями kafka_log_Log_LogEndOffset и kafka_group_topic_partition_offset показывает, сколько новых сообщений пока не вычитано (т. е. насколько группа потребителей отстает от производителей).

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:

  • Удалите кластер Managed Service for Apache Kafka®.
  • Освободите и удалите публичные статические IP-адреса.

Была ли статья полезна?

Предыдущая
Использование Confluent Schema Registry с Managed Service for Apache Kafka®
Следующая
Автоматизация задач Query с помощью Managed Service for Apache Airflow™
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»