Connecting to an Apache Kafka® cluster from applications
This section provides settings for connecting to Managed Service for Apache Kafka® cluster hosts using command-line tools and from a Docker container. To learn how to connect from your application code, see Code examples.
You can connect to public Apache Kafka® cluster hosts only if you use an SSL certificate. The examples below assume that the YandexInternalRootCA.crt certificate is located in the following directory:
- /usr/local/share/ca-certificates/Yandex/ for Ubuntu
- $HOME\.kafka\ for Windows
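If you have not obtained the certificate yet, you can download it on Ubuntu with a command like the one below (a minimal sketch; it uses the same certificate URL as the Docker example at the end of this section):

sudo mkdir --parents /usr/local/share/ca-certificates/Yandex/ && \
sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
    --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
sudo chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt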
Connecting without an SSL certificate is only supported for non-public hosts. In this case, traffic to the cluster inside the virtual network is not encrypted.
Before connecting, configure security groups for the cluster, if required.
The examples for Linux were tested in the following environment:
- Yandex Cloud virtual machine running Ubuntu 20.04 LTS
- Bash 5.0.16

The examples for Windows were tested in the following environment:
- Yandex Cloud virtual machine running Windows Server 2019 Datacenter
- Microsoft OpenJDK 11.0.11
- PowerShell 5.1.17763.1490 Desktop
Command line tools
To see code examples with the host FQDN filled in, open the cluster page in the management console.
Linux (Bash)/macOS (Zsh)
To connect to an Apache Kafka® cluster from the command line, use kafkacat, an open-source application that can work as a universal data producer or consumer. For more information, see the documentation.
Before connecting, install the dependencies:
sudo apt update && sudo apt install -y kafkacat
Connecting without SSL

- Run this command to receive messages from a topic:

  kafkacat -C \
      -b <broker_FQDN>:9092 \
      -t <topic_name> \
      -X security.protocol=SASL_PLAINTEXT \
      -X sasl.mechanism=SCRAM-SHA-512 \
      -X sasl.username="<consumer_username>" \
      -X sasl.password="<consumer_password>" -Z -K:

  The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to a topic:

  echo "test message" | kafkacat -P \
      -b <broker_FQDN>:9092 \
      -t <topic_name> \
      -k key \
      -X security.protocol=SASL_PLAINTEXT \
      -X sasl.mechanism=SCRAM-SHA-512 \
      -X sasl.username="<producer_username>" \
      -X sasl.password="<producer_password>" -Z
Connecting with an SSL certificate

- Run this command to receive messages from a topic:

  kafkacat -C \
      -b <broker_FQDN>:9091 \
      -t <topic_name> \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanism=SCRAM-SHA-512 \
      -X sasl.username="<consumer_username>" \
      -X sasl.password="<consumer_password>" \
      -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:

  The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to a topic:

  echo "test message" | kafkacat -P \
      -b <broker_FQDN>:9091 \
      -t <topic_name> \
      -k key \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanism=SCRAM-SHA-512 \
      -X sasl.username="<producer_username>" \
      -X sasl.password="<producer_password>" \
      -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
For info on how to get a broker host's FQDN, see this guide.
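If you use the Yandex Cloud CLI, you can also list broker host FQDNs with a command like the one below (an assumption; check yc managed-kafka cluster --help for the exact syntax in your CLI version):

yc managed-kafka cluster list-hosts <cluster_name_or_ID>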
Make sure that the first terminal displays the message key:test message
sent in the second terminal.
Windows (PowerShell)
Before connecting:

- Install the latest available version of Microsoft OpenJDK.
- Download the archive with binary files for the Apache Kafka® version run by the cluster. The Scala version does not matter.
- Unpack the archive (see the sketch after this list).

Tip

Unpack the Apache Kafka® files to the root directory of the disk, for example, C:\kafka_2.12-2.6.0\. If the path to the Apache Kafka® executable and batch files is too long, you will get the The input line is too long error when trying to run these files.
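A minimal sketch of the unpacking step, assuming the downloaded archive is named kafka_2.12-2.6.0.tgz and is in the current directory (Windows Server 2019 ships with tar.exe):

tar -xzf .\kafka_2.12-2.6.0.tgz -C C:\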
Connecting without SSL

- Run this command to receive messages from a topic:

  <path_to_the_directory_with_Apache_Kafka_files>\bin\windows\kafka-console-consumer.bat `
      --bootstrap-server <broker_FQDN>:9092 `
      --topic <topic_name> `
      --property print.key=true `
      --property key.separator=":" `
      --consumer-property security.protocol=SASL_PLAINTEXT `
      --consumer-property sasl.mechanism=SCRAM-SHA-512 `
      --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<consumer_username>' password='<consumer_password>';"

  The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to a topic:

  echo "key:test message" | <path_to_the_directory_with_Apache_Kafka_files>\bin\windows\kafka-console-producer.bat `
      --bootstrap-server <broker_FQDN>:9092 `
      --topic <topic_name> `
      --property parse.key=true `
      --property key.separator=":" `
      --producer-property acks=all `
      --producer-property security.protocol=SASL_PLAINTEXT `
      --producer-property sasl.mechanism=SCRAM-SHA-512 `
      --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<producer_username>' password='<producer_password>';"
Connecting with an SSL certificate

- Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set the password using the -storepass parameter for additional keystore protection (you can verify the import with the sketch after these steps):

  keytool.exe -importcert -alias YandexCA `
      -file $HOME\.kafka\YandexInternalRootCA.crt `
      -keystore $HOME\.kafka\ssl `
      -storepass <certificate_store_password> `
      -noprompt
- Run this command to receive messages from a topic:

  <path_to_the_directory_with_Apache_Kafka_files>\bin\windows\kafka-console-consumer.bat `
      --bootstrap-server <broker_FQDN>:9091 `
      --topic <topic_name> `
      --property print.key=true `
      --property key.separator=":" `
      --consumer-property security.protocol=SASL_SSL `
      --consumer-property sasl.mechanism=SCRAM-SHA-512 `
      --consumer-property ssl.truststore.location=$HOME\.kafka\ssl `
      --consumer-property ssl.truststore.password=<certificate_store_password> `
      --consumer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<consumer_username>' password='<consumer_password>';"

  The command will continuously read new messages from the topic.
- In a separate terminal, run this command to send a message to a topic:

  echo "key:test message" | <path_to_the_directory_with_Apache_Kafka_files>\bin\windows\kafka-console-producer.bat `
      --bootstrap-server <broker_FQDN>:9091 `
      --topic <topic_name> `
      --property parse.key=true `
      --property key.separator=":" `
      --producer-property acks=all `
      --producer-property security.protocol=SASL_SSL `
      --producer-property sasl.mechanism=SCRAM-SHA-512 `
      --producer-property ssl.truststore.location=$HOME\.kafka\ssl `
      --producer-property ssl.truststore.password=<certificate_store_password> `
      --producer-property sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username='<producer_username>' password='<producer_password>';"
For info on how to get a broker host's FQDN, see this guide.
Make sure that the first terminal displays the message key:test message
sent in the second terminal.
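To check that the certificate was added to the Java Key Store created in the first step, you can list the store contents (a sketch assuming the store path and password used above):

keytool.exe -list `
    -keystore $HOME\.kafka\ssl `
    -storepass <certificate_store_password>

The output should include an entry with the YandexCA alias.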
Before you connect from a Docker container
To connect to a Managed Service for Apache Kafka® cluster from a Docker container, add the following lines to the Dockerfile.

For connections without an SSL certificate:

RUN apt-get update && \
    apt-get install kafkacat --yes

For connections with an SSL certificate:

RUN apt-get update && \
    apt-get install wget kafkacat --yes && \
    mkdir --parents /usr/local/share/ca-certificates/Yandex/ && \
    wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
        --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
    chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt
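For reference, here is a minimal sketch of a complete Dockerfile for the SSL variant (the ubuntu:20.04 base image and the kafka-tools tag are assumptions; any Debian-based image with apt-get will work):

FROM ubuntu:20.04
# Install kafkacat and download the cluster SSL certificate at build time.
RUN apt-get update && \
    apt-get install wget kafkacat --yes && \
    mkdir --parents /usr/local/share/ca-certificates/Yandex/ && \
    wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
        --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
    chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt

Build the image and start a shell in the container, then run the same kafkacat commands as in the Linux section:

docker build --tag kafka-tools .
docker run --rm -it kafka-tools bash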