Monitoring message loss in an Apache Kafka® topic
- Required paid resources
- Getting started
- Set up topic connection tools
- Prepare message send and receive commands
- Run the message receive command
- Create monitoring charts
- Test sending and receiving a message
- Enable message deletion
- Test sending and receiving a message once again
- Review the monitoring charts
- Delete the resources you created
A consumer group may lose messages in an Apache Kafka® topic when both of the following apply:
- The topic or the entire cluster uses the `Delete` log cleanup policy with a short `Log retention` period.
- One or more consumer groups read messages from the topic too slowly, so unread messages get deleted.
You can monitor message loss using Managed Service for Apache Kafka® metrics delivered to Monitoring. If the `kafka_group_topic_partition_offset` value falls below `kafka_log_Log_LogStartOffset`, the consumer group has lost messages.
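This loss condition boils down to a simple offset comparison. Below is a minimal shell sketch with hypothetical offset values; the variable names are illustrative, not Monitoring API calls:

```shell
# Hypothetical metric values for one partition, as read from Monitoring.
LOG_START_OFFSET=5   # kafka_log_Log_LogStartOffset: first offset still stored
GROUP_OFFSET=3       # kafka_group_topic_partition_offset: consumer group's position

# If the group's position is behind the first stored offset,
# the messages in between were deleted before they could be read.
if [ "$GROUP_OFFSET" -lt "$LOG_START_OFFSET" ]; then
    LOST=$((LOG_START_OFFSET - GROUP_OFFSET))
    echo "consumer group lost $LOST message(s)"
fi
```

With these sample values, the group is 2 offsets behind the start of the log, i.e., it has lost 2 messages.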
In this tutorial, you will:
- Simulate message loss in a Managed Service for Apache Kafka® test cluster topic using topic connection tools.
- Create a chart for the `kafka_group_topic_partition_offset`, `kafka_log_Log_LogStartOffset`, and `kafka_log_Log_LogEndOffset` metrics in Yandex Monitoring and observe the patterns during message loss.
To simulate and monitor message loss in an Apache Kafka® topic:
- Set up topic connection tools.
- Prepare message send and receive commands.
- Run the message receive command.
- Create monitoring charts.
- Test sending and receiving a message.
- Enable message deletion.
- Test sending and receiving a message once again.
- Review the monitoring charts.
If you no longer need the resources you created, delete them.
Required paid resources
The support cost for this solution includes:
- Apache Kafka® cluster fee: use of computing resources allocated to hosts (including ZooKeeper hosts) and disk storage (see Apache Kafka® pricing).
- Fee for public IP addresses assigned to cluster hosts (see Virtual Private Cloud pricing).
Getting started
- Create a Managed Service for Apache Kafka® cluster with any suitable configuration. Enable Public access when creating your cluster.
- Create a topic for message exchange between the producer and consumer with the following settings:
  - Name: `messages`
  - Number of partitions: `1`
- Create a user named `user` and grant them the following permissions for the `messages` topic:
  - `ACCESS_ROLE_CONSUMER`
  - `ACCESS_ROLE_PRODUCER`

  Note

  You can connect to public hosts only if using an SSL certificate.

- Select a cluster host with the `KAFKA` role and get its FQDN.
- Name the consumer group, e.g., `test-consumer-group`.
Set up topic connection tools
Bash

- Install OpenJDK:

  ```bash
  sudo apt update && sudo apt install --yes default-jdk
  ```

- Download the archive with binary files for the Apache Kafka® version run by the cluster. Your Scala version is irrelevant.
- Unpack the archive.
- Go to the folder where the Java certificate store will be located:

  ```bash
  cd /etc/security
  ```

- Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password of at least six characters in the `-storepass` parameter for additional storage protection:

  ```bash
  sudo keytool -importcert \
      -alias YandexCA \
      -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
      -keystore ssl \
      -storepass <certificate_store_password> \
      --noprompt
  ```

- Create a configuration file for connecting to the cluster:

  Note

  In this tutorial, we use a single user as both producer and consumer to keep things simple. Therefore, you only need one configuration file for both sending and receiving messages.

  ```ini
  sasl.mechanism=SCRAM-SHA-512
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="user" \
    password="<user_password>";
  security.protocol=SASL_SSL
  ssl.truststore.location=/etc/security/ssl
  ssl.truststore.password=<certificate_store_password>
  ```
PowerShell

- Install the latest available version of Microsoft OpenJDK.
- Download the archive with binary files for the Apache Kafka® version run by the cluster. Your Scala version is irrelevant.
- Unpack the archive.

  Tip

  Unpack the Apache Kafka® files to the disk's root folder, e.g., `C:\kafka_2.12-2.6.0\`.

  If the path to the Apache Kafka® executables and batch files is too long, you will get the `The input line is too long` error when trying to run them.

- Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password in the `--storepass` parameter for additional storage protection:

  ```powershell
  keytool.exe -importcert -alias YandexCA `
      --file $HOME\.kafka\YandexInternalRootCA.crt `
      --keystore $HOME\.kafka\ssl `
      --storepass <certificate_store_password> `
      --noprompt
  ```

- Create a configuration file for connecting to the cluster:

  Note

  In this tutorial, we use a single user as both producer and consumer to keep things simple. Therefore, you only need one configuration file for both sending and receiving messages.

  ```ini
  sasl.mechanism=SCRAM-SHA-512
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="user" \
    password="<user_password>";
  security.protocol=SASL_SSL
  ssl.truststore.location=<$HOME_variable_value>\\.kafka\\ssl
  ssl.truststore.password=<certificate_store_password>
  ```

  Specify the full path to the certificate store as the `ssl.truststore.location` parameter value, for example:

  ```ini
  ssl.truststore.location=C:\\Users\\Administrator\\.kafka\\ssl
  ```

  The certificate store is located at `$HOME\.kafka\ssl`, but you cannot use environment variables in the value. To expand the variable, run:

  ```powershell
  echo $HOME
  ```

  Warning

  Use `\\` instead of `\` when specifying the `ssl.truststore.location` parameter value; otherwise, you will not be able to access the certificate store when running commands.
Prepare message send and receive commands
Bash

- Command to send a message to the `messages` topic:

  ```bash
  echo "key:test message" | <path_to_folder_with_Apache_Kafka_files>/bin/kafka-console-producer.sh \
      --producer.config <path_to_configuration_file> \
      --bootstrap-server <host_FQDN>:9091 \
      --topic messages \
      --property parse.key=true \
      --property key.separator=":"
  ```

- Command to receive a message from the `messages` topic:

  ```bash
  <path_to_folder_with_Apache_Kafka_files>/bin/kafka-console-consumer.sh \
      --consumer.config <path_to_configuration_file> \
      --bootstrap-server <host_FQDN>:9091 \
      --group test-consumer-group \
      --topic messages \
      --property print.key=true \
      --property key.separator=":"
  ```
PowerShell

- Command to send a message to the `messages` topic:

  ```powershell
  echo "key:test message" | <path_to_folder_with_Apache_Kafka_files>\bin\windows\kafka-console-producer.bat `
      --producer.config <path_to_configuration_file> `
      --bootstrap-server <host_FQDN>:9091 `
      --topic messages `
      --property parse.key=true `
      --property key.separator=":"
  ```

- Command to receive a message from the `messages` topic:

  ```powershell
  <path_to_folder_with_Apache_Kafka_files>\bin\windows\kafka-console-consumer.bat `
      --consumer.config <path_to_configuration_file> `
      --bootstrap-server <host_FQDN>:9091 `
      --group test-consumer-group `
      --topic messages `
      --property print.key=true `
      --property key.separator=":"
  ```
Run the message receive command
- Run the message receive command.
- Wait 10 to 15 seconds, then interrupt the command by pressing Ctrl + C. Make sure the terminal displays the following message:

  ```text
  Processed a total of 0 messages
  ```

  This message means the consumer has successfully connected to the topic.

- Confirm command completion.

The consumer group named `test-consumer-group` is now registered and can be used as a label for metrics.
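You can also inspect the group's offsets directly with the `kafka-consumer-groups.sh` tool shipped in the same archive; this is a sketch using the same placeholders as the commands above, and the exact output columns may vary by Apache Kafka® version:

```shell
<path_to_folder_with_Apache_Kafka_files>/bin/kafka-consumer-groups.sh \
    --command-config <path_to_configuration_file> \
    --bootstrap-server <host_FQDN>:9091 \
    --describe \
    --group test-consumer-group
# The CURRENT-OFFSET, LOG-END-OFFSET, and LAG columns correspond to
# kafka_group_topic_partition_offset, kafka_log_Log_LogEndOffset,
# and their difference, respectively.
```

This gives a point-in-time view of the same offsets the Monitoring charts will plot.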
Create monitoring charts
Use Yandex Monitoring to plot the `kafka_group_topic_partition_offset`, `kafka_log_Log_LogStartOffset`, and `kafka_log_Log_LogEndOffset` metrics on the same chart:

- For `kafka_group_topic_partition_offset`, specify these labels:
  - `service = managed-kafka`
  - `name = kafka_group_topic_partition_offset`
  - `host = <host_FQDN>`
  - `topic = messages`
  - `group = test-consumer-group`
- For `kafka_log_Log_LogStartOffset`, specify these labels:
  - `service = managed-kafka`
  - `name = kafka_log_Log_LogStartOffset`
  - `host = <host_FQDN>`
  - `topic = messages`
- For `kafka_log_Log_LogEndOffset`, specify these labels:
  - `service = managed-kafka`
  - `name = kafka_log_Log_LogEndOffset`
  - `host = <host_FQDN>`
  - `topic = messages`
Warning
In our example, the test topic `messages` includes only one partition, so you do not need to specify the `partition` label. If the topic in your solution has multiple partitions, specify `partition` when plotting the listed metrics.
Note
To monitor message loss, you only need the `kafka_group_topic_partition_offset` and `kafka_log_Log_LogStartOffset` metrics; however, the additional `kafka_log_Log_LogEndOffset` metric will make the chart more informative.
Test sending and receiving a message
- Run the message send command.
- After about three minutes, run the message receive command. Make sure the terminal displays `key:test message`.
- Press Ctrl + C to interrupt the command for receiving messages from the topic.

Since message deletion is disabled, the message is still available to consumers three minutes after being sent.
Enable message deletion
Configure the `messages` topic as follows:

- Cleanup policy: `Delete`
- Retention, ms: `60000`
Note
After you change the topic settings, the cluster will take some time to update.
Messages will now be automatically deleted 60 seconds after being written to the topic.
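The retention check can be sketched as simple timestamp arithmetic. This is only an illustration of the Retention, ms setting with hypothetical timestamps, not Kafka's actual implementation (Kafka expires whole log segments, so deletion is coarser in practice):

```shell
RETENTION_MS=60000            # Retention, ms setting from the topic
WRITTEN_AT_MS=1700000000000   # hypothetical message timestamp
NOW_MS=1700000070000          # hypothetical current time, 70 s later

# A message becomes eligible for deletion once its age exceeds the retention period.
AGE_MS=$((NOW_MS - WRITTEN_AT_MS))
if [ "$AGE_MS" -gt "$RETENTION_MS" ]; then
    echo "message expired (${AGE_MS} ms old), eligible for deletion"
else
    echo "message still retained"
fi
```

With these sample values the message is 70 000 ms old, exceeding the 60 000 ms retention period, so it is eligible for deletion.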
Test sending and receiving a message once again
- Run the message send command.
- After about three minutes, run the message receive command. Make sure the message has not been received this time.
- Press Ctrl + C to interrupt the command for receiving messages from the topic.

All messages are deleted 60 seconds after they are written to the topic, leading to message loss for the "slow" consumer group.
Review the monitoring charts
Navigate to Yandex Monitoring and review the chart with the following metrics:

- `kafka_log_Log_LogStartOffset`: the partition's first offset. Increases as messages get deleted from the topic.
- `kafka_log_Log_LogEndOffset`: the partition's last offset. Increases as messages are written to the topic.
- `kafka_group_topic_partition_offset`: the consumer group's current offset in the partition. Increases as the consumer group reads messages from the topic.
The chart visualizes the following patterns:
- At the starting point, all three metrics are at `0`.
- After the first message is sent, `kafka_log_Log_LogEndOffset` increases to `1`.
- Three minutes later, as soon as the first message arrives, `kafka_group_topic_partition_offset` also increases to `1`. As messages do not get deleted yet, `kafka_log_Log_LogStartOffset` remains equal to `0`.
- One minute after you enable message deletion, the system deletes the message from the topic and `kafka_log_Log_LogStartOffset` gets the value of `1`. Now all three metrics are at `1`.
- After the second message is sent, `kafka_log_Log_LogEndOffset` increases to `2`. One minute later, the system deletes the message from the topic and `kafka_log_Log_LogStartOffset` also gets the value of `2`. `kafka_group_topic_partition_offset` still equals `1`.
- As soon as you run the message receive command for the second time, `kafka_group_topic_partition_offset` also increases to `2`, even though the message has not been received.
Summary:
- The `kafka_group_topic_partition_offset` value normally lies between `kafka_log_Log_LogStartOffset` and `kafka_log_Log_LogEndOffset`. `kafka_group_topic_partition_offset` falling below `kafka_log_Log_LogStartOffset` signals that this consumer group has lost messages.
- The difference between `kafka_log_Log_LogEndOffset` and `kafka_group_topic_partition_offset` shows how many of the new messages have not yet been read, i.e., the consumer group's lag behind the producers.
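Both summary conditions can be expressed together as offset arithmetic. A minimal shell sketch with hypothetical per-partition offsets, chosen to match the state near the end of the walkthrough (before the second read):

```shell
# Hypothetical offsets for one partition.
LOG_START_OFFSET=2   # kafka_log_Log_LogStartOffset
LOG_END_OFFSET=2     # kafka_log_Log_LogEndOffset
GROUP_OFFSET=1       # kafka_group_topic_partition_offset

# Loss: the group's position fell below the first stored offset.
if [ "$GROUP_OFFSET" -lt "$LOG_START_OFFSET" ]; then
    LOST=$((LOG_START_OFFSET - GROUP_OFFSET))
    echo "messages lost: $LOST"
fi

# Lag: how far the group trails the producers.
LAG=$((LOG_END_OFFSET - GROUP_OFFSET))
echo "consumer lag: $LAG"
```

With these values the group has lost one message and lags one message behind the producers, which is exactly what the chart shows at that point.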
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them: