Code examples for connecting to an Apache Kafka® cluster
You can connect to public Apache Kafka® cluster hosts only with an SSL certificate. The examples below assume that the YandexInternalRootCA.crt certificate is located in this directory:
- /usr/local/share/ca-certificates/Yandex/ for Ubuntu.
- $HOME\.kafka\ for Windows.
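The certificate locations above can be expressed as a small lookup. This is a minimal Python sketch using only the standard library; the function names default_ca_path and ca_file_present are hypothetical helpers and are not part of any Kafka client.

```python
import sys
from pathlib import Path, PurePosixPath, PureWindowsPath

CERT_NAME = "YandexInternalRootCA.crt"
UBUNTU_DIR = "/usr/local/share/ca-certificates/Yandex"


def default_ca_path(platform=None, home=None):
    """Return the certificate path the examples assume for the given platform."""
    platform = sys.platform if platform is None else platform
    if platform.startswith("win"):
        # On Windows the examples assume $HOME\.kafka\.
        home = str(Path.home()) if home is None else home
        return str(PureWindowsPath(home) / ".kafka" / CERT_NAME)
    # On Ubuntu the examples assume the system CA directory.
    return str(PurePosixPath(UBUNTU_DIR) / CERT_NAME)


def ca_file_present(path):
    """Return True if the certificate file exists at the given path."""
    return Path(path).is_file()


print(default_ca_path())
```

Checking ca_file_present(default_ca_path()) before starting a client gives a clearer failure than a TLS handshake error later on.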
Connections without an SSL certificate are only supported for hosts that are not publicly accessible. In that case, traffic inside the virtual network is not encrypted when you connect to the cluster.
Before connecting, configure security groups for the cluster, if required.
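If a client cannot reach a broker at all, a missing security group rule is one possible cause. The following minimal Python sketch, using only the standard library, checks whether a TCP connection to a broker port can be opened. The function name port_open is hypothetical, and a successful check says nothing about authentication or TLS.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within timeout."""
    try:
        # create_connection resolves the host name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

For example, port_open("<broker_host_FQDN>", 9091) would test the port used by the SSL examples on this page, and port 9092 the one used by the examples without SSL.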
To see code examples with the host FQDN filled in, open the cluster page in the management console.
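The examples on this page differ between the two connection modes only in the port, the security protocol, and the CA certificate. A minimal Python sketch of that difference follows, using the librdkafka-style keys that appear in the confluent-kafka, Node.js, and C# examples. The helper name client_settings and the host name broker.example.internal are hypothetical.

```python
CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"


def client_settings(broker_fqdn, user, password, use_ssl, ca_file=CA_FILE):
    """Build a settings dictionary for either connection mode."""
    settings = {
        # The examples use port 9091 with SSL and port 9092 without it.
        "bootstrap.servers": f"{broker_fqdn}:{9091 if use_ssl else 9092}",
        "security.protocol": "SASL_SSL" if use_ssl else "SASL_PLAINTEXT",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": user,
        "sasl.password": password,
    }
    if use_ssl:
        # The CA certificate is only needed for SSL connections.
        settings["ssl.ca.location"] = ca_file
    return settings


# Hypothetical host name and credentials, for illustration only.
print(client_settings("broker.example.internal", "user", "secret", use_ssl=True))
```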
The examples were tested in the following environment:
- Yandex Cloud virtual machine running Ubuntu 20.04 LTS.
- Bash: 5.0.16.
- Python: 3.8.2, pip3: 20.0.2.
- Node.js: 10.19.0, npm: 6.14.4.
- OpenJDK: 11.0.8, Maven: 3.6.3.
- Go: 1.13.8.
- mono-complete: 6.8.0.105.
C#
Before connecting:

- Install the dependencies:

sudo apt-get update && \
sudo apt-get install -y apt-transport-https dotnet-sdk-6.0

- Create a directory for the project:

cd ~/ && mkdir cs-project && cd cs-project && mkdir -p consumer producer && cd ~/cs-project

- Create a configuration file:

App.csproj

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp6.0</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="2.2.0" />
  </ItemGroup>
</Project>

- Copy App.csproj to the producer and consumer application directories:

cp App.csproj producer/App.csproj && cp App.csproj consumer/App.csproj

Connecting without SSL
- Code example for delivering messages to a topic:

cs-project/producer/Program.cs

using Confluent.Kafka;
using System;
using System.Collections.Generic;

namespace App {
    class Program {
        public static void Main(string[] args) {
            int MSG_COUNT = 5;

            string HOST = "<broker_host_FQDN>:9092";
            string TOPIC = "<topic_name>";
            string USER = "<producer_name>";
            string PASS = "<producer_password>";

            var producerConfig = new ProducerConfig(
                new Dictionary<string,string>{
                    {"bootstrap.servers", HOST},
                    {"security.protocol", "SASL_PLAINTEXT"},
                    {"sasl.mechanism", "SCRAM-SHA-512"},
                    {"sasl.username", USER},
                    {"sasl.password", PASS}
                }
            );

            var producer = new ProducerBuilder<string, string>(producerConfig).Build();

            for (int i = 0; i < MSG_COUNT; i++) {
                producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                    (deliveryReport) => {
                        if (deliveryReport.Error.Code != ErrorCode.NoError) {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        } else {
                            Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                        }
                    });
            }
            // Wait up to 10 seconds for outstanding deliveries.
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }
}

- Code example for getting messages from a topic:

cs-project/consumer/Program.cs

using Confluent.Kafka;
using System;
using System.Collections.Generic;

namespace CCloud {
    class Program {
        public static void Main(string[] args) {
            string HOST = "<broker_host_FQDN>:9092";
            string TOPIC = "<topic_name>";
            string USER = "<consumer_name>";
            string PASS = "<consumer_password>";

            var consumerConfig = new ConsumerConfig(
                new Dictionary<string,string>{
                    {"bootstrap.servers", HOST},
                    {"security.protocol", "SASL_PLAINTEXT"},
                    {"sasl.mechanism", "SCRAM-SHA-512"},
                    {"sasl.username", USER},
                    {"sasl.password", PASS},
                    {"group.id", "demo"}
                }
            );

            var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
            consumer.Subscribe(TOPIC);
            try {
                while (true) {
                    var cr = consumer.Consume();
                    Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}");
                }
            } catch (OperationCanceledException) {
                // Ctrl-C was pressed.
            } finally {
                consumer.Close();
            }
        }
    }
}

- Building and launching applications:

cd ~/cs-project/consumer && dotnet build && \
dotnet run bin/Debug/netcoreapp6.0/App.dll

cd ~/cs-project/producer && dotnet build && \
dotnet run bin/Debug/netcoreapp6.0/App.dll

Connecting with SSL
- Code example for delivering messages to a topic:

cs-project/producer/Program.cs

using Confluent.Kafka;
using System;
using System.Collections.Generic;

namespace App {
    class Program {
        public static void Main(string[] args) {
            int MSG_COUNT = 5;

            string HOST = "<broker_host_FQDN>:9091";
            string TOPIC = "<topic_name>";
            string USER = "<producer_name>";
            string PASS = "<producer_password>";
            string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

            var producerConfig = new ProducerConfig(
                new Dictionary<string,string>{
                    {"bootstrap.servers", HOST},
                    {"security.protocol", "SASL_SSL"},
                    {"ssl.ca.location", CA_FILE},
                    {"sasl.mechanism", "SCRAM-SHA-512"},
                    {"sasl.username", USER},
                    {"sasl.password", PASS}
                }
            );

            var producer = new ProducerBuilder<string, string>(producerConfig).Build();

            for (int i = 0; i < MSG_COUNT; i++) {
                producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                    (deliveryReport) => {
                        if (deliveryReport.Error.Code != ErrorCode.NoError) {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        } else {
                            Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                        }
                    });
            }
            // Wait up to 10 seconds for outstanding deliveries.
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }
}

- Code example for getting messages from a topic:

cs-project/consumer/Program.cs

using Confluent.Kafka;
using System;
using System.Collections.Generic;

namespace CCloud {
    class Program {
        public static void Main(string[] args) {
            string HOST = "<broker_host_FQDN>:9091";
            string TOPIC = "<topic_name>";
            string USER = "<consumer_name>";
            string PASS = "<consumer_password>";
            string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

            var consumerConfig = new ConsumerConfig(
                new Dictionary<string,string>{
                    {"bootstrap.servers", HOST},
                    {"security.protocol", "SASL_SSL"},
                    {"ssl.ca.location", CA_FILE},
                    {"sasl.mechanism", "SCRAM-SHA-512"},
                    {"sasl.username", USER},
                    {"sasl.password", PASS},
                    {"group.id", "demo"}
                }
            );

            var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
            consumer.Subscribe(TOPIC);
            try {
                while (true) {
                    var cr = consumer.Consume();
                    Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}");
                }
            } catch (OperationCanceledException) {
                // Ctrl-C was pressed.
            } finally {
                consumer.Close();
            }
        }
    }
}

- Building and launching applications:

cd ~/cs-project/consumer && dotnet build && \
dotnet run bin/Debug/netcoreapp6.0/App.dll

cd ~/cs-project/producer && dotnet build && \
dotnet run bin/Debug/netcoreapp6.0/App.dll
For info on how to get a broker host's FQDN, see this guide.
First, launch the consumer application, which continuously reads new messages from the topic. Then launch the producer application, which sends one or more messages in the key:test message format to the topic. The consumer application displays the messages sent to the topic.
Go
Before connecting:

- Install the dependencies:

sudo apt update && sudo apt install -y golang git && \
go mod init example && \
go get github.com/Shopify/sarama@v1.38.1 && \
go get github.com/xdg-go/scram@v1.1.2

- Create a directory for the project:

cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer

- Create the scram.go file with the SCRAM code, which is the same for the producer and consumer applications:

scram.go

package main

import (
    "crypto/sha256"
    "crypto/sha512"
    "hash"

    "github.com/xdg-go/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type XDGSCRAMClient struct {
    *scram.Client
    *scram.ClientConversation
    scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
    x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
    if err != nil {
        return err
    }
    x.ClientConversation = x.Client.NewConversation()
    return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
    response, err = x.ClientConversation.Step(challenge)
    return
}

func (x *XDGSCRAMClient) Done() bool {
    return x.ClientConversation.Done()
}

- Copy scram.go to the producer and consumer application directories:

cp scram.go producer/scram.go && cp scram.go consumer/scram.go

Connecting without SSL
- Code example for delivering a message to a topic:

producer/main.go

package main

import (
    "fmt"
    "os"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    brokers := "<broker_host_FQDN>:9092"
    splitBrokers := strings.Split(brokers, ",")
    conf := sarama.NewConfig()
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Producer.Return.Successes = true
    conf.Version = sarama.V0_10_0_0
    conf.ClientID = "sasl_scram_client"
    conf.Net.SASL.Enable = true
    conf.Net.SASL.Handshake = true
    conf.Net.SASL.User = "<producer_name>"
    conf.Net.SASL.Password = "<producer_password>"
    conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
    conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

    syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
    if err != nil {
        fmt.Println("Couldn't create producer: ", err.Error())
        // Exit with a non-zero code to signal the failure.
        os.Exit(1)
    }
    publish("test message", syncProducer)
}

func publish(message string, producer sarama.SyncProducer) {
    // Publish sync
    msg := &sarama.ProducerMessage{
        Topic: "<topic_name>",
        Value: sarama.StringEncoder(message),
    }
    p, o, err := producer.SendMessage(msg)
    if err != nil {
        fmt.Println("Error publish: ", err.Error())
        // Partition and offset are not meaningful after a failed send.
        return
    }

    fmt.Println("Partition: ", p)
    fmt.Println("Offset: ", o)
}

- Code example for getting messages from a topic:

consumer/main.go

package main

import (
    "fmt"
    "os"
    "os/signal"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    brokers := "<broker_host_FQDN>:9092"
    splitBrokers := strings.Split(brokers, ",")
    conf := sarama.NewConfig()
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Version = sarama.V0_10_0_0
    conf.Consumer.Return.Errors = true
    conf.ClientID = "sasl_scram_client"
    conf.Metadata.Full = true
    conf.Net.SASL.Enable = true
    conf.Net.SASL.User = "<consumer_name>"
    conf.Net.SASL.Password = "<consumer_password>"
    conf.Net.SASL.Handshake = true
    conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
    conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

    master, err := sarama.NewConsumer(splitBrokers, conf)
    if err != nil {
        fmt.Println("Couldn't create consumer: ", err.Error())
        os.Exit(1)
    }

    defer func() {
        if err := master.Close(); err != nil {
            panic(err)
        }
    }()

    topic := "<topic_name>"

    consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
    if err != nil {
        panic(err)
    }

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // Count the number of processed messages
    msgCount := 0

    // Get signal to finish
    doneCh := make(chan struct{})
    go func() {
        for {
            select {
            case err := <-consumer.Errors():
                fmt.Println(err)
            case msg := <-consumer.Messages():
                msgCount++
                fmt.Println("Received messages", string(msg.Key), string(msg.Value))
            case <-signals:
                fmt.Println("Interrupt is detected")
                doneCh <- struct{}{}
            }
        }
    }()

    <-doneCh
    fmt.Println("Processed", msgCount, "messages")
}

- Building applications:

cd ~/go-project/producer && go build && \
cd ~/go-project/consumer && go build

- Running applications:

~/go-project/consumer/consumer

~/go-project/producer/producer

Connecting with SSL
- Code example for delivering a message to a topic:

producer/main.go

package main

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "os"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    brokers := "<broker_host_FQDN>:9091"
    splitBrokers := strings.Split(brokers, ",")
    conf := sarama.NewConfig()
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Producer.Return.Successes = true
    conf.Version = sarama.V0_10_0_0
    conf.ClientID = "sasl_scram_client"
    conf.Net.SASL.Enable = true
    conf.Net.SASL.Handshake = true
    conf.Net.SASL.User = "<producer_name>"
    conf.Net.SASL.Password = "<producer_password>"
    conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
    conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

    certs := x509.NewCertPool()
    pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
    pemData, err := ioutil.ReadFile(pemPath)
    if err != nil {
        fmt.Println("Couldn't load cert: ", err.Error())
        // Handle the error: without the CA certificate the TLS handshake cannot succeed.
        os.Exit(1)
    }
    certs.AppendCertsFromPEM(pemData)

    conf.Net.TLS.Enable = true
    conf.Net.TLS.Config = &tls.Config{
        RootCAs: certs,
    }

    syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
    if err != nil {
        fmt.Println("Couldn't create producer: ", err.Error())
        // Exit with a non-zero code to signal the failure.
        os.Exit(1)
    }
    publish("test message", syncProducer)
}

func publish(message string, producer sarama.SyncProducer) {
    // Publish sync
    msg := &sarama.ProducerMessage{
        Topic: "<topic_name>",
        Value: sarama.StringEncoder(message),
    }
    p, o, err := producer.SendMessage(msg)
    if err != nil {
        fmt.Println("Error publish: ", err.Error())
        // Partition and offset are not meaningful after a failed send.
        return
    }

    fmt.Println("Partition: ", p)
    fmt.Println("Offset: ", o)
}

- Code example for getting messages from a topic:

consumer/main.go

package main

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "os"
    "os/signal"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    brokers := "<broker_host_FQDN>:9091"
    splitBrokers := strings.Split(brokers, ",")
    conf := sarama.NewConfig()
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Version = sarama.V0_10_0_0
    conf.Consumer.Return.Errors = true
    conf.ClientID = "sasl_scram_client"
    conf.Metadata.Full = true
    conf.Net.SASL.Enable = true
    conf.Net.SASL.User = "<consumer_name>"
    conf.Net.SASL.Password = "<consumer_password>"
    conf.Net.SASL.Handshake = true
    conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
    conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

    certs := x509.NewCertPool()
    pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
    pemData, err := ioutil.ReadFile(pemPath)
    if err != nil {
        fmt.Println("Couldn't load cert: ", err.Error())
        // Handle the error: without the CA certificate the TLS handshake cannot succeed.
        os.Exit(1)
    }
    certs.AppendCertsFromPEM(pemData)

    conf.Net.TLS.Enable = true
    conf.Net.TLS.Config = &tls.Config{
        RootCAs: certs,
    }

    master, err := sarama.NewConsumer(splitBrokers, conf)
    if err != nil {
        fmt.Println("Couldn't create consumer: ", err.Error())
        os.Exit(1)
    }

    defer func() {
        if err := master.Close(); err != nil {
            panic(err)
        }
    }()

    topic := "<topic_name>"

    consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
    if err != nil {
        panic(err)
    }

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // Count the number of processed messages
    msgCount := 0

    // Get signal to finish
    doneCh := make(chan struct{})
    go func() {
        for {
            select {
            case err := <-consumer.Errors():
                fmt.Println(err)
            case msg := <-consumer.Messages():
                msgCount++
                fmt.Println("Received messages", string(msg.Key), string(msg.Value))
            case <-signals:
                fmt.Println("Interrupt is detected")
                doneCh <- struct{}{}
            }
        }
    }()

    <-doneCh
    fmt.Println("Processed", msgCount, "messages")
}

- Building applications:

cd ~/go-project/producer && go build && \
cd ~/go-project/consumer && go build

- Running applications:

~/go-project/consumer/consumer

~/go-project/producer/producer
For info on how to get a broker host's FQDN, see this guide.
First, launch the consumer application, which continuously reads new messages from the topic. Then launch the producer application, which sends one or more messages in the key:test message format to the topic. The consumer application displays the messages sent to the topic.
Java
Before connecting:

- Install the dependencies:

sudo apt update && sudo apt install --yes default-jdk maven

- Create a directory for the Maven project:

cd ~/ && \
mkdir --parents project/consumer/src/java/com/example project/producer/src/java/com/example && \
cd ~/project

- Create a configuration file for Maven:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>app</artifactId>
  <packaging>jar</packaging>
  <version>0.1.0</version>
  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.30</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.11.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.6.0</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>${project.artifactId}-${project.version}</finalName>
    <sourceDirectory>src</sourceDirectory>
    <resources>
      <resource>
        <directory>src</directory>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>attached</goal>
            </goals>
            <phase>package</phase>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass>com.example.App</mainClass>
                </manifest>
              </archive>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.1.0</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.example.App</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

Refer to the relevant project pages in the Maven repository for up-to-date versions of the dependencies: kafka-clients, jackson-databind, and slf4j-simple.

- Copy pom.xml to the producer and consumer application directories:

cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml

Connecting without SSL
- Code example for delivering messages to a topic:

producer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;

public class App {

  public static void main(String[] args) {

    int MSG_COUNT = 5;

    String HOST = "<broker_FQDN>:9092";
    String TOPIC = "<topic_name>";
    String USER = "<producer_name>";
    String PASS = "<producer_password>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String KEY = "key";

    String serializer = StringSerializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put("acks", "all");
    props.put("key.serializer", serializer);
    props.put("value.serializer", serializer);
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);

    Producer<String, String> producer = new KafkaProducer<>(props);

    try {
      for (int i = 1; i <= MSG_COUNT; i++) {
        producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
        System.out.println("Test message " + i);
      }
      producer.flush();
      producer.close();
    } catch (Exception ex) {
      System.out.println(ex);
      producer.close();
    }
  }
}

- Code example for getting messages from a topic:

consumer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;

public class App {

  public static void main(String[] args) {

    String HOST = "<broker_FQDN>:9092";
    String TOPIC = "<topic_name>";
    String USER = "<consumer_name>";
    String PASS = "<consumer_password>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String GROUP = "demo";

    String deserializer = StringDeserializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("group.id", GROUP);
    props.put("key.deserializer", deserializer);
    props.put("value.deserializer", deserializer);
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + ":" + record.value());
      }
    }
  }
}

- Building applications:

cd ~/project/producer && mvn clean package && \
cd ~/project/consumer && mvn clean package

- Running applications:

java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar

java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar

Connecting with SSL
- Go to the folder where the Java certificate store will be located:

cd /etc/security

- Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Use the -storepass parameter to set a password of at least 6 characters to protect the store:

sudo keytool -importcert \
  -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
  -keystore ssl -storepass <certificate_store_password> \
  --noprompt

- Code example for delivering messages to a topic:

producer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;

public class App {

  public static void main(String[] args) {

    int MSG_COUNT = 5;

    String HOST = "<broker_FQDN>:9091";
    String TOPIC = "<topic_name>";
    String USER = "<producer_name>";
    String PASS = "<producer_password>";
    String TS_FILE = "/etc/security/ssl";
    String TS_PASS = "<certificate_store_password>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String KEY = "key";

    String serializer = StringSerializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put("acks", "all");
    props.put("key.serializer", serializer);
    props.put("value.serializer", serializer);
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);
    props.put("ssl.truststore.location", TS_FILE);
    props.put("ssl.truststore.password", TS_PASS);

    Producer<String, String> producer = new KafkaProducer<>(props);

    try {
      for (int i = 1; i <= MSG_COUNT; i++) {
        producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
        System.out.println("Test message " + i);
      }
      producer.flush();
      producer.close();
    } catch (Exception ex) {
      System.out.println(ex);
      producer.close();
    }
  }
}

- Code example for getting messages from a topic:

consumer/src/java/com/example/App.java

package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;

public class App {

  public static void main(String[] args) {

    String HOST = "<broker_FQDN>:9091";
    String TOPIC = "<topic_name>";
    String USER = "<consumer_name>";
    String PASS = "<consumer_password>";
    String TS_FILE = "/etc/security/ssl";
    String TS_PASS = "<certificate_store_password>";

    String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, USER, PASS);
    String GROUP = "demo";

    String deserializer = StringDeserializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("group.id", GROUP);
    props.put("key.deserializer", deserializer);
    props.put("value.deserializer", deserializer);
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);
    props.put("ssl.truststore.location", TS_FILE);
    props.put("ssl.truststore.password", TS_PASS);

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + ":" + record.value());
      }
    }
  }
}

- Building applications:

cd ~/project/producer && mvn clean package && \
cd ~/project/consumer && mvn clean package

- Running applications:

java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar

java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
For info on how to get a broker host's FQDN, see this guide.
First, launch the consumer application, which continuously reads new messages from the topic. Then launch the producer application, which sends one or more messages in the key:test message format to the topic. The consumer application displays the messages sent to the topic.
Node.js
Before connecting, install the required dependencies:
sudo apt update && sudo apt install -y nodejs npm && \
npm install node-rdkafka

Connecting without SSL
- Code example for delivering messages to a topic:

producer.js

"use strict"
const Kafka = require('node-rdkafka');

const MSG_COUNT = 5;

const HOST = "<broker_FQDN>:9092";
const TOPIC = "<topic_name>";
const USER = "<producer_name>";
const PASS = "<producer_password>";

const producer = new Kafka.Producer({
  'bootstrap.servers': HOST,
  'sasl.username': USER,
  'sasl.password': PASS,
  'security.protocol': "SASL_PLAINTEXT",
  'sasl.mechanism': "SCRAM-SHA-512"
});

producer.connect();

producer.on('ready', function() {
  try {
    for (let i = 0; i < MSG_COUNT; ++i) {
      producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
      console.log("Produced: test message");
    }

    producer.flush(10000, () => {
      producer.disconnect();
    });
  } catch (err) {
    console.error('Error');
    console.error(err);
  }
});

- Code example for getting messages from a topic:

consumer.js

"use strict"
const Kafka = require('node-rdkafka');

const MSG_COUNT = 5;

const HOST = "<broker_FQDN>:9092";
const TOPIC = "<topic_name>";
const USER = "<consumer_name>";
const PASS = "<consumer_password>";

const consumer = new Kafka.Consumer({
  'bootstrap.servers': HOST,
  'sasl.username': USER,
  'sasl.password': PASS,
  'security.protocol': "SASL_PLAINTEXT",
  'sasl.mechanism': "SCRAM-SHA-512",
  'group.id': "demo"
});

consumer.connect();

consumer
  .on('ready', function() {
    consumer.subscribe([TOPIC]);
    consumer.consume();
  })
  .on('data', function(data) {
    console.log(data.key + ":" + data.value.toString());
  });

process.on('SIGINT', () => {
  console.log('\nDisconnecting consumer ...');
  consumer.disconnect();
});

- Running applications:

node consumer.js

node producer.js

Connecting with SSL
- Code example for delivering messages to a topic:

producer.js

"use strict"
const Kafka = require('node-rdkafka');

const MSG_COUNT = 5;

const HOST = "<broker_FQDN>:9091";
const TOPIC = "<topic_name>";
const USER = "<producer_name>";
const PASS = "<producer_password>";
const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

const producer = new Kafka.Producer({
  'bootstrap.servers': HOST,
  'sasl.username': USER,
  'sasl.password': PASS,
  'security.protocol': "SASL_SSL",
  'ssl.ca.location': CA_FILE,
  'sasl.mechanism': "SCRAM-SHA-512"
});

producer.connect();

producer.on('ready', function() {
  try {
    for (let i = 0; i < MSG_COUNT; ++i) {
      producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
      console.log("Produced: test message");
    }

    producer.flush(10000, () => {
      producer.disconnect();
    });
  } catch (err) {
    console.error('Error');
    console.error(err);
  }
});

- Code example for getting messages from a topic:

consumer.js

"use strict"
const Kafka = require('node-rdkafka');

const MSG_COUNT = 5;

const HOST = "<broker_FQDN>:9091";
const TOPIC = "<topic_name>";
const USER = "<consumer_name>";
const PASS = "<consumer_password>";
const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";

const consumer = new Kafka.Consumer({
  'bootstrap.servers': HOST,
  'sasl.username': USER,
  'sasl.password': PASS,
  'security.protocol': "SASL_SSL",
  'ssl.ca.location': CA_FILE,
  'sasl.mechanism': "SCRAM-SHA-512",
  'group.id': "demo"
});

consumer.connect();

consumer
  .on('ready', function() {
    consumer.subscribe([TOPIC]);
    consumer.consume();
  })
  .on('data', function(data) {
    console.log(data.key + ":" + data.value.toString());
  });

process.on('SIGINT', () => {
  console.log('\nDisconnecting consumer ...');
  consumer.disconnect();
});

- Running applications:

node consumer.js

node producer.js
For info on how to get a broker host's FQDN, see this guide.
First, launch the consumer application, which continuously reads new messages from the topic. Then launch the producer application, which sends one or more messages in the key:test message format to the topic. The consumer application displays the messages sent to the topic.
Python (kafka-python)
Before connecting, install the required dependencies:
sudo apt update && sudo apt install -y python3 python3-pip libsnappy-dev && \
pip3 install kafka-python lz4 python-snappy crc32c

Connecting without SSL
- Code example for delivering a message to a topic:

producer.py

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='<broker_host_FQDN>:9092',
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username='<producer_name>',
    sasl_plain_password='<producer_password>')

producer.send('<topic_name>', b'test message', b'key')
producer.flush()
producer.close()

- Code example for getting messages from a topic:

consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '<topic_name>',
    bootstrap_servers='<broker_FQDN>:9092',
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username='<consumer_name>',
    sasl_plain_password='<consumer_password>')

print("ready")

for msg in consumer:
    print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))

- Running applications:

python3 consumer.py

python3 producer.py

Connecting with SSL
- Code example for delivering a message to a topic:

producer.py

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='<broker_host_FQDN>:9091',
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username='<producer_name>',
    sasl_plain_password='<producer_password>',
    ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt")

producer.send('<topic_name>', b'test message', b'key')
producer.flush()
producer.close()

- Code example for getting messages from a topic:

consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '<topic_name>',
    bootstrap_servers='<broker_FQDN>:9091',
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username='<consumer_name>',
    sasl_plain_password='<consumer_password>',
    ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt")

print("ready")

for msg in consumer:
    print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))

- Running applications:

python3 consumer.py

python3 producer.py
For info on how to get a broker host's FQDN, see this guide.
First, launch the consumer application, which continuously reads new messages from the topic. Then launch the producer application, which sends one or more messages in the key:test message format to the topic. The consumer application displays the messages sent to the topic.
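The launch order matters for the kafka-python consumer above because KafkaConsumer defaults auto_offset_reset to 'latest', so a consumer started after the producer would not see the earlier message. The following is a minimal sketch of a variant that reads from the beginning of the topic and tolerates messages without a key. The helper names consumer_kwargs, format_record, and run are hypothetical; the keyword arguments are the same KafkaConsumer parameters used in the examples, plus auto_offset_reset.

```python
def consumer_kwargs(broker, user, password, ca_file=None, from_beginning=True):
    """Build KafkaConsumer keyword arguments for either connection mode."""
    kwargs = {
        "bootstrap_servers": broker,
        # SSL is used whenever a CA certificate path is supplied.
        "security_protocol": "SASL_SSL" if ca_file else "SASL_PLAINTEXT",
        "sasl_mechanism": "SCRAM-SHA-512",
        "sasl_plain_username": user,
        "sasl_plain_password": password,
        # 'earliest' lets a new consumer read messages produced before it started.
        "auto_offset_reset": "earliest" if from_beginning else "latest",
    }
    if ca_file:
        kwargs["ssl_cafile"] = ca_file
    return kwargs


def format_record(key, value):
    """Format a record as key:value; the key may be None for unkeyed messages."""
    key_text = key.decode("utf-8") if key is not None else ""
    return key_text + ":" + value.decode("utf-8")


def run(topic, **kwargs):
    """Consume the topic forever and print each record."""
    # Imported here so the helpers above work without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **kwargs)
    for msg in consumer:
        print(format_record(msg.key, msg.value))
```

A call such as run('<topic_name>', **consumer_kwargs('<broker_FQDN>:9092', '<consumer_name>', '<consumer_password>')) would then behave like consumer.py above, except that it also prints messages already in the topic.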
Python (confluent-kafka)
Before connecting, install the required dependencies:
pip install confluent_kafka

Connecting without SSL
- Code example for delivering a message to a topic:

producer.py

from confluent_kafka import Producer

def error_callback(err):
    print('Something went wrong: {}'.format(err))

params = {
    'bootstrap.servers': '<broker_host_FQDN>:9092',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': '<producer_name>',
    'sasl.password': '<producer_password>',
    'error_cb': error_callback,
}

p = Producer(params)
p.produce('<topic_name>', 'some payload1')
p.flush(10)

- Code example for getting messages from a topic:

consumer.py

from confluent_kafka import Consumer

def error_callback(err):
    print('Something went wrong: {}'.format(err))

params = {
    'bootstrap.servers': '<broker_host_FQDN>:9092',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': '<consumer_name>',
    'sasl.password': '<consumer_password>',
    'group.id': 'test-consumer1',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'error_cb': error_callback,
    'debug': 'all',
}

c = Consumer(params)
c.subscribe(['<topic_name>'])
while True:
    msg = c.poll(timeout=3.0)
    if msg:
        val = msg.value().decode()
        print(val)

- Running applications:

python3 producer.py

python3 consumer.py

Connecting with SSL
- Code example for delivering a message to a topic:

producer.py

from confluent_kafka import Producer

def error_callback(err):
    print('Something went wrong: {}'.format(err))

params = {
    'bootstrap.servers': '<broker_host_FQDN>:9091',
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': '<producer_name>',
    'sasl.password': '<producer_password>',
    'error_cb': error_callback,
}

p = Producer(params)
p.produce('<topic_name>', 'some payload1')
p.flush(10)

- Code example for getting messages from a topic:

consumer.py

from confluent_kafka import Consumer

def error_callback(err):
    print('Something went wrong: {}'.format(err))

params = {
    'bootstrap.servers': '<broker_host_FQDN>:9091',
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': '<consumer_name>',
    'sasl.password': '<consumer_password>',
    'group.id': 'test-consumer1',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'error_cb': error_callback,
    'debug': 'all',
}

c = Consumer(params)
c.subscribe(['<topic_name>'])
while True:
    msg = c.poll(timeout=3.0)
    if msg:
        val = msg.value().decode()
        print(val)

- Running applications:

python3 consumer.py

python3 producer.py
For info on how to get a broker host's FQDN, see this guide.
First, launch the consumer application, which continuously reads new messages from the topic. Then launch the producer application, which sends a message to the topic. The consumer application displays the messages sent to the topic.
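The confluent-kafka examples above do not report per-message delivery results and do not check polled messages for errors. The following is a minimal sketch of how both could be added, assuming the same params dictionaries as above. The helper names describe_delivery, delivery_report, decode_message, and produce_one are hypothetical; on_delivery, error(), value(), topic(), partition(), and offset() are part of the confluent-kafka API.

```python
def describe_delivery(err, msg):
    """Return a one-line description of a delivery result."""
    if err is not None:
        return "Delivery failed: {}".format(err)
    return "Delivered to {} [{}] at offset {}".format(
        msg.topic(), msg.partition(), msg.offset())


def delivery_report(err, msg):
    """Delivery callback: called once per produced message from poll() or flush()."""
    print(describe_delivery(err, msg))


def decode_message(msg):
    """Return the decoded value, or None for a poll timeout or an error event."""
    if msg is None:
        # poll() returned nothing within the timeout.
        return None
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        return None
    return msg.value().decode("utf-8")


def produce_one(params, topic, payload):
    """Send one message and wait up to 10 seconds for the delivery report."""
    # Imported here so the helpers above work without confluent-kafka installed.
    from confluent_kafka import Producer

    p = Producer(params)
    p.produce(topic, payload, on_delivery=delivery_report)
    p.flush(10)
```

In the consumer loop, val = decode_message(c.poll(timeout=3.0)) followed by a check for None would replace the direct msg.value().decode() call.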