Примеры кода для подключения к кластеру Apache Kafka®
Вы можете подключаться к хостам кластера Apache Kafka® в публичном доступе только с использованием SSL-сертификата. В примерах ниже предполагается, что сертификат YandexInternalRootCA.crt
расположен в директории:
/usr/local/share/ca-certificates/Yandex/
для Ubuntu;$HOME\.kafka\
для Windows.
Подключение без использования SSL-сертификата поддерживается только для хостов, находящихся не в публичном доступе. В этом случае трафик внутри виртуальной сети при подключении к БД шифроваться не будет.
При необходимости перед подключением настройте группы безопасности кластера.
Примеры кода с заполненным FQDN хоста доступны в консоли управления
Примеры проверялись в следующем окружении:
- Виртуальная машина в Yandex Cloud с Ubuntu 20.04 LTS.
- Bash:
5.0.16
. - Python:
3.8.2
, pip3:20.0.2
. - Node.JS:
10.19.0
, npm:6.14.4
. - OpenJDK:
11.0.8
, Maven:3.6.3
. - Go:
1.13.8
. - mono-complete:
6.8.0.105
.
C#
Перед подключением:
-
Установите зависимости:
sudo apt-get update && \ sudo apt-get install -y apt-transport-https dotnet-sdk-6.0
-
Создайте директорию для проекта:
cd ~/ && mkdir cs-project && cd cs-project && mkdir -p consumer producer && cd ~/cs-project
-
Создайте конфигурационный файл:
App.csproj
<Project Sdk="Microsoft.NET.Sdk"> <PropertyGroup> <OutputType>Exe</OutputType> <TargetFramework>netcoreapp6.0</TargetFramework> </PropertyGroup> <ItemGroup> <PackageReference Include="Confluent.Kafka" Version="2.2.0" /> </ItemGroup> </Project>
-
Скопируйте
App.csproj
в директории приложения-производителя и приложения-потребителя:cp App.csproj producer/App.csproj && cp App.csproj consumer/App.csproj
-
Пример кода для отправки сообщений в топик:
cs-project/producer/Program.cs
using Confluent.Kafka; using System; using System.Collections.Generic; namespace App { class Program { public static void Main(string[] args) { int MSG_COUNT = 5; string HOST = "<FQDN_хоста-брокера>:9092"; string TOPIC = "<имя_топика>"; string USER = "<имя_производителя>"; string PASS = "<пароль_производителя>"; var producerConfig = new ProducerConfig( new Dictionary<string,string>{ {"bootstrap.servers", HOST}, {"security.protocol", "SASL_PLAINTEXT"}, {"sasl.mechanism", "SCRAM-SHA-512"}, {"sasl.username", USER}, {"sasl.password", PASS} } ); var producer = new ProducerBuilder<string, string>(producerConfig).Build(); for(int i=0; i<MSG_COUNT; i++) { producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" }, (deliveryReport) => { if (deliveryReport.Error.Code != ErrorCode.NoError) { Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}"); } else { Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}"); } }); } producer.Flush(TimeSpan.FromSeconds(10)); } } }
-
Пример кода для получения сообщений из топика:
cs-project/consumer/Program.cs
using Confluent.Kafka; using System; using System.Collections.Generic; namespace CCloud { class Program { public static void Main(string[] args) { string HOST = "<FQDN_хоста-брокера>:9092"; string TOPIC = "<имя_топика>"; string USER = "<имя_потребителя>"; string PASS = "<пароль_потребителя>"; var consumerConfig = new ConsumerConfig( new Dictionary<string,string>{ {"bootstrap.servers", HOST}, {"security.protocol", "SASL_PLAINTEXT"}, {"sasl.mechanism", "SCRAM-SHA-512"}, {"sasl.username", USER}, {"sasl.password", PASS}, {"group.id", "demo"} } ); var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build(); consumer.Subscribe(TOPIC); try { while (true) { var cr = consumer.Consume(); Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}"); } } catch (OperationCanceledException) { // Ctrl-C was pressed. } finally { consumer.Close(); } } } }
-
Сборка и запуск приложений:
cd ~/cs-project/consumer && dotnet build && \ dotnet run bin/Debug/netcoreapp5.0/App.dll
cd ~/cs-project/producer && dotnet build && \ dotnet run bin/Debug/netcoreapp5.0/App.dll
-
Пример кода для отправки сообщений в топик:
cs-project/producer/Program.cs
using Confluent.Kafka; using System; using System.Collections.Generic; namespace App { class Program { public static void Main(string[] args) { int MSG_COUNT = 5; string HOST = "<FQDN_хоста-брокера>:9091"; string TOPIC = "<имя_топика>"; string USER = "<имя_производителя>"; string PASS = "<пароль_производителя>"; string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"; var producerConfig = new ProducerConfig( new Dictionary<string,string>{ {"bootstrap.servers", HOST}, {"security.protocol", "SASL_SSL"}, {"ssl.ca.location", CA_FILE}, {"sasl.mechanism", "SCRAM-SHA-512"}, {"sasl.username", USER}, {"sasl.password", PASS} } ); var producer = new ProducerBuilder<string, string>(producerConfig).Build(); for(int i=0; i<MSG_COUNT; i++) { producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" }, (deliveryReport) => { if (deliveryReport.Error.Code != ErrorCode.NoError) { Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}"); } else { Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}"); } }); } producer.Flush(TimeSpan.FromSeconds(10)); } } }
-
Пример кода для получения сообщений из топика:
cs-project/consumer/Program.cs
using Confluent.Kafka; using System; using System.Collections.Generic; namespace CCloud { class Program { public static void Main(string[] args) { string HOST = "<FQDN_хоста-брокера>:9091"; string TOPIC = "<имя_топика>"; string USER = "<имя_потребителя>"; string PASS = "<пароль_потребителя>"; string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"; var consumerConfig = new ConsumerConfig( new Dictionary<string,string>{ {"bootstrap.servers", HOST}, {"security.protocol", "SASL_SSL"}, {"ssl.ca.location", CA_FILE}, {"sasl.mechanism", "SCRAM-SHA-512"}, {"sasl.username", USER}, {"sasl.password", PASS}, {"group.id", "demo"} } ); var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build(); consumer.Subscribe(TOPIC); try { while (true) { var cr = consumer.Consume(); Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}"); } } catch (OperationCanceledException) { // Ctrl-C was pressed. } finally { consumer.Close(); } } } }
-
Сборка и запуск приложений:
cd ~/cs-project/consumer && dotnet build && \ dotnet run bin/Debug/netcoreapp6.0/App.dll
cd ~/cs-project/producer && dotnet build && \ dotnet run bin/Debug/netcoreapp6.0/App.dll
Как получить FQDN хоста-брокера, см. в инструкции.
Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message
. Приложение-потребитель отобразит сообщения, отправленные в топик.
Go
Перед подключением:
-
Установите зависимости:
sudo apt update && sudo apt install -y golang git && \ go mod init example && \ go get github.com/Shopify/sarama@v1.38.1 && \ go get github.com/xdg-go/scram@v1.1.2
-
Создайте директорию для проекта:
cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
-
Создайте файл
scram.go
, содержащий код для использования SCRAM , общий для приложения-производителя и приложения-потребителя:scram.go
package main import ( "crypto/sha256" "crypto/sha512" "hash" "github.com/xdg-go/scram" ) var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn } func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) if err != nil { return err } x.ClientConversation = x.Client.NewConversation() return nil } func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { response, err = x.ClientConversation.Step(challenge) return } func (x *XDGSCRAMClient) Done() bool { return x.ClientConversation.Done() }
-
Скопируйте
scram.go
в директории приложения-производителя и приложения-потребителя:cp scram.go producer/scram.go && cp scram.go consumer/scram.go
-
Пример кода для отправки сообщения в топик:
producer/main.go
package main import ( "fmt" "os" "strings" "github.com/Shopify/sarama" ) func main() { brokers := "<FQDN_хоста-брокера>:9092" splitBrokers := strings.Split(brokers, ",") conf := sarama.NewConfig() conf.Producer.RequiredAcks = sarama.WaitForAll conf.Producer.Return.Successes = true conf.Version = sarama.V0_10_0_0 conf.ClientID = "sasl_scram_client" conf.Net.SASL.Enable = true conf.Net.SASL.Handshake = true conf.Net.SASL.User = "<имя_производителя>" conf.Net.SASL.Password = "<пароль_производителя>" conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf) if err != nil { fmt.Println("Couldn't create producer: ", err.Error()) os.Exit(0) } publish("test message", syncProducer) } func publish(message string, producer sarama.SyncProducer) { // Publish sync msg := &sarama.ProducerMessage { Topic: "<имя_топика>", Value: sarama.StringEncoder(message), } p, o, err := producer.SendMessage(msg) if err != nil { fmt.Println("Error publish: ", err.Error()) } fmt.Println("Partition: ", p) fmt.Println("Offset: ", o) }
-
Пример кода для получения сообщений из топика:
consumer/main.go
package main import ( "fmt" "os" "os/signal" "strings" "github.com/Shopify/sarama" ) func main() { brokers := "<FQDN_хоста-брокера>:9092" splitBrokers := strings.Split(brokers, ",") conf := sarama.NewConfig() conf.Producer.RequiredAcks = sarama.WaitForAll conf.Version = sarama.V0_10_0_0 conf.Consumer.Return.Errors = true conf.ClientID = "sasl_scram_client" conf.Metadata.Full = true conf.Net.SASL.Enable = true conf.Net.SASL.User = "<имя_потребителя>" conf.Net.SASL.Password = "<пароль_потребителя>" conf.Net.SASL.Handshake = true conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) master, err := sarama.NewConsumer(splitBrokers, conf) if err != nil { fmt.Println("Coulnd't create consumer: ", err.Error()) os.Exit(1) } defer func() { if err := master.Close(); err != nil { panic(err) } }() topic := "<имя_топика>" consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest) if err != nil { panic(err) } signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) // Count the number of processed messages msgCount := 0 // Get signal to finish doneCh := make(chan struct{}) go func() { for { select { case err := <-consumer.Errors(): fmt.Println(err) case msg := <-consumer.Messages(): msgCount++ fmt.Println("Received messages", string(msg.Key), string(msg.Value)) case <-signals: fmt.Println("Interrupt is detected") doneCh <- struct{}{} } } }() <-doneCh fmt.Println("Processed", msgCount, "messages") }
-
Сборка приложений:
cd ~/go-project/producer && go build && \ cd ~/go-project/consumer && go build
-
Запуск приложений:
~/go-project/consumer/consumer
~/go-project/producer/producer
-
Пример кода для отправки сообщения в топик:
producer/main.go
package main import ( "fmt" "crypto/tls" "crypto/x509" "io/ioutil" "os" "strings" "github.com/Shopify/sarama" ) func main() { brokers := "<FQDN_хоста-брокера>:9091" splitBrokers := strings.Split(brokers, ",") conf := sarama.NewConfig() conf.Producer.RequiredAcks = sarama.WaitForAll conf.Producer.Return.Successes = true conf.Version = sarama.V0_10_0_0 conf.ClientID = "sasl_scram_client" conf.Net.SASL.Enable = true conf.Net.SASL.Handshake = true conf.Net.SASL.User = "<имя_производителя>" conf.Net.SASL.Password = "<пароль_производителя>" conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) certs := x509.NewCertPool() pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt" pemData, err := ioutil.ReadFile(pemPath) if err != nil { fmt.Println("Couldn't load cert: ", err.Error()) // Handle the error } certs.AppendCertsFromPEM(pemData) conf.Net.TLS.Enable = true conf.Net.TLS.Config = &tls.Config{ RootCAs: certs, } syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf) if err != nil { fmt.Println("Couldn't create producer: ", err.Error()) os.Exit(0) } publish("test message", syncProducer) } func publish(message string, producer sarama.SyncProducer) { // publish sync msg := &sarama.ProducerMessage { Topic: "<имя_топика>", Value: sarama.StringEncoder(message), } p, o, err := producer.SendMessage(msg) if err != nil { fmt.Println("Error publish: ", err.Error()) } fmt.Println("Partition: ", p) fmt.Println("Offset: ", o) }
-
Пример кода для получения сообщений из топика:
consumer/main.go
package main import ( "fmt" "crypto/tls" "crypto/x509" "io/ioutil" "os" "os/signal" "strings" "github.com/Shopify/sarama" ) func main() { brokers := "<FQDN_хоста-брокера>:9091" splitBrokers := strings.Split(brokers, ",") conf := sarama.NewConfig() conf.Producer.RequiredAcks = sarama.WaitForAll conf.Version = sarama.V0_10_0_0 conf.Consumer.Return.Errors = true conf.ClientID = "sasl_scram_client" conf.Metadata.Full = true conf.Net.SASL.Enable = true conf.Net.SASL.User = "<имя_потребителя>" conf.Net.SASL.Password = "<пароль_потребителя>" conf.Net.SASL.Handshake = true conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) certs := x509.NewCertPool() pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt" pemData, err := ioutil.ReadFile(pemPath) if err != nil { fmt.Println("Couldn't load cert: ", err.Error()) // Handle the error } certs.AppendCertsFromPEM(pemData) conf.Net.TLS.Enable = true conf.Net.TLS.Config = &tls.Config{ RootCAs: certs, } master, err := sarama.NewConsumer(splitBrokers, conf) if err != nil { fmt.Println("Coulnd't create consumer: ", err.Error()) os.Exit(1) } defer func() { if err := master.Close(); err != nil { panic(err) } }() topic := "<имя_топика>" consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest) if err != nil { panic(err) } signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) // Count the number of processed messages msgCount := 0 // Get signal to finish doneCh := make(chan struct{}) go func() { for { select { case err := <-consumer.Errors(): fmt.Println(err) case msg := <-consumer.Messages(): msgCount++ fmt.Println("Received messages", string(msg.Key), string(msg.Value)) case <-signals: fmt.Println("Interrupt is detected") doneCh <- struct{}{} } } }() <-doneCh fmt.Println("Processed", msgCount, "messages") }
-
Сборка приложений:
cd ~/go-project/producer && go build && \ cd ~/go-project/consumer && go build
-
Запуск приложений:
~/go-project/consumer/consumer
~/go-project/producer/producer
Как получить FQDN хоста-брокера, см. в инструкции.
Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message
. Приложение-потребитель отобразит сообщения, отправленные в топик.
Java
Перед подключением:
-
Установите зависимости:
sudo apt update && sudo apt install --yes default-jdk maven
-
Создайте директорию для проекта Maven:
cd ~/ && \ mkdir --parents project/consumer/src/java/com/example project/producer/src/java/com/example && \ cd ~/project
-
Создайте конфигурационный файл для Maven:
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>app</artifactId> <packaging>jar</packaging> <version>0.1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.11.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.6.0</version> </dependency> </dependencies> <build> <finalName>${project.artifactId}-${project.version}</finalName> <sourceDirectory>src</sourceDirectory> <resources> <resource> <directory>src</directory> </resource> </resources> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <executions> <execution> <goals> <goal>attached</goal> </goals> <phase>package</phase> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.example.App</mainClass> </manifest> </archive> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>3.1.0</version> <configuration> <archive> <manifest> <mainClass>com.example.App</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
Актуальные версии зависимостей уточняйте на страницах соответствующих проектов в репозитории Maven:
-
Скопируйте
pom.xml
в директории приложения-производителя и приложения-потребителя:cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
-
Пример кода для отправки сообщений в топик:
producer/src/java/com/example/App.java
package com.example; import java.util.*; import org.apache.kafka.common.*; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.clients.producer.*; public class App { public static void main(String[] args) { int MSG_COUNT = 5; String HOST = "<FQDN_брокера>:9092"; String TOPIC = "<имя_топика>"; String USER = "<имя_производителя>"; String PASS = "<пароль_производителя>"; String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"; String jaasCfg = String.format(jaasTemplate, USER, PASS); String KEY = "key"; String serializer = StringSerializer.class.getName(); Properties props = new Properties(); props.put("bootstrap.servers", HOST); props.put("acks", "all"); props.put("key.serializer", serializer); props.put("value.serializer", serializer); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", jaasCfg); Producer<String, String> producer = new KafkaProducer<>(props); try { for (int i = 1; i <= MSG_COUNT; i++){ producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get(); System.out.println("Test message " + i); } producer.flush(); producer.close(); } catch (Exception ex) { System.out.println(ex); producer.close(); } } }
-
Пример кода для получения сообщений из топика:
consumer/src/java/com/example/App.java
package com.example; import java.util.*; import org.apache.kafka.common.*; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.clients.consumer.*; public class App { public static void main(String[] args) { String HOST = "<FQDN_брокера>:9092"; String TOPIC = "<имя_топика>"; String USER = "<имя_потребителя>"; String PASS = "<пароль_потребителя>"; String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"; String jaasCfg = String.format(jaasTemplate, USER, PASS); String GROUP = "demo"; String deserializer = StringDeserializer.class.getName(); Properties props = new Properties(); props.put("bootstrap.servers", HOST); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put("group.id", GROUP); props.put("key.deserializer", deserializer); props.put("value.deserializer", deserializer); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", jaasCfg); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(new String[] {TOPIC})); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key() + ":" + record.value()); } } } }
-
Сборка приложений:
cd ~/project/producer && mvn clean package && \ cd ~/project/consumer && mvn clean package
-
Запуск приложений:
java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
-
Перейдите в каталог, где будет располагаться хранилище сертификатов Java:
cd /etc/security
-
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль не короче 6 символов в параметре
-storepass
для дополнительной защиты хранилища:sudo keytool -importcert \ -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -keystore ssl -storepass <пароль_хранилища_сертификатов> \ --noprompt
-
Пример кода для отправки сообщений в топик:
producer/src/java/com/example/App.java
package com.example; import java.util.*; import org.apache.kafka.common.*; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.clients.producer.*; public class App { public static void main(String[] args) { int MSG_COUNT = 5; String HOST = "<FQDN_брокера>:9091"; String TOPIC = "<имя_топика>"; String USER = "<имя_производителя>"; String PASS = "<пароль_производителя>"; String TS_FILE = "/etc/security/ssl"; String TS_PASS = "<пароль_от_хранилища_сертификатов>"; String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"; String jaasCfg = String.format(jaasTemplate, USER, PASS); String KEY = "key"; String serializer = StringSerializer.class.getName(); Properties props = new Properties(); props.put("bootstrap.servers", HOST); props.put("acks", "all"); props.put("key.serializer", serializer); props.put("value.serializer", serializer); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", jaasCfg); props.put("ssl.truststore.location", TS_FILE); props.put("ssl.truststore.password", TS_PASS); Producer<String, String> producer = new KafkaProducer<>(props); try { for (int i = 1; i <= MSG_COUNT; i++){ producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get(); System.out.println("Test message " + i); } producer.flush(); producer.close(); } catch (Exception ex) { System.out.println(ex); producer.close(); } } }
-
Пример кода для получения сообщений из топика:
consumer/src/java/com/example/App.java
package com.example; import java.util.*; import org.apache.kafka.common.*; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.clients.consumer.*; public class App { public static void main(String[] args) { String HOST = "<FQDN_брокера>:9091"; String TOPIC = "<имя_топика>"; String USER = "<имя_потребителя>"; String PASS = "<пароль_потребителя>"; String TS_FILE = "/etc/security/ssl"; String TS_PASS = "<пароль_от_хранилища_сертификатов>"; String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"; String jaasCfg = String.format(jaasTemplate, USER, PASS); String GROUP = "demo"; String deserializer = StringDeserializer.class.getName(); Properties props = new Properties(); props.put("bootstrap.servers", HOST); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put("group.id", GROUP); props.put("key.deserializer", deserializer); props.put("value.deserializer", deserializer); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", jaasCfg); props.put("ssl.truststore.location", TS_FILE); props.put("ssl.truststore.password", TS_PASS); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(new String[] {TOPIC})); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key() + ":" + record.value()); } } } }
-
Сборка приложений:
cd ~/project/producer && mvn clean package && \ cd ~/project/consumer && mvn clean package
-
Запуск приложений:
java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
Как получить FQDN хоста-брокера, см. в инструкции.
Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message
. Приложение-потребитель отобразит сообщения, отправленные в топик.
Node.js
Перед подключением установите зависимости:
sudo apt update && sudo apt install -y nodejs npm && \
npm install node-rdkafka
-
Пример кода для отправки сообщений в топик:
producer.js
"use strict" const Kafka = require('node-rdkafka'); const MSG_COUNT = 5; const HOST = "<FQDN_брокера>:9092"; const TOPIC = "<имя_топика>"; const USER = "<имя_производителя>"; const PASS = "<пароль_производителя>"; const producer = new Kafka.Producer({ 'bootstrap.servers': HOST, 'sasl.username': USER, 'sasl.password': PASS, 'security.protocol': "SASL_PLAINTEXT", 'sasl.mechanism': "SCRAM-SHA-512" }); producer.connect(); producer.on('ready', function() { try { for (let i = 0; i < MSG_COUNT; ++i) { producer.produce(TOPIC, -1, Buffer.from("test message"), "key"); console.log("Produced: test message"); } producer.flush(10000, () => { producer.disconnect(); }); } catch (err) { console.error('Error'); console.error(err); } });
-
Пример кода для получения сообщений из топика:
consumer.js
"use strict" const Kafka = require('node-rdkafka'); const MSG_COUNT = 5; const HOST = "<FQDN_брокера>:9092"; const TOPIC = "<имя_топика>"; const USER = "<имя_потребителя>"; const PASS = "<пароль_потребителя>"; const consumer = new Kafka.Consumer({ 'bootstrap.servers': HOST, 'sasl.username': USER, 'sasl.password': PASS, 'security.protocol': "SASL_PLAINTEXT", 'sasl.mechanism': "SCRAM-SHA-512", 'group.id': "demo" }); consumer.connect(); consumer .on('ready', function() { consumer.subscribe([TOPIC]); consumer.consume(); }) .on('data', function(data) { console.log(data.key + ":" + data.value.toString()); }); process.on('SIGINT', () => { console.log('\nDisconnecting consumer ...'); consumer.disconnect(); });
-
Запуск приложений:
node consumer.js
node producer.js
-
Пример кода для отправки сообщений в топик:
producer.js
"use strict" const Kafka = require('node-rdkafka'); const MSG_COUNT = 5; const HOST = "<FQDN_брокера>:9091"; const TOPIC = "<имя_топика>"; const USER = "<имя_производителя>"; const PASS = "<пароль_производителя>"; const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"; const producer = new Kafka.Producer({ 'bootstrap.servers': HOST, 'sasl.username': USER, 'sasl.password': PASS, 'security.protocol': "SASL_SSL", 'ssl.ca.location': CA_FILE, 'sasl.mechanism': "SCRAM-SHA-512" }); producer.connect(); producer.on('ready', function() { try { for (let i = 0; i < MSG_COUNT; ++i) { producer.produce(TOPIC, -1, Buffer.from("test message"), "key"); console.log("Produced: test message"); } producer.flush(10000, () => { producer.disconnect(); }); } catch (err) { console.error('Error'); console.error(err); } });
-
Пример кода для получения сообщений из топика:
consumer.js
"use strict" const Kafka = require('node-rdkafka'); const MSG_COUNT = 5; const HOST = "<FQDN_брокера>:9091"; const TOPIC = "<имя_топика>"; const USER = "<имя_потребителя>"; const PASS = "<пароль_потребителя>"; const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"; const consumer = new Kafka.Consumer({ 'bootstrap.servers': HOST, 'sasl.username': USER, 'sasl.password': PASS, 'security.protocol': "SASL_SSL", 'ssl.ca.location': CA_FILE, 'sasl.mechanism': "SCRAM-SHA-512", 'group.id': "demo" }); consumer.connect(); consumer .on('ready', function() { consumer.subscribe([TOPIC]); consumer.consume(); }) .on('data', function(data) { console.log(data.key + ":" + data.value.toString()); }); process.on('SIGINT', () => { console.log('\nDisconnecting consumer ...'); consumer.disconnect(); });
-
Запуск приложений:
node consumer.js
node producer.js
Как получить FQDN хоста-брокера, см. в инструкции.
Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message
. Приложение-потребитель отобразит сообщения, отправленные в топик.
Python (kafka-python)
Перед подключением установите зависимости:
sudo apt update && sudo apt install -y python3 python3-pip libsnappy-dev && \
pip3 install kafka-python lz4 python-snappy crc32c
-
Пример кода для отправки сообщения в топик:
producer.py
from kafka import KafkaProducer producer = KafkaProducer( bootstrap_servers='<FQDN_хоста-брокера>:9092', security_protocol="SASL_PLAINTEXT", sasl_mechanism="SCRAM-SHA-512", sasl_plain_username='<имя_производителя>', sasl_plain_password='<пароль_производителя>') producer.send('<имя_топика>', b'test message', b'key') producer.flush() producer.close()
-
Пример кода для получения сообщений из топика:
consumer.py
from kafka import KafkaConsumer consumer = KafkaConsumer( '<имя_топика>', bootstrap_servers='<FQDN_брокера>:9092', security_protocol="SASL_PLAINTEXT", sasl_mechanism="SCRAM-SHA-512", sasl_plain_username='<имя_потребителя>', sasl_plain_password='<пароль_потребителя>') print("ready") for msg in consumer: print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
-
Запуск приложений:
python3 producer.py
python3 consumer.py
-
Пример кода для отправки сообщения в топик:
producer.py
from kafka import KafkaProducer producer = KafkaProducer( bootstrap_servers='<FQDN_хоста-брокера>:9091', security_protocol="SASL_SSL", sasl_mechanism="SCRAM-SHA-512", sasl_plain_username='<имя_производителя>', sasl_plain_password='<пароль_производителя>', ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt") producer.send('<имя_топика>', b'test message', b'key') producer.flush() producer.close()
-
Пример кода для получения сообщений из топика:
consumer.py
from kafka import KafkaConsumer consumer = KafkaConsumer( '<имя_топика>', bootstrap_servers='<FQDN_брокера>:9091', security_protocol="SASL_SSL", sasl_mechanism="SCRAM-SHA-512", sasl_plain_username='<имя_потребителя>', sasl_plain_password='<пароль_потребителя>', ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt") print("ready") for msg in consumer: print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
-
Запуск приложений:
python3 consumer.py
python3 producer.py
Как получить FQDN хоста-брокера, см. в инструкции.
Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message
. Приложение-потребитель отобразит сообщения, отправленные в топик.
Python (confluent-kafka)
Перед подключением установите зависимости:
pip install confluent_kafka
-
Пример кода для отправки сообщения в топик:
producer.py
from confluent_kafka import Producer def error_callback(err): print('Something went wrong: {}'.format(err)) params = { 'bootstrap.servers': '<FQDN_хоста-брокера>:9092', 'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'SCRAM-SHA-512', 'sasl.username': '<имя_производителя>', 'sasl.password': '<пароль_производителя>', 'error_cb': error_callback, } p = Producer(params) p.produce('<имя_топика>', 'some payload1') p.flush(10)
-
Пример кода для получения сообщений из топика:
consumer.py
from confluent_kafka import Consumer def error_callback(err): print('Something went wrong: {}'.format(err)) params = { 'bootstrap.servers': '<FQDN_хоста-брокера>:9092', 'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'SCRAM-SHA-512', 'sasl.username': '<имя_потребителя>', 'sasl.password': '<пароль_потребителя>', 'group.id': 'test-consumer1', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, 'error_cb': error_callback, 'debug': 'all', } c = Consumer(params) c.subscribe(['<имя_топика>']) while True: msg = c.poll(timeout=3.0) if msg: val = msg.value().decode() print(val)
-
Запуск приложений:
python3 producer.py
python3 consumer.py
-
Пример кода для отправки сообщения в топик:
producer.py
from confluent_kafka import Producer def error_callback(err): print('Something went wrong: {}'.format(err)) params = { 'bootstrap.servers': '<FQDN_хоста-брокера>:9091', 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt', 'sasl.mechanism': 'SCRAM-SHA-512', 'sasl.username': '<имя_производителя>', 'sasl.password': '<пароль_производителя>', 'error_cb': error_callback, } p = Producer(params) p.produce('<имя_топика>', 'some payload1') p.flush(10)
-
Пример кода для получения сообщений из топика:
consumer.py
from confluent_kafka import Consumer def error_callback(err): print('Something went wrong: {}'.format(err)) params = { 'bootstrap.servers': '<FQDN_хоста-брокера>:9091', 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt', 'sasl.mechanism': 'SCRAM-SHA-512', 'sasl.username': '<имя_потребителя>', 'sasl.password': '<пароль_потребителя>', 'group.id': 'test-consumer1', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, 'error_cb': error_callback, 'debug': 'all', } c = Consumer(params) c.subscribe(['<имя_топика>']) while True: msg = c.poll(timeout=3.0) if msg: val = msg.value().decode() print(val)
-
Запуск приложений:
python3 consumer.py
python3 producer.py
Как получить FQDN хоста-брокера, см. в инструкции.
Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message
. Приложение-потребитель отобразит сообщения, отправленные в топик.