Connecting to an Apache Kafka® cluster from applications
This section provides settings for connecting to Managed Service for Apache Kafka® cluster hosts using command-line tools and from a Docker container. To learn how to connect from your application code, see Code examples.
You can connect to public Apache Kafka® cluster hosts only if you use an SSL certificate. The examples below assume that the YandexInternalRootCA.crt certificate is located in:
- /usr/local/share/ca-certificates/Yandex/ for Ubuntu
- $HOME\.kafka\ for Windows
Connections without an SSL certificate are only supported for hosts that are not publicly accessible. In this case, traffic inside the virtual network will not be encrypted for database connections.
Before connecting, configure security groups for the cluster, if required.
The examples for Linux were tested in the following environment:
- Yandex Cloud virtual machine running Ubuntu 20.04 LTS
- OpenJDK: 11.0.24
- Bash: 5.0.16
The examples for Windows were tested in the following environment:
- Yandex Cloud virtual machine running Windows Server 2019 Datacenter
- Microsoft OpenJDK: 11.0.11
- PowerShell: 5.1.17763.1490 Desktop
Command line tools
To see code examples with the host FQDN filled in, open the cluster page in the management console.
kafkacat
The kafkacat (kcat) utility is an open-source application that can act as a universal data producer or consumer without requiring a Java Runtime Environment.
Before connecting, install the required dependencies:
```
sudo apt update && sudo apt install -y kafkacat
```
Note
On Ubuntu 24.04 and higher, use kcat.
Connecting without SSL:

- Run this command to receive messages from a topic:

  ```
  kafkacat -C \
    -b <broker_FQDN>:9092 \
    -t <topic_name> \
    -X security.protocol=SASL_PLAINTEXT \
    -X sasl.mechanism=SCRAM-SHA-512 \
    -X sasl.username="<consumer_login>" \
    -X sasl.password="<consumer_password>" -Z
  ```

  The command will continuously read new messages from the topic.

- In a separate terminal, run this command to send a message to a topic:

  ```
  echo "test message" | kafkacat -P \
    -b <broker_FQDN>:9092 \
    -t <topic_name> \
    -k key \
    -X security.protocol=SASL_PLAINTEXT \
    -X sasl.mechanism=SCRAM-SHA-512 \
    -X sasl.username="<producer_login>" \
    -X sasl.password="<producer_password>" -Z
  ```
Connecting via SSL:

- Run this command to receive messages from a topic:

  ```
  kafkacat -C \
    -b <broker_FQDN>:9091 \
    -t <topic_name> \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanism=SCRAM-SHA-512 \
    -X sasl.username="<consumer_login>" \
    -X sasl.password="<consumer_password>" \
    -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
  ```

  The command will continuously read new messages from the topic.

- In a separate terminal, run this command to send a message to a topic:

  ```
  echo "test message" | kafkacat -P \
    -b <broker_FQDN>:9091 \
    -t <topic_name> \
    -k key \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanism=SCRAM-SHA-512 \
    -X sasl.username="<producer_login>" \
    -X sasl.password="<producer_password>" \
    -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
  ```
For info on how to get a broker host's FQDN, see this guide.
Make sure that the first terminal displays the message key:test message sent in the second terminal.
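Before producing or consuming, you can also use kafkacat's metadata listing mode (`-L`) as a quick connectivity check. This is a sketch with placeholder values for the SSL case, reusing the connection settings shown above; the `<user_login>`/`<user_password>` placeholders stand for any of your cluster's Kafka users:

```
kafkacat -L \
  -b <broker_FQDN>:9091 \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanism=SCRAM-SHA-512 \
  -X sasl.username="<user_login>" \
  -X sasl.password="<user_password>" \
  -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt
```

If the credentials and certificate are correct, the command prints the cluster's brokers, topics, and partitions.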
Apache Kafka® tools for Linux (Bash)/macOS (Zsh)
These examples use tools from the archive with Apache Kafka® binary files:
- A message will be sent to the topic using kafka-console-producer.
- A message will be received from the topic using kafka-console-consumer.
Before connecting:
- Install OpenJDK:

  ```
  sudo apt update && sudo apt install --yes default-jdk
  ```

- Download the archive with binary files for the Apache Kafka® version run by the cluster. Your Scala version is irrelevant.

- Unpack the archive.
Connecting without SSL:

- Create files with cluster connection parameters: a file for the producer and a file for the consumer. The files have the same content, only the user details are different:

  ```
  sasl.mechanism=SCRAM-SHA-512
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="<producer_or_consumer_login>" \
    password="<producer_or_consumer_password>";
  security.protocol=SASL_PLAINTEXT
  ```
- Run this command to receive messages from a topic:

  ```
  <path_to_folder_with_Apache_Kafka_files>/bin/kafka-console-consumer.sh \
    --consumer.config <path_to_file_with_parameters_for_consumer> \
    --bootstrap-server <broker_FQDN>:9092 \
    --topic <topic_name> \
    --property print.key=true \
    --property key.separator=":"
  ```

  The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to a topic:

  ```
  echo "key:test message" | <path_to_folder_with_Apache_Kafka_files>/bin/kafka-console-producer.sh \
    --producer.config <path_to_file_with_parameters_for_producer> \
    --bootstrap-server <broker_FQDN>:9092 \
    --topic <topic_name> \
    --property parse.key=true \
    --property key.separator=":"
  ```
Connecting via SSL:

- Go to the folder where the Java certificate store will be located:

  ```
  cd /etc/security
  ```

- Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password of at least 6 characters using the -storepass parameter for additional storage protection:

  ```
  sudo keytool -importcert \
    -alias YandexCA \
    -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
    -keystore ssl \
    -storepass <certificate_store_password> \
    --noprompt
  ```
- Create files with cluster connection parameters: a file for the producer and a file for the consumer. The files have the same content, only the user details are different:

  ```
  sasl.mechanism=SCRAM-SHA-512
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="<producer_or_consumer_login>" \
    password="<producer_or_consumer_password>";
  security.protocol=SASL_SSL
  ssl.truststore.location=/etc/security/ssl
  ssl.truststore.password=<certificate_store_password>
  ```
- Run this command to receive messages from a topic:

  ```
  <path_to_folder_with_Apache_Kafka_files>/bin/kafka-console-consumer.sh \
    --consumer.config <path_to_file_with_parameters_for_consumer> \
    --bootstrap-server <broker_FQDN>:9091 \
    --topic <topic_name> \
    --property print.key=true \
    --property key.separator=":"
  ```

  The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to a topic:

  ```
  echo "key:test message" | <path_to_folder_with_Apache_Kafka_files>/bin/kafka-console-producer.sh \
    --producer.config <path_to_file_with_parameters_for_producer> \
    --bootstrap-server <broker_FQDN>:9091 \
    --topic <topic_name> \
    --property parse.key=true \
    --property key.separator=":"
  ```
For info on how to get a broker host's FQDN, see this guide.
Make sure that the first terminal displays the message key:test message sent in the second terminal.
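The connection-parameter files from the steps above can also be generated from the shell. The following is a minimal sketch for the SSL variant, assuming the keystore was created at /etc/security/ssl as described; the ~/kafka-auth directory and the file names are arbitrary illustrative choices:

```shell
# Sketch: generate the producer and consumer parameter files described above.
# Replace the <...> placeholders with your cluster's Kafka user credentials.
mkdir -p ~/kafka-auth

# Producer file (SSL variant, truststore at /etc/security/ssl as created above).
cat > ~/kafka-auth/producer.properties <<'EOF'
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="<producer_login>" \
  password="<producer_password>";
security.protocol=SASL_SSL
ssl.truststore.location=/etc/security/ssl
ssl.truststore.password=<certificate_store_password>
EOF

# The consumer file is identical except for the user details.
sed 's/producer/consumer/g' ~/kafka-auth/producer.properties > ~/kafka-auth/consumer.properties
```

Pass the resulting files to kafka-console-producer.sh and kafka-console-consumer.sh via the --producer.config and --consumer.config options, respectively.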
Apache Kafka® tools for Windows (PowerShell)
These examples use tools from the archive with Apache Kafka® binary files:
- A message will be sent to the topic using kafka-console-producer.
- A message will be received from the topic using kafka-console-consumer.
Although the tool documentation mentions .sh scripts, it is relevant for Windows as well. The tools are the same on every platform; only the scripts that run them differ. For example:
- bin/kafka-console-producer.sh for Linux (Bash)/macOS (Zsh)
- bin\windows\kafka-console-producer.bat for Windows (PowerShell)
Before connecting:
- Install the latest available version of Microsoft OpenJDK.

- Download the archive with binary files for the Apache Kafka® version run by the cluster. Your Scala version is irrelevant.

- Unpack the archive.
Tip
Unpack the Apache Kafka® files to the disk's root folder, e.g., C:\kafka_2.12-2.6.0\. If the path to the Apache Kafka® executables and batch files is too long, you will get the "The input line is too long" error when trying to run them.
Connecting without SSL:

- Create files with cluster connection parameters: a file for the producer and a file for the consumer. The files have the same content, only the user details are different:

  ```
  sasl.mechanism=SCRAM-SHA-512
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="<producer_or_consumer_login>" \
    password="<producer_or_consumer_password>";
  security.protocol=SASL_PLAINTEXT
  ```
- Run this command to receive messages from a topic:

  ```
  <path_to_folder_with_Apache_Kafka_files>\bin\windows\kafka-console-consumer.bat `
    --consumer.config <path_to_file_with_parameters_for_consumer> `
    --bootstrap-server <broker_FQDN>:9092 `
    --topic <topic_name> `
    --property print.key=true `
    --property key.separator=":"
  ```

  The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to a topic:

  ```
  echo "key:test message" | <path_to_folder_with_Apache_Kafka_files>\bin\windows\kafka-console-producer.bat `
    --producer.config <path_to_file_with_parameters_for_producer> `
    --bootstrap-server <broker_FQDN>:9092 `
    --topic <topic_name> `
    --property parse.key=true `
    --property key.separator=":"
  ```
Connecting via SSL:

- Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password in the --storepass parameter for additional storage protection:

  ```
  keytool.exe -importcert -alias YandexCA `
    --file $HOME\.kafka\YandexInternalRootCA.crt `
    --keystore $HOME\.kafka\ssl `
    --storepass <certificate_store_password> `
    --noprompt
  ```
- Create files with cluster connection parameters: a file for the producer and a file for the consumer. The files have the same content, only the user details are different:

  ```
  sasl.mechanism=SCRAM-SHA-512
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="<producer_or_consumer_login>" \
    password="<producer_or_consumer_password>";
  security.protocol=SASL_SSL
  ssl.truststore.location=<$HOME_variable_value>\\.kafka\\ssl
  ssl.truststore.password=<certificate_store_password>
  ```

  Specify the full path to the certificate store as the ssl.truststore.location parameter value, for example:

  ```
  ssl.truststore.location=C:\\Users\\Administrator\\.kafka\\ssl
  ```

  The certificate store is located at $HOME\.kafka\ssl, but you cannot use environment variables in this value. To expand the variable, run this command:

  ```
  echo $HOME
  ```

  Warning

  Use \\ instead of \ when specifying the ssl.truststore.location parameter value; otherwise, you will not be able to access the certificate store when running commands.
- Run this command to receive messages from a topic:

  ```
  <path_to_folder_with_Apache_Kafka_files>\bin\windows\kafka-console-consumer.bat `
    --consumer.config <path_to_file_with_parameters_for_consumer> `
    --bootstrap-server <broker_FQDN>:9091 `
    --topic <topic_name> `
    --property print.key=true `
    --property key.separator=":"
  ```

  The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to a topic:

  ```
  echo "key:test message" | <path_to_folder_with_Apache_Kafka_files>\bin\windows\kafka-console-producer.bat `
    --producer.config <path_to_file_with_parameters_for_producer> `
    --bootstrap-server <broker_FQDN>:9091 `
    --topic <topic_name> `
    --property parse.key=true `
    --property key.separator=":"
  ```
For info on how to get a broker host's FQDN, see this guide.
Make sure that the first terminal displays the message key:test message sent in the second terminal.
Before you connect from a Docker container
To connect to a Managed Service for Apache Kafka® cluster from a Docker container, add the following lines to the Dockerfile:
For connecting without SSL:

```
RUN apt-get update && \
    apt-get install kafkacat --yes
```

For connecting via SSL:

```
RUN apt-get update && \
    apt-get install wget kafkacat --yes && \
    mkdir --parents /usr/local/share/ca-certificates/Yandex/ && \
    wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
         --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
    chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt
```
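Putting the SSL lines together, a complete minimal Dockerfile might look as follows. This is a sketch: the ubuntu:20.04 base image and the kafkacat entrypoint are illustrative assumptions, not part of the original instructions.

```dockerfile
# Base image is an assumption; any Debian/Ubuntu image with apt-get works similarly.
FROM ubuntu:20.04

RUN apt-get update && \
    apt-get install wget kafkacat --yes && \
    mkdir --parents /usr/local/share/ca-certificates/Yandex/ && \
    wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
         --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
    chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt

# Run kafkacat by default; pass its arguments to `docker run`.
ENTRYPOINT ["kafkacat"]
```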