Monitoring message loss in an Apache Kafka® topic

Written by Yandex Cloud
Updated on November 26, 2025
  • Required paid resources
  • Getting started
  • Set up topic connection tools
  • Prepare message send and receive commands
  • Run the message receive command
  • Create monitoring charts
  • Test sending and receiving a message
  • Enable message deletion
  • Test sending and receiving a message once again
  • Review the monitoring charts
  • Delete the resources you created

A consumer group may lose messages in an Apache Kafka® topic when both of the following apply:

  1. The topic or entire cluster uses the Delete log cleanup policy with a short Log retention period.
  2. One or more consumer groups read messages from the topic too slowly, resulting in the deletion of unread messages.

You can monitor message loss using Managed Service for Apache Kafka® metrics delivered to Monitoring. If the kafka_group_topic_partition_offset value falls below kafka_log_Log_LogStartOffset, the consumer group has lost messages. For example, if kafka_log_Log_LogStartOffset is 5 while the group's committed offset is 3, offsets 3 and 4 were deleted before the group read them.

In this tutorial, you will:

  • Simulate message loss in a Managed Service for Apache Kafka® test cluster topic using topic connection tools.
  • Create a chart for the kafka_group_topic_partition_offset, kafka_log_Log_LogStartOffset, and kafka_log_Log_LogEndOffset metrics in Yandex Monitoring and observe the patterns during the message loss.

To simulate and monitor message loss in an Apache Kafka® topic:

  1. Set up topic connection tools.
  2. Prepare message send and receive commands.
  3. Run the message receive command.
  4. Create monitoring charts.
  5. Test sending and receiving a message.
  6. Enable message deletion.
  7. Test sending and receiving a message once again.
  8. Review the monitoring charts.

If you no longer need the resources you created, delete them.

Required paid resources

The support cost for this solution includes:

  • Apache Kafka® cluster fee: use of computing resources allocated to hosts (including ZooKeeper hosts) and disk storage (see Apache Kafka® pricing).
  • Fee for public IP addresses assigned to cluster hosts (see Virtual Private Cloud pricing).

Getting started

  1. Create a Managed Service for Apache Kafka® cluster with any suitable configuration. Enable Public access when creating your cluster.

  2. Create a topic for message exchange between producer and consumer with the following settings:

    • Name: messages
    • Number of partitions: 1
  3. Create a user named user and grant them permissions for the messages topic:

    • ACCESS_ROLE_CONSUMER
    • ACCESS_ROLE_PRODUCER
  4. Configure security groups.

  5. Get an SSL certificate. A sample download command is shown after this list.

    Note

    You can only connect to public hosts using an SSL certificate.

  6. Select a cluster host with the KAFKA role and get its FQDN.

  7. Name the consumer group, e.g., test-consumer-group.
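
A minimal sketch of the certificate download from step 5 on Linux; the URL and target path follow the conventions used in Yandex Cloud documentation and match the certificate path used later in this tutorial, but verify them against the current certificate instructions:

    # Download the Yandex Cloud root certificate into the path expected by later steps
    sudo mkdir -p /usr/local/share/ca-certificates/Yandex && \
    sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
         --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
    sudo chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt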

Set up topic connection tools

CLI for Bash
  1. Install OpenJDK:

    sudo apt update && sudo apt install --yes default-jdk
    
  2. Download the archive with binary files for the Apache Kafka® version run by the cluster. Your Scala version is irrelevant.

  3. Unpack the archive.

  4. Go to the folder where the Java certificate store will be located:

    cd /etc/security
    
  5. Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password of at least 6 characters using the -storepass parameter for additional storage protection:

    sudo keytool -importcert \
                 -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
                 -keystore ssl -storepass <certificate_store_password> \
                 -noprompt
    
  6. Create a configuration file for connecting to the cluster:

    Note

    In this tutorial, we will use a single user as both producer and consumer to keep things simple. Therefore, you only need to create one configuration file to both send and receive messages.

    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="user" \
      password="<user_password>";
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/security/ssl
    ssl.truststore.password=<certificate_store_password>
    
CLI for PowerShell

  1. Install the latest available version of Microsoft OpenJDK.

  2. Download the archive with binary files for the Apache Kafka® version run by the cluster. Your Scala version is irrelevant.

  3. Unpack the archive.

    Tip

    Unpack the Apache Kafka® files to the disk's root folder, e.g., C:\kafka_2.12-2.6.0\.

    If the path to the Apache Kafka® executables and batch files is too long, you will get the The input line is too long error when trying to run them.

  4. Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password using the -storepass parameter for additional storage protection:

    keytool.exe -importcert -alias YandexCA `
    -file $HOME\.kafka\YandexInternalRootCA.crt `
    -keystore $HOME\.kafka\ssl `
    -storepass <certificate_store_password> `
    -noprompt
    
  5. Create a configuration file for connecting to the cluster:

    Note

    In this tutorial, we will use a single user as both producer and consumer to keep things simple. Therefore, you only need to create one configuration file to both send and receive messages.

    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="user" \
      password="<user_password>";
    security.protocol=SASL_SSL
    ssl.truststore.location=<$HOME_variable_value>\\.kafka\\ssl
    ssl.truststore.password=<certificate_store_password>
    

    Specify the full path to the certificate store as the ssl.truststore.location parameter value, for example:

    ssl.truststore.location=C:\\Users\\Administrator\\.kafka\\ssl
    

    The certificate store is located at $HOME\.kafka\ssl, but you cannot use environment variables in the value. To expand the variable, run this command:

    echo $HOME
    

    Warning

    Use \\ instead of \ when specifying the ssl.truststore.location parameter value, otherwise you will not be able to access the certificate store when running commands.
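
Optionally, verify the connection settings before moving on. Here is a minimal Bash sketch (adapt the paths for PowerShell) that asks the broker for its supported API versions using kafka-broker-api-versions.sh from the unpacked archive; a successful run prints a list of APIs rather than a timeout error:

    # Quick connectivity check against the cluster using the same connection config
    <path_to_folder_with_Apache_Kafka_files>/bin/kafka-broker-api-versions.sh \
      --command-config <path_to_configuration_file> \
      --bootstrap-server <host_FQDN>:9091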

Prepare message send and receive commands

CLI for Bash
  • Command to send a message to the messages topic:

    echo "key:test message" | <path_to_folder_with_Apache_Kafka_files>/bin/kafka-console-producer.sh \
      --producer.config <path_to_configuration_file> \
      --bootstrap-server <host_FQDN>:9091 \
      --topic messages \
      --property parse.key=true \
      --property key.separator=":"
    
  • Command to receive a message from the messages topic:

    <path_to_folder_with_Apache_Kafka_files>/bin/kafka-console-consumer.sh \
      --consumer.config <path_to_configuration_file> \
      --bootstrap-server <host_FQDN>:9091 \
      --group test-consumer-group \
      --topic messages \
      --property print.key=true \
      --property key.separator=":"
    
CLI for PowerShell

  • Command to send a message to the messages topic:

    echo "key:test message" | <path_to_folder_with_Apache_Kafka_files>\bin\windows\kafka-console-producer.bat `
        --producer.config <path_to_configuration_file> `
        --bootstrap-server <host_FQDN>:9091 `
        --topic messages `
        --property parse.key=true `
        --property key.separator=":"
    
  • Command to receive a message from the messages topic:

    <path_to_folder_with_Apache_Kafka_files>\bin\windows\kafka-console-consumer.bat `
        --consumer.config <path_to_configuration_file> `
        --bootstrap-server <host_FQDN>:9091 `
        --group test-consumer-group `
        --topic messages `
        --property print.key=true `
        --property key.separator=":"
    

Run the message receive command

  1. Run the message receive command.

  2. Wait 10 to 15 seconds and then interrupt the command by pressing Ctrl + C. Make sure the terminal displays the following message:

    Processed a total of 0 messages
    

    This message means the consumer has successfully connected to the topic.

  3. Confirm terminating the command if prompted.

The test-consumer-group consumer group is now registered and can be used as a label value for metrics. You can verify the registration from the CLI, as shown below.
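
A minimal Bash sketch describing the group with kafka-consumer-groups.sh from the unpacked archive; the CURRENT-OFFSET and LOG-END-OFFSET columns in its output mirror the kafka_group_topic_partition_offset and kafka_log_Log_LogEndOffset metrics:

    # Describe the consumer group's offsets per partition
    <path_to_folder_with_Apache_Kafka_files>/bin/kafka-consumer-groups.sh \
      --command-config <path_to_configuration_file> \
      --bootstrap-server <host_FQDN>:9091 \
      --describe --group test-consumer-group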

Create monitoring charts

Use Yandex Monitoring to plot the kafka_group_topic_partition_offset, kafka_log_Log_LogStartOffset, and kafka_log_Log_LogEndOffset metrics on the same chart:

  • For kafka_group_topic_partition_offset, specify these labels:
    • service = managed-kafka
    • name = kafka_group_topic_partition_offset
    • host = <host_FQDN>
    • topic = messages
    • group = test-consumer-group
  • For kafka_log_Log_LogStartOffset, specify these labels:
    • service = managed-kafka
    • name = kafka_log_Log_LogStartOffset
    • host = <host_FQDN>
    • topic = messages
  • For kafka_log_Log_LogEndOffset, specify these labels:
    • service = managed-kafka
    • name = kafka_log_Log_LogEndOffset
    • host = <host_FQDN>
    • topic = messages

Warning

In our example, the messages test topic includes only one partition, so you do not need to specify the partition label. If the topic in your solution has multiple partitions, also specify the partition label when plotting these metrics.

Note

To monitor message loss, you only need the kafka_group_topic_partition_offset and kafka_log_Log_LogStartOffset metrics; however, the additional kafka_log_Log_LogEndOffset metric will make the chart more informative.

Test sending and receiving a message

  1. Run the message send command.

  2. After about three minutes, run the message receive command. Make sure the terminal displays key:test message.

  3. Press Ctrl + C to interrupt the command for receiving messages from the topic.

Since message deletion is disabled, the message is still available to consumers three minutes after it was sent.

Enable message deletion

Configure the messages topic as follows:

  • Cleanup policy: Delete
  • Retention, ms: 60000

Note

After you change the topic settings, the cluster will take some time to update.

Messages will now be automatically deleted 60 seconds after being written to the topic.
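
If you manage the cluster from the command line, the same settings can likely be applied with the yc CLI. The flag names in this sketch are assumptions, not verified against the current CLI reference, so check yc managed-kafka topic update --help before relying on them:

    # Assumed flag names; verify with: yc managed-kafka topic update --help
    yc managed-kafka topic update messages \
      --cluster-name <cluster_name> \
      --cleanup-policy delete \
      --retention-ms 60000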

Test sending and receiving a message once again

  1. Run the message send command.

  2. After about three minutes, run the message receive command. Make sure the message has not been received this time.

  3. Press Ctrl + C to interrupt the command for receiving messages from the topic.

All messages are deleted 60 seconds after they are written to the topic, leading to message loss for the "slow" consumer group.
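
You can watch this happen at the broker level by querying the partition's offsets directly. A Bash sketch using kafka-get-offsets.sh, which ships with recent Apache Kafka® versions; --time -2 returns the earliest retained offset and --time -1 the latest:

    # Earliest retained offset: rises as messages are deleted
    <path_to_folder_with_Apache_Kafka_files>/bin/kafka-get-offsets.sh \
      --command-config <path_to_configuration_file> \
      --bootstrap-server <host_FQDN>:9091 \
      --topic messages --time -2
    # Latest offset: rises as messages are written
    <path_to_folder_with_Apache_Kafka_files>/bin/kafka-get-offsets.sh \
      --command-config <path_to_configuration_file> \
      --bootstrap-server <host_FQDN>:9091 \
      --topic messages --time -1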

Review the monitoring charts

Navigate to Yandex Monitoring and review the behavior of the metrics you charted earlier:

  • kafka_log_Log_LogStartOffset: the partition's first offset. It increases as messages are deleted from the topic.
  • kafka_log_Log_LogEndOffset: the partition's last offset. It increases as messages are written to the topic.
  • kafka_group_topic_partition_offset: the consumer group's current offset in the partition. It increases as the consumer group reads messages from the topic.

The chart visualizes the following patterns:

  1. At the starting point, all three metrics are at 0.
  2. After the first message is sent, kafka_log_Log_LogEndOffset increases to 1.
  3. Three minutes later, when the consumer reads the first message, kafka_group_topic_partition_offset also increases to 1. Since no messages are deleted yet, kafka_log_Log_LogStartOffset remains 0.
  4. One minute after you enable message deletion, the system deletes the message from the topic, and kafka_log_Log_LogStartOffset rises to 1. Now all three metrics are at 1.
  5. After the second message is sent, kafka_log_Log_LogEndOffset increases to 2. One minute later, the system deletes the message from the topic, and kafka_log_Log_LogStartOffset also rises to 2. kafka_group_topic_partition_offset still equals 1.
  6. When you run the message receive command for the second time, kafka_group_topic_partition_offset also increases to 2, even though the consumer never received the message.

Summary:

  1. The kafka_group_topic_partition_offset value normally lies between kafka_log_Log_LogStartOffset and kafka_log_Log_LogEndOffset. kafka_group_topic_partition_offset falling below kafka_log_Log_LogStartOffset signals that this consumer group has lost messages.
  2. The difference between kafka_log_Log_LogEndOffset and kafka_group_topic_partition_offset shows how many of the new messages have not yet been read, i.e., the consumer group's lag behind the producers.
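
As a cross-check outside Monitoring, the same loss condition can be evaluated from the CLI. The following Bash sketch combines the two commands shown earlier in this tutorial; the cut and awk field positions assume the default output formats of these tools, so adjust them if your Apache Kafka® version prints different columns:

    # Earliest retained offset for messages:0 (output format: topic:partition:offset)
    start_offset=$(<path_to_folder_with_Apache_Kafka_files>/bin/kafka-get-offsets.sh \
      --command-config <path_to_configuration_file> \
      --bootstrap-server <host_FQDN>:9091 \
      --topic messages --time -2 | cut -d: -f3)
    # Consumer group's committed offset (CURRENT-OFFSET column; may be "-" if nothing is committed)
    committed=$(<path_to_folder_with_Apache_Kafka_files>/bin/kafka-consumer-groups.sh \
      --command-config <path_to_configuration_file> \
      --bootstrap-server <host_FQDN>:9091 \
      --describe --group test-consumer-group | awk '$2 == "messages" {print $4}')
    if [ "$committed" -lt "$start_offset" ]; then
      echo "Message loss: committed offset $committed is below log start offset $start_offset"
    fi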

Delete the resources you created

Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them:

  • Delete the Managed Service for Apache Kafka® cluster.
  • Release and delete the static public IP addresses.
