Code examples for connecting to an Apache Kafka® cluster
You can only connect to public Apache Kafka® cluster hosts using an SSL certificate. The examples below assume that the `YandexInternalRootCA.crt` certificate is located in:

- `/usr/local/share/ca-certificates/Yandex/` for Ubuntu
- `$HOME\.kafka\` for Windows

Connecting without an SSL certificate is only supported for non-public hosts. In this case, traffic within the internal virtual network will not be encrypted when connecting to the database.
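Several of the examples below read this certificate from disk, so before running them it can be worth checking that the file is actually present and parseable. A minimal sketch in Python (the path is the Ubuntu one from above; this helper is for illustration only and is not part of any of the examples):

```python
import os
import ssl

# Ubuntu path used throughout the SSL examples below
CERT = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"

if os.path.exists(CERT):
    # ssl raises an error if the file is not a valid PEM certificate
    ssl.create_default_context(cafile=CERT)
    print("certificate is present and loads correctly")
else:
    print("certificate not found at", CERT)
```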
Before connecting, configure security groups for the cluster, if required.
To see code examples with the host FQDN filled in, open the cluster page in the management console.
The examples were tested in the following environment:

- Yandex Cloud VM running Ubuntu 20.04 LTS
- Bash: 5.0.16
- Python: 3.8.2, pip3: 20.0.2
- Node.js: 10.19.0, npm: 6.14.4
- OpenJDK: 11.0.8, Maven: 3.6.3
- Go: 1.13.8
- mono-complete: 6.8.0.105
C#
Before connecting:

- Install the dependencies:

  ```shell
  sudo apt-get update && \
  sudo apt-get install -y apt-transport-https dotnet-sdk-6.0
  ```

- Create a directory for the project:

  ```shell
  cd ~/ && mkdir cs-project && cd cs-project && mkdir -p consumer producer && cd ~/cs-project
  ```

- Create a configuration file, `App.csproj` (the target framework moniker for the .NET 6 SDK is `net6.0`):

  ```xml
  <Project Sdk="Microsoft.NET.Sdk">
    <PropertyGroup>
      <OutputType>Exe</OutputType>
      <TargetFramework>net6.0</TargetFramework>
    </PropertyGroup>
    <ItemGroup>
      <PackageReference Include="Confluent.Kafka" Version="2.2.0" />
    </ItemGroup>
  </Project>
  ```

- Copy `App.csproj` to the producer and consumer application directories:

  ```shell
  cp App.csproj producer/App.csproj && cp App.csproj consumer/App.csproj
  ```
Connecting without SSL (SASL_PLAINTEXT, port 9092):

- Code example for sending messages to a topic, `cs-project/producer/Program.cs`:

  ```csharp
  using Confluent.Kafka;
  using System;
  using System.Collections.Generic;

  namespace App
  {
      class Program
      {
          public static void Main(string[] args)
          {
              int MSG_COUNT = 5;

              string HOST = "<broker_host_FQDN>:9092";
              string TOPIC = "<topic_name>";
              string USER = "<producer_name>";
              string PASS = "<producer_password>";

              var producerConfig = new ProducerConfig(
                  new Dictionary<string, string>{
                      {"bootstrap.servers", HOST},
                      {"security.protocol", "SASL_PLAINTEXT"},
                      {"sasl.mechanism", "SCRAM-SHA-512"},
                      {"sasl.username", USER},
                      {"sasl.password", PASS}
                  }
              );

              var producer = new ProducerBuilder<string, string>(producerConfig).Build();

              for (int i = 0; i < MSG_COUNT; i++)
              {
                  producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                      (deliveryReport) =>
                      {
                          if (deliveryReport.Error.Code != ErrorCode.NoError)
                          {
                              Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                          }
                          else
                          {
                              Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                          }
                      });
              }
              producer.Flush(TimeSpan.FromSeconds(10));
          }
      }
  }
  ```

- Code example for getting messages from a topic, `cs-project/consumer/Program.cs`:

  ```csharp
  using Confluent.Kafka;
  using System;
  using System.Collections.Generic;

  namespace CCloud
  {
      class Program
      {
          public static void Main(string[] args)
          {
              string HOST = "<broker_host_FQDN>:9092";
              string TOPIC = "<topic_name>";
              string USER = "<consumer_name>";
              string PASS = "<consumer_password>";

              var consumerConfig = new ConsumerConfig(
                  new Dictionary<string, string>{
                      {"bootstrap.servers", HOST},
                      {"security.protocol", "SASL_PLAINTEXT"},
                      {"sasl.mechanism", "SCRAM-SHA-512"},
                      {"sasl.username", USER},
                      {"sasl.password", PASS},
                      {"group.id", "demo"}
                  }
              );

              var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
              consumer.Subscribe(TOPIC);

              try
              {
                  while (true)
                  {
                      var cr = consumer.Consume();
                      Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}");
                  }
              }
              catch (OperationCanceledException)
              {
                  // Ctrl-C was pressed.
              }
              finally
              {
                  consumer.Close();
              }
          }
      }
  }
  ```

- Building and launching the applications:

  ```shell
  cd ~/cs-project/consumer && dotnet build && \
  dotnet run bin/Debug/net6.0/App.dll
  ```

  ```shell
  cd ~/cs-project/producer && dotnet build && \
  dotnet run bin/Debug/net6.0/App.dll
  ```
Connecting via SSL (SASL_SSL, port 9091):

- Code example for sending messages to a topic, `cs-project/producer/Program.cs`:

  ```csharp
  using Confluent.Kafka;
  using System;
  using System.Collections.Generic;

  namespace App
  {
      class Program
      {
          public static void Main(string[] args)
          {
              int MSG_COUNT = 5;

              string HOST = "<broker_host_FQDN>:9091";
              string TOPIC = "<topic_name>";
              string USER = "<producer_name>";
              string PASS = "<producer_password>";
              string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

              var producerConfig = new ProducerConfig(
                  new Dictionary<string, string>{
                      {"bootstrap.servers", HOST},
                      {"security.protocol", "SASL_SSL"},
                      {"ssl.ca.location", CA_FILE},
                      {"sasl.mechanism", "SCRAM-SHA-512"},
                      {"sasl.username", USER},
                      {"sasl.password", PASS}
                  }
              );

              var producer = new ProducerBuilder<string, string>(producerConfig).Build();

              for (int i = 0; i < MSG_COUNT; i++)
              {
                  producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                      (deliveryReport) =>
                      {
                          if (deliveryReport.Error.Code != ErrorCode.NoError)
                          {
                              Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                          }
                          else
                          {
                              Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                          }
                      });
              }
              producer.Flush(TimeSpan.FromSeconds(10));
          }
      }
  }
  ```

- Code example for getting messages from a topic, `cs-project/consumer/Program.cs`:

  ```csharp
  using Confluent.Kafka;
  using System;
  using System.Collections.Generic;

  namespace CCloud
  {
      class Program
      {
          public static void Main(string[] args)
          {
              string HOST = "<broker_host_FQDN>:9091";
              string TOPIC = "<topic_name>";
              string USER = "<consumer_name>";
              string PASS = "<consumer_password>";
              string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

              var consumerConfig = new ConsumerConfig(
                  new Dictionary<string, string>{
                      {"bootstrap.servers", HOST},
                      {"security.protocol", "SASL_SSL"},
                      {"ssl.ca.location", CA_FILE},
                      {"sasl.mechanism", "SCRAM-SHA-512"},
                      {"sasl.username", USER},
                      {"sasl.password", PASS},
                      {"group.id", "demo"}
                  }
              );

              var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
              consumer.Subscribe(TOPIC);

              try
              {
                  while (true)
                  {
                      var cr = consumer.Consume();
                      Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}");
                  }
              }
              catch (OperationCanceledException)
              {
                  // Ctrl-C was pressed.
              }
              finally
              {
                  consumer.Close();
              }
          }
      }
  }
  ```

- Building and running the applications:

  ```shell
  cd ~/cs-project/consumer && dotnet build && \
  dotnet run bin/Debug/net6.0/App.dll
  ```

  ```shell
  cd ~/cs-project/producer && dotnet build && \
  dotnet run bin/Debug/net6.0/App.dll
  ```
To learn how to get a broker host FQDN, see this guide.
First, start the consumer application, which will continuously read new messages from the topic. Then start the producer application, which sends one or more messages with the key `key` and the value `test message` to the topic. The consumer application will display the messages as they arrive.
Go
Before connecting:
- Install the dependencies:

  ```shell
  sudo apt update && sudo apt install -y golang git && \
  go mod init example && \
  go get github.com/Shopify/sarama@v1.38.1 && \
  go get github.com/xdg-go/scram@v1.1.2
  ```

- Create a directory for the project:

  ```shell
  cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
  ```

- Create the `scram.go` file with the SCRAM code, which is the same for the producer and consumer applications:

  ```go
  package main

  import (
      "crypto/sha256"
      "crypto/sha512"
      "hash"

      "github.com/xdg-go/scram"
  )

  var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
  var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

  type XDGSCRAMClient struct {
      *scram.Client
      *scram.ClientConversation
      scram.HashGeneratorFcn
  }

  func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
      x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
      if err != nil {
          return err
      }
      x.ClientConversation = x.Client.NewConversation()
      return nil
  }

  func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
      response, err = x.ClientConversation.Step(challenge)
      return
  }

  func (x *XDGSCRAMClient) Done() bool {
      return x.ClientConversation.Done()
  }
  ```

- Copy `scram.go` to the producer and consumer application directories:

  ```shell
  cp scram.go producer/scram.go && cp scram.go consumer/scram.go
  ```
Connecting without SSL (SASL_PLAINTEXT, port 9092):

- Code example for sending a message to a topic, `producer/main.go`:

  ```go
  package main

  import (
      "fmt"
      "os"
      "strings"

      "github.com/Shopify/sarama"
  )

  func main() {
      brokers := "<broker_host_FQDN>:9092"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Producer.Return.Successes = true
      conf.Version = sarama.V0_10_0_0
      conf.ClientID = "sasl_scram_client"
      conf.Net.SASL.Enable = true
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.User = "<producer_name>"
      conf.Net.SASL.Password = "<producer_password>"
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
      if err != nil {
          fmt.Println("Couldn't create producer: ", err.Error())
          os.Exit(1)
      }

      publish("test message", syncProducer)
  }

  func publish(message string, producer sarama.SyncProducer) {
      // Publish synchronously
      msg := &sarama.ProducerMessage{
          Topic: "<topic_name>",
          Value: sarama.StringEncoder(message),
      }
      p, o, err := producer.SendMessage(msg)
      if err != nil {
          fmt.Println("Error publishing: ", err.Error())
          return
      }
      fmt.Println("Partition: ", p)
      fmt.Println("Offset: ", o)
  }
  ```

- Code example for getting messages from a topic, `consumer/main.go`:

  ```go
  package main

  import (
      "fmt"
      "os"
      "os/signal"
      "strings"

      "github.com/Shopify/sarama"
  )

  func main() {
      brokers := "<broker_host_FQDN>:9092"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Version = sarama.V0_10_0_0
      conf.Consumer.Return.Errors = true
      conf.ClientID = "sasl_scram_client"
      conf.Metadata.Full = true
      conf.Net.SASL.Enable = true
      conf.Net.SASL.User = "<consumer_name>"
      conf.Net.SASL.Password = "<consumer_password>"
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      master, err := sarama.NewConsumer(splitBrokers, conf)
      if err != nil {
          fmt.Println("Couldn't create consumer: ", err.Error())
          os.Exit(1)
      }
      defer func() {
          if err := master.Close(); err != nil {
              panic(err)
          }
      }()

      topic := "<topic_name>"
      consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
      if err != nil {
          panic(err)
      }

      signals := make(chan os.Signal, 1)
      signal.Notify(signals, os.Interrupt)

      // Count the number of processed messages
      msgCount := 0

      // Get the signal to finish
      doneCh := make(chan struct{})
      go func() {
          for {
              select {
              case err := <-consumer.Errors():
                  fmt.Println(err)
              case msg := <-consumer.Messages():
                  msgCount++
                  fmt.Println("Received message: ", string(msg.Key), string(msg.Value))
              case <-signals:
                  fmt.Println("Interrupt is detected")
                  doneCh <- struct{}{}
              }
          }
      }()

      <-doneCh
      fmt.Println("Processed", msgCount, "messages")
  }
  ```

- Building the applications:

  ```shell
  cd ~/go-project/producer && go build && \
  cd ~/go-project/consumer && go build
  ```

- Running the applications:

  ```shell
  ~/go-project/consumer/consumer
  ```

  ```shell
  ~/go-project/producer/producer
  ```
Connecting via SSL (SASL_SSL, port 9091):

- Code example for sending a message to a topic, `producer/main.go`:

  ```go
  package main

  import (
      "crypto/tls"
      "crypto/x509"
      "fmt"
      "io/ioutil"
      "os"
      "strings"

      "github.com/Shopify/sarama"
  )

  func main() {
      brokers := "<broker_host_FQDN>:9091"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Producer.Return.Successes = true
      conf.Version = sarama.V0_10_0_0
      conf.ClientID = "sasl_scram_client"
      conf.Net.SASL.Enable = true
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.User = "<producer_name>"
      conf.Net.SASL.Password = "<producer_password>"
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      certs := x509.NewCertPool()
      pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
      pemData, err := ioutil.ReadFile(pemPath)
      if err != nil {
          fmt.Println("Couldn't load cert: ", err.Error())
          os.Exit(1)
      }
      certs.AppendCertsFromPEM(pemData)

      conf.Net.TLS.Enable = true
      conf.Net.TLS.Config = &tls.Config{
          RootCAs: certs,
      }

      syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
      if err != nil {
          fmt.Println("Couldn't create producer: ", err.Error())
          os.Exit(1)
      }

      publish("test message", syncProducer)
  }

  func publish(message string, producer sarama.SyncProducer) {
      // Publish synchronously
      msg := &sarama.ProducerMessage{
          Topic: "<topic_name>",
          Value: sarama.StringEncoder(message),
      }
      p, o, err := producer.SendMessage(msg)
      if err != nil {
          fmt.Println("Error publishing: ", err.Error())
          return
      }
      fmt.Println("Partition: ", p)
      fmt.Println("Offset: ", o)
  }
  ```

- Code example for getting messages from a topic, `consumer/main.go`:

  ```go
  package main

  import (
      "crypto/tls"
      "crypto/x509"
      "fmt"
      "io/ioutil"
      "os"
      "os/signal"
      "strings"

      "github.com/Shopify/sarama"
  )

  func main() {
      brokers := "<broker_host_FQDN>:9091"
      splitBrokers := strings.Split(brokers, ",")
      conf := sarama.NewConfig()
      conf.Producer.RequiredAcks = sarama.WaitForAll
      conf.Version = sarama.V0_10_0_0
      conf.Consumer.Return.Errors = true
      conf.ClientID = "sasl_scram_client"
      conf.Metadata.Full = true
      conf.Net.SASL.Enable = true
      conf.Net.SASL.User = "<consumer_name>"
      conf.Net.SASL.Password = "<consumer_password>"
      conf.Net.SASL.Handshake = true
      conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
      conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

      certs := x509.NewCertPool()
      pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
      pemData, err := ioutil.ReadFile(pemPath)
      if err != nil {
          fmt.Println("Couldn't load cert: ", err.Error())
          os.Exit(1)
      }
      certs.AppendCertsFromPEM(pemData)

      conf.Net.TLS.Enable = true
      conf.Net.TLS.Config = &tls.Config{
          RootCAs: certs,
      }

      master, err := sarama.NewConsumer(splitBrokers, conf)
      if err != nil {
          fmt.Println("Couldn't create consumer: ", err.Error())
          os.Exit(1)
      }
      defer func() {
          if err := master.Close(); err != nil {
              panic(err)
          }
      }()

      topic := "<topic_name>"
      consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
      if err != nil {
          panic(err)
      }

      signals := make(chan os.Signal, 1)
      signal.Notify(signals, os.Interrupt)

      // Count the number of processed messages
      msgCount := 0

      // Get the signal to finish
      doneCh := make(chan struct{})
      go func() {
          for {
              select {
              case err := <-consumer.Errors():
                  fmt.Println(err)
              case msg := <-consumer.Messages():
                  msgCount++
                  fmt.Println("Received message: ", string(msg.Key), string(msg.Value))
              case <-signals:
                  fmt.Println("Interrupt is detected")
                  doneCh <- struct{}{}
              }
          }
      }()

      <-doneCh
      fmt.Println("Processed", msgCount, "messages")
  }
  ```

- Building the applications:

  ```shell
  cd ~/go-project/producer && go build && \
  cd ~/go-project/consumer && go build
  ```

- Running the applications:

  ```shell
  ~/go-project/consumer/consumer
  ```

  ```shell
  ~/go-project/producer/producer
  ```
To learn how to get a broker host FQDN, see this guide.
First, start the consumer application, which will continuously read new messages from the topic. Then start the producer application, which sends one or more messages with the key `key` and the value `test message` to the topic. The consumer application will display the messages as they arrive.
Java
Before connecting:

- Install the dependencies:

  ```shell
  sudo apt update && sudo apt install --yes default-jdk maven
  ```

- Create a directory for the Maven project:

  ```shell
  cd ~/ && \
  mkdir --parents project/consumer/src/java/com/example project/producer/src/java/com/example && \
  cd ~/project
  ```

- Create a configuration file for Maven, `pom.xml`:

  ```xml
  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.example</groupId>
      <artifactId>app</artifactId>
      <packaging>jar</packaging>
      <version>0.1.0</version>
      <properties>
          <maven.compiler.source>1.8</maven.compiler.source>
          <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
      <dependencies>
          <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-simple</artifactId>
              <version>1.7.30</version>
          </dependency>
          <dependency>
              <groupId>com.fasterxml.jackson.core</groupId>
              <artifactId>jackson-databind</artifactId>
              <version>2.11.2</version>
          </dependency>
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>2.6.0</version>
          </dependency>
      </dependencies>
      <build>
          <finalName>${project.artifactId}-${project.version}</finalName>
          <sourceDirectory>src</sourceDirectory>
          <resources>
              <resource>
                  <directory>src</directory>
              </resource>
          </resources>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <executions>
                      <execution>
                          <goals>
                              <goal>attached</goal>
                          </goals>
                          <phase>package</phase>
                          <configuration>
                              <descriptorRefs>
                                  <descriptorRef>jar-with-dependencies</descriptorRef>
                              </descriptorRefs>
                              <archive>
                                  <manifest>
                                      <mainClass>com.example.App</mainClass>
                                  </manifest>
                              </archive>
                          </configuration>
                      </execution>
                  </executions>
              </plugin>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-jar-plugin</artifactId>
                  <version>3.1.0</version>
                  <configuration>
                      <archive>
                          <manifest>
                              <mainClass>com.example.App</mainClass>
                          </manifest>
                      </archive>
                  </configuration>
              </plugin>
          </plugins>
      </build>
  </project>
  ```

  For the current versions of the dependencies, refer to the relevant project pages in the Maven repository.

- Copy `pom.xml` to the producer and consumer application directories:

  ```shell
  cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
  ```
Connecting without SSL (SASL_PLAINTEXT, port 9092):

- Code example for sending messages to a topic, `producer/src/java/com/example/App.java`:

  ```java
  package com.example;

  import java.util.*;
  import org.apache.kafka.common.*;
  import org.apache.kafka.common.serialization.StringSerializer;
  import org.apache.kafka.clients.producer.*;

  public class App {
      public static void main(String[] args) {
          int MSG_COUNT = 5;

          String HOST = "<broker_FQDN>:9092";
          String TOPIC = "<topic_name>";
          String USER = "<producer_name>";
          String PASS = "<producer_password>";

          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String KEY = "key";
          String serializer = StringSerializer.class.getName();

          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put("acks", "all");
          props.put("key.serializer", serializer);
          props.put("value.serializer", serializer);
          props.put("security.protocol", "SASL_PLAINTEXT");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);

          Producer<String, String> producer = new KafkaProducer<>(props);
          try {
              for (int i = 1; i <= MSG_COUNT; i++) {
                  producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
                  System.out.println("Test message " + i);
              }
              producer.flush();
              producer.close();
          } catch (Exception ex) {
              System.out.println(ex);
              producer.close();
          }
      }
  }
  ```

- Code example for getting messages from a topic, `consumer/src/java/com/example/App.java`:

  ```java
  package com.example;

  import java.util.*;
  import org.apache.kafka.common.*;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.apache.kafka.clients.consumer.*;

  public class App {
      public static void main(String[] args) {
          String HOST = "<broker_FQDN>:9092";
          String TOPIC = "<topic_name>";
          String USER = "<consumer_name>";
          String PASS = "<consumer_password>";

          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String GROUP = "demo";
          String deserializer = StringDeserializer.class.getName();

          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.put("group.id", GROUP);
          props.put("key.deserializer", deserializer);
          props.put("value.deserializer", deserializer);
          props.put("security.protocol", "SASL_PLAINTEXT");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);

          Consumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(100);
              for (ConsumerRecord<String, String> record : records) {
                  System.out.println(record.key() + ":" + record.value());
              }
          }
      }
  }
  ```

- Building the applications:

  ```shell
  cd ~/project/producer && mvn clean package && \
  cd ~/project/consumer && mvn clean package
  ```

- Running the applications:

  ```shell
  java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
  ```

  ```shell
  java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
  ```
Connecting via SSL (SASL_SSL, port 9091):

- Go to the directory where the Java certificate store will reside:

  ```shell
  cd /etc/security
  ```

- Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password of at least six characters in the `-storepass` parameter for additional storage protection:

  ```shell
  sudo keytool -importcert \
      -alias YandexCA \
      -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
      -keystore ssl \
      -storepass <certificate_store_password> \
      -noprompt
  ```

- Code example for sending messages to a topic, `producer/src/java/com/example/App.java`:

  ```java
  package com.example;

  import java.util.*;
  import org.apache.kafka.common.*;
  import org.apache.kafka.common.serialization.StringSerializer;
  import org.apache.kafka.clients.producer.*;

  public class App {
      public static void main(String[] args) {
          int MSG_COUNT = 5;

          String HOST = "<broker_FQDN>:9091";
          String TOPIC = "<topic_name>";
          String USER = "<producer_name>";
          String PASS = "<producer_password>";
          String TS_FILE = "/etc/security/ssl";
          String TS_PASS = "<certificate_store_password>";

          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String KEY = "key";
          String serializer = StringSerializer.class.getName();

          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put("acks", "all");
          props.put("key.serializer", serializer);
          props.put("value.serializer", serializer);
          props.put("security.protocol", "SASL_SSL");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);
          props.put("ssl.truststore.location", TS_FILE);
          props.put("ssl.truststore.password", TS_PASS);

          Producer<String, String> producer = new KafkaProducer<>(props);
          try {
              for (int i = 1; i <= MSG_COUNT; i++) {
                  producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
                  System.out.println("Test message " + i);
              }
              producer.flush();
              producer.close();
          } catch (Exception ex) {
              System.out.println(ex);
              producer.close();
          }
      }
  }
  ```

- Code example for getting messages from a topic, `consumer/src/java/com/example/App.java`:

  ```java
  package com.example;

  import java.util.*;
  import org.apache.kafka.common.*;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.apache.kafka.clients.consumer.*;

  public class App {
      public static void main(String[] args) {
          String HOST = "<broker_FQDN>:9091";
          String TOPIC = "<topic_name>";
          String USER = "<consumer_name>";
          String PASS = "<consumer_password>";
          String TS_FILE = "/etc/security/ssl";
          String TS_PASS = "<certificate_store_password>";

          String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
          String jaasCfg = String.format(jaasTemplate, USER, PASS);
          String GROUP = "demo";
          String deserializer = StringDeserializer.class.getName();

          Properties props = new Properties();
          props.put("bootstrap.servers", HOST);
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.put("group.id", GROUP);
          props.put("key.deserializer", deserializer);
          props.put("value.deserializer", deserializer);
          props.put("security.protocol", "SASL_SSL");
          props.put("sasl.mechanism", "SCRAM-SHA-512");
          props.put("sasl.jaas.config", jaasCfg);
          props.put("ssl.truststore.location", TS_FILE);
          props.put("ssl.truststore.password", TS_PASS);

          Consumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(100);
              for (ConsumerRecord<String, String> record : records) {
                  System.out.println(record.key() + ":" + record.value());
              }
          }
      }
  }
  ```

- Building the applications:

  ```shell
  cd ~/project/producer && mvn clean package && \
  cd ~/project/consumer && mvn clean package
  ```

- Running the applications:

  ```shell
  java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
  ```

  ```shell
  java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
  ```
To learn how to get a broker host FQDN, see this guide.
First, start the consumer application, which will continuously read new messages from the topic. Then start the producer application, which sends one or more messages with the key `key` and the value `test message` to the topic. The consumer application will display the messages as they arrive.
Node.js
Before connecting, install the required dependencies:

```shell
sudo apt update && sudo apt install -y nodejs npm && \
npm install node-rdkafka
```
Connecting without SSL (SASL_PLAINTEXT, port 9092):

- Code example for sending messages to a topic, `producer.js`:

  ```javascript
  "use strict"

  const Kafka = require('node-rdkafka');

  const MSG_COUNT = 5;

  const HOST = "<broker_FQDN>:9092";
  const TOPIC = "<topic_name>";
  const USER = "<producer_name>";
  const PASS = "<producer_password>";

  const producer = new Kafka.Producer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_PLAINTEXT",
      'sasl.mechanism': "SCRAM-SHA-512"
  });

  producer.connect();

  producer.on('ready', function() {
      try {
          for (let i = 0; i < MSG_COUNT; ++i) {
              producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
              console.log("Produced: test message");
          }
          producer.flush(10000, () => {
              producer.disconnect();
          });
      } catch (err) {
          console.error('Error');
          console.error(err);
      }
  });
  ```

- Code example for getting messages from a topic, `consumer.js`:

  ```javascript
  "use strict"

  const Kafka = require('node-rdkafka');

  const HOST = "<broker_FQDN>:9092";
  const TOPIC = "<topic_name>";
  const USER = "<consumer_name>";
  const PASS = "<consumer_password>";

  const consumer = new Kafka.Consumer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_PLAINTEXT",
      'sasl.mechanism': "SCRAM-SHA-512",
      'group.id': "demo"
  });

  consumer.connect();

  consumer
      .on('ready', function() {
          consumer.subscribe([TOPIC]);
          consumer.consume();
      })
      .on('data', function(data) {
          console.log(data.key + ":" + data.value.toString());
      });

  process.on('SIGINT', () => {
      console.log('\nDisconnecting consumer ...');
      consumer.disconnect();
  });
  ```

- Running the applications:

  ```shell
  node consumer.js
  ```

  ```shell
  node producer.js
  ```
Connecting via SSL (SASL_SSL, port 9091):

- Code example for sending messages to a topic, `producer.js`:

  ```javascript
  "use strict"

  const Kafka = require('node-rdkafka');

  const MSG_COUNT = 5;

  const HOST = "<broker_FQDN>:9091";
  const TOPIC = "<topic_name>";
  const USER = "<producer_name>";
  const PASS = "<producer_password>";
  const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

  const producer = new Kafka.Producer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_SSL",
      'ssl.ca.location': CA_FILE,
      'sasl.mechanism': "SCRAM-SHA-512"
  });

  producer.connect();

  producer.on('ready', function() {
      try {
          for (let i = 0; i < MSG_COUNT; ++i) {
              producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
              console.log("Produced: test message");
          }
          producer.flush(10000, () => {
              producer.disconnect();
          });
      } catch (err) {
          console.error('Error');
          console.error(err);
      }
  });
  ```

- Code example for getting messages from a topic, `consumer.js`:

  ```javascript
  "use strict"

  const Kafka = require('node-rdkafka');

  const HOST = "<broker_FQDN>:9091";
  const TOPIC = "<topic_name>";
  const USER = "<consumer_name>";
  const PASS = "<consumer_password>";
  const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

  const consumer = new Kafka.Consumer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_SSL",
      'ssl.ca.location': CA_FILE,
      'sasl.mechanism': "SCRAM-SHA-512",
      'group.id': "demo"
  });

  consumer.connect();

  consumer
      .on('ready', function() {
          consumer.subscribe([TOPIC]);
          consumer.consume();
      })
      .on('data', function(data) {
          console.log(data.key + ":" + data.value.toString());
      });

  process.on('SIGINT', () => {
      console.log('\nDisconnecting consumer ...');
      consumer.disconnect();
  });
  ```

- Running the applications:

  ```shell
  node consumer.js
  ```

  ```shell
  node producer.js
  ```
To learn how to get a broker host FQDN, see this guide.
First, start the consumer application, which will continuously read new messages from the topic. Then start the producer application, which sends one or more messages with the key `key` and the value `test message` to the topic. The consumer application will display the messages as they arrive.
Python (kafka-python)
Before connecting, install the required dependencies:

```shell
sudo apt update && sudo apt install -y python3 python3-pip libsnappy-dev && \
pip3 install kafka-python lz4 python-snappy crc32c
```
Connecting without SSL (SASL_PLAINTEXT, port 9092):

- Code example for sending a message to a topic, `producer.py`:

  ```python
  from kafka import KafkaProducer

  producer = KafkaProducer(
      bootstrap_servers='<broker_host_FQDN>:9092',
      security_protocol="SASL_PLAINTEXT",
      sasl_mechanism="SCRAM-SHA-512",
      sasl_plain_username='<producer_name>',
      sasl_plain_password='<producer_password>')

  producer.send('<topic_name>', b'test message', b'key')
  producer.flush()
  producer.close()
  ```

- Code example for getting messages from a topic, `consumer.py`:

  ```python
  from kafka import KafkaConsumer

  consumer = KafkaConsumer(
      '<topic_name>',
      bootstrap_servers='<broker_FQDN>:9092',
      security_protocol="SASL_PLAINTEXT",
      sasl_mechanism="SCRAM-SHA-512",
      sasl_plain_username='<consumer_name>',
      sasl_plain_password='<consumer_password>')

  print("ready")

  for msg in consumer:
      print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
  ```

- Running the applications:

  ```shell
  python3 consumer.py
  ```

  ```shell
  python3 producer.py
  ```
Connecting via SSL (SASL_SSL, port 9091):

- Code example for sending a message to a topic, `producer.py`:

  ```python
  from kafka import KafkaProducer

  producer = KafkaProducer(
      bootstrap_servers='<broker_host_FQDN>:9091',
      security_protocol="SASL_SSL",
      sasl_mechanism="SCRAM-SHA-512",
      sasl_plain_username='<producer_name>',
      sasl_plain_password='<producer_password>',
      ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt")

  producer.send('<topic_name>', b'test message', b'key')
  producer.flush()
  producer.close()
  ```

- Code example for getting messages from a topic, `consumer.py`:

  ```python
  from kafka import KafkaConsumer

  consumer = KafkaConsumer(
      '<topic_name>',
      bootstrap_servers='<broker_FQDN>:9091',
      security_protocol="SASL_SSL",
      sasl_mechanism="SCRAM-SHA-512",
      sasl_plain_username='<consumer_name>',
      sasl_plain_password='<consumer_password>',
      ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt")

  print("ready")

  for msg in consumer:
      print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
  ```

- Running the applications:

  ```shell
  python3 consumer.py
  ```

  ```shell
  python3 producer.py
  ```
To learn how to get a broker host FQDN, see this guide.
First, start the consumer application, which will continuously read new messages from the topic. Then start the producer application, which sends one or more messages with the key `key` and the value `test message` to the topic. The consumer application will display the messages as they arrive.
Python (confluent-kafka)
Before connecting, install the required dependencies:

```shell
pip install confluent_kafka
```
Connecting without SSL (SASL_PLAINTEXT, port 9092):

- Code example for sending a message to a topic, `producer.py`:

  ```python
  from confluent_kafka import Producer


  def error_callback(err):
      print('Something went wrong: {}'.format(err))


  params = {
      'bootstrap.servers': '<broker_host_FQDN>:9092',
      'security.protocol': 'SASL_PLAINTEXT',
      'sasl.mechanism': 'SCRAM-SHA-512',
      'sasl.username': '<producer_name>',
      'sasl.password': '<producer_password>',
      'error_cb': error_callback,
  }

  p = Producer(params)
  p.produce('<topic_name>', 'some payload1')
  p.flush(10)
  ```

- Code example for getting messages from a topic, `consumer.py`:

  ```python
  from confluent_kafka import Consumer


  def error_callback(err):
      print('Something went wrong: {}'.format(err))


  params = {
      'bootstrap.servers': '<broker_host_FQDN>:9092',
      'security.protocol': 'SASL_PLAINTEXT',
      'sasl.mechanism': 'SCRAM-SHA-512',
      'sasl.username': '<consumer_name>',
      'sasl.password': '<consumer_password>',
      'group.id': 'test-consumer1',
      'auto.offset.reset': 'earliest',
      'enable.auto.commit': False,
      'error_cb': error_callback,
      'debug': 'all',
  }

  c = Consumer(params)
  c.subscribe(['<topic_name>'])

  while True:
      msg = c.poll(timeout=3.0)
      if msg:
          val = msg.value().decode()
          print(val)
  ```

- Running the applications:

  ```shell
  python3 consumer.py
  ```

  ```shell
  python3 producer.py
  ```
Connecting via SSL (SASL_SSL, port 9091):

- Code example for sending a message to a topic, `producer.py`:

  ```python
  from confluent_kafka import Producer


  def error_callback(err):
      print('Something went wrong: {}'.format(err))


  params = {
      'bootstrap.servers': '<broker_host_FQDN>:9091',
      'security.protocol': 'SASL_SSL',
      'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
      'sasl.mechanism': 'SCRAM-SHA-512',
      'sasl.username': '<producer_name>',
      'sasl.password': '<producer_password>',
      'error_cb': error_callback,
  }

  p = Producer(params)
  p.produce('<topic_name>', 'some payload1')
  p.flush(10)
  ```

- Code example for getting messages from a topic, `consumer.py`:

  ```python
  from confluent_kafka import Consumer


  def error_callback(err):
      print('Something went wrong: {}'.format(err))


  params = {
      'bootstrap.servers': '<broker_host_FQDN>:9091',
      'security.protocol': 'SASL_SSL',
      'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
      'sasl.mechanism': 'SCRAM-SHA-512',
      'sasl.username': '<consumer_name>',
      'sasl.password': '<consumer_password>',
      'group.id': 'test-consumer1',
      'auto.offset.reset': 'earliest',
      'enable.auto.commit': False,
      'error_cb': error_callback,
      'debug': 'all',
  }

  c = Consumer(params)
  c.subscribe(['<topic_name>'])

  while True:
      msg = c.poll(timeout=3.0)
      if msg:
          val = msg.value().decode()
          print(val)
  ```

- Running the applications:

  ```shell
  python3 consumer.py
  ```

  ```shell
  python3 producer.py
  ```
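In both variants above, the producer and the consumer share the same connection settings and differ only in credentials and the consumer's group/offset options. A small helper (hypothetical, not part of confluent-kafka) makes that shared structure explicit:

```python
def kafka_conn_params(user, password, host, ssl=True):
    """Build the connection part of a confluent-kafka config dict.

    ssl=True corresponds to the SASL_SSL examples (port 9091),
    ssl=False to the SASL_PLAINTEXT ones (port 9092).
    """
    params = {
        'bootstrap.servers': host,
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': user,
        'sasl.password': password,
    }
    if ssl:
        params['security.protocol'] = 'SASL_SSL'
        params['ssl.ca.location'] = ('/usr/local/share/ca-certificates/'
                                     'Yandex/YandexInternalRootCA.crt')
    else:
        params['security.protocol'] = 'SASL_PLAINTEXT'
    return params


# The consumer only adds its group settings on top of the shared parameters:
consumer_params = kafka_conn_params('<consumer_name>', '<consumer_password>',
                                    '<broker_host_FQDN>:9091')
consumer_params.update({'group.id': 'test-consumer1',
                        'auto.offset.reset': 'earliest'})
```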
To learn how to get a broker host FQDN, see this guide.
First, start the consumer application, which will continuously read new messages from the topic. Then start the producer application, which sends one or more messages with the key `key` and the value `test message` to the topic. The consumer application will display the messages as they arrive.