Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Kafka®
  • Начало работы
    • Все инструкции
      • Предварительная настройка
      • Подключение из приложений
      • Примеры кода
    • Управление топиками
    • Управление пользователями
    • Управление коннекторами
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Yandex Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • История изменений
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • C#
  • Go
  • Java
  • Node.js
  • Python (kafka-python)
  • Python (confluent-kafka)
  1. Пошаговые инструкции
  2. Подключение
  3. Примеры кода

Примеры кода для подключения к кластеру Apache Kafka®

Статья создана
Yandex Cloud
Обновлена 1 октября 2024 г.
  • C#
  • Go
  • Java
  • Node.js
  • Python (kafka-python)
  • Python (confluent-kafka)

Вы можете подключаться к хостам кластера Apache Kafka® в публичном доступе только с использованием SSL-сертификата. В примерах ниже предполагается, что сертификат YandexInternalRootCA.crt расположен в директории:

  • /usr/local/share/ca-certificates/Yandex/ для Ubuntu;
  • $HOME\.kafka\ для Windows.

Подключение без использования SSL-сертификата поддерживается только для хостов, находящихся не в публичном доступе. В этом случае трафик внутри виртуальной сети при подключении к БД шифроваться не будет.

При необходимости перед подключением настройте группы безопасности кластера.

Примеры кода с заполненным FQDN хоста доступны в консоли управления по нажатию кнопки Подключиться на странице кластера.

Примеры проверялись в следующем окружении:

  • Виртуальная машина в Yandex Cloud с Ubuntu 20.04 LTS.
  • Bash: 5.0.16.
  • Python: 3.8.2, pip3: 20.0.2.
  • Node.JS: 10.19.0, npm: 6.14.4.
  • OpenJDK: 11.0.8, Maven: 3.6.3.
  • Go: 1.13.8.
  • mono-complete: 6.8.0.105.

C#C#

Перед подключением:

  1. Установите зависимости:

    sudo apt-get update && \
    sudo apt-get install -y apt-transport-https dotnet-sdk-6.0
    
  2. Создайте директорию для проекта:

    cd ~/ && mkdir cs-project && cd cs-project && mkdir -p consumer producer && cd ~/cs-project
    
  3. Создайте конфигурационный файл:

    App.csproj

    <Project Sdk="Microsoft.NET.Sdk">
      <PropertyGroup>
        <OutputType>Exe</OutputType>
        <TargetFramework>netcoreapp6.0</TargetFramework>
      </PropertyGroup>
    
      <ItemGroup>
        <PackageReference Include="Confluent.Kafka" Version="2.2.0" />
      </ItemGroup>
    </Project>
    
  4. Скопируйте App.csproj в директории приложения-производителя и приложения-потребителя:

    cp App.csproj producer/App.csproj && cp App.csproj consumer/App.csproj
    
Подключение без SSL
Подключение с SSL
  1. Пример кода для отправки сообщений в топик:

    cs-project/producer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace App
    {
        class Program
        {
            public static void Main(string[] args)
            {
                int MSG_COUNT = 5;
    
                string HOST = "<FQDN_хоста-брокера>:9092";
                string TOPIC = "<имя_топика>";
                string USER = "<имя_производителя>";
                string PASS = "<пароль_производителя>";
    
                var producerConfig = new ProducerConfig(
                    new Dictionary<string,string>{
                        {"bootstrap.servers", HOST},
                        {"security.protocol", "SASL_PLAINTEXT"},
                        {"sasl.mechanism", "SCRAM-SHA-512"},
                        {"sasl.username", USER},
                        {"sasl.password", PASS}
                    }
                );
    
                var producer = new ProducerBuilder<string, string>(producerConfig).Build();
    
                for(int i=0; i<MSG_COUNT; i++)
                {
                    producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                    (deliveryReport) =>
                    {
                        if (deliveryReport.Error.Code != ErrorCode.NoError)
                        {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        }
                        else
                        {
                        Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                        }
                    });
                }
                producer.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
    
  2. Пример кода для получения сообщений из топика:

    cs-project/consumer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace CCloud
    {
        class Program
        {
            public static void Main(string[] args)
            {
                string HOST = "<FQDN_хоста-брокера>:9092";
                string TOPIC = "<имя_топика>";
                string USER = "<имя_потребителя>";
                string PASS = "<пароль_потребителя>";
    
                var consumerConfig = new ConsumerConfig(
                    new Dictionary<string,string>{
                        {"bootstrap.servers", HOST},
                        {"security.protocol", "SASL_PLAINTEXT"},
                        {"sasl.mechanism", "SCRAM-SHA-512"},
                        {"sasl.username", USER},
                        {"sasl.password", PASS},
                        {"group.id", "demo"}
                    }
                );
    
                var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
                consumer.Subscribe(TOPIC);
                try
                {
                    while (true)
                    {
                        var cr = consumer.Consume();
                        Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}");
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ctrl-C was pressed.
                }
                finally
                {
                    consumer.Close();
                }
            }
        }
    }
    
  3. Сборка и запуск приложений:

    cd ~/cs-project/consumer && dotnet build && \
    dotnet run bin/Debug/netcoreapp5.0/App.dll
    
    cd ~/cs-project/producer && dotnet build && \
    dotnet run bin/Debug/netcoreapp5.0/App.dll
    
  1. Пример кода для отправки сообщений в топик:

    cs-project/producer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace App
    {
        class Program
        {
            public static void Main(string[] args)
            {
                int MSG_COUNT = 5;
    
                string HOST = "<FQDN_хоста-брокера>:9091";
                string TOPIC = "<имя_топика>";
                string USER = "<имя_производителя>";
                string PASS = "<пароль_производителя>";
                string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";
    
                var producerConfig = new ProducerConfig(
                    new Dictionary<string,string>{
                        {"bootstrap.servers", HOST},
                        {"security.protocol", "SASL_SSL"},
                        {"ssl.ca.location", CA_FILE},
                        {"sasl.mechanism", "SCRAM-SHA-512"},
                        {"sasl.username", USER},
                        {"sasl.password", PASS}
                    }
                );
    
                var producer = new ProducerBuilder<string, string>(producerConfig).Build();
    
                for(int i=0; i<MSG_COUNT; i++)
                {
                    producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                    (deliveryReport) =>
                        {
                            if (deliveryReport.Error.Code != ErrorCode.NoError)
                            {
                                Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                            }
                            else
                            {
                                Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                            }
                    });
                 }
                 producer.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
    
  2. Пример кода для получения сообщений из топика:

    cs-project/consumer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace CCloud
    {
        class Program
        {
            public static void Main(string[] args)
            {
                string HOST = "<FQDN_хоста-брокера>:9091";
                string TOPIC = "<имя_топика>";
                string USER = "<имя_потребителя>";
                string PASS = "<пароль_потребителя>";
                string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";
    
                var consumerConfig = new ConsumerConfig(
                    new Dictionary<string,string>{
                        {"bootstrap.servers", HOST},
                        {"security.protocol", "SASL_SSL"},
                        {"ssl.ca.location", CA_FILE},
                        {"sasl.mechanism", "SCRAM-SHA-512"},
                        {"sasl.username", USER},
                        {"sasl.password", PASS},
                        {"group.id", "demo"}
                    }
                );
    
                var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
                consumer.Subscribe(TOPIC);
                try
                {
                    while (true)
                    {
                        var cr = consumer.Consume();
                        Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}");
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ctrl-C was pressed.
                }
                finally
                {
                    consumer.Close();
                }
            }
        }
    }
    
  3. Сборка и запуск приложений:

    cd ~/cs-project/consumer && dotnet build && \
    dotnet run bin/Debug/netcoreapp6.0/App.dll
    
    cd ~/cs-project/producer && dotnet build && \
    dotnet run bin/Debug/netcoreapp6.0/App.dll
    

Как получить FQDN хоста-брокера, см. в инструкции.

Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message. Приложение-потребитель отобразит сообщения, отправленные в топик.

GoGo

Перед подключением:

  1. Установите зависимости:

    sudo apt update && sudo apt install -y golang git && \
    go mod init example && \
    go get github.com/Shopify/sarama@v1.38.1 && \
    go get github.com/xdg-go/scram@v1.1.2
    
  2. Создайте директорию для проекта:

    cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
    
  3. Создайте файл scram.go, содержащий код для использования SCRAM, общий для приложения-производителя и приложения-потребителя:

    scram.go
    package main
    
    import (
      "crypto/sha256"
      "crypto/sha512"
      "hash"
    
      "github.com/xdg-go/scram"
    )
    
    var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
    var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
    
    type XDGSCRAMClient struct {
      *scram.Client
      *scram.ClientConversation
      scram.HashGeneratorFcn
    }
    
    func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
        x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
        if err != nil {
          return err
        }
        x.ClientConversation = x.Client.NewConversation()
        return nil
    }
    
    func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
        response, err = x.ClientConversation.Step(challenge)
        return
    }
    
    func (x *XDGSCRAMClient) Done() bool {
        return x.ClientConversation.Done()
    }
    
  4. Скопируйте scram.go в директории приложения-производителя и приложения-потребителя:

    cp scram.go producer/scram.go && cp scram.go consumer/scram.go
    
Подключение без SSL
Подключение с SSL
  1. Пример кода для отправки сообщения в топик:

    producer/main.go

    package main
    
    import (
          "fmt"
          "os"
          "strings"
    
          "github.com/Shopify/sarama"
    )
    
    func main() {
          brokers := "<FQDN_хоста-брокера>:9092"
          splitBrokers := strings.Split(brokers, ",")
          conf := sarama.NewConfig()
          conf.Producer.RequiredAcks = sarama.WaitForAll
          conf.Producer.Return.Successes = true
          conf.Version = sarama.V0_10_0_0
          conf.ClientID = "sasl_scram_client"
          conf.Net.SASL.Enable = true
          conf.Net.SASL.Handshake = true
          conf.Net.SASL.User = "<имя_производителя>"
          conf.Net.SASL.Password = "<пароль_производителя>"
          conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
          conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
    
          syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
          if err != nil {
                fmt.Println("Couldn't create producer: ", err.Error())
                os.Exit(0)
          }
          publish("test message", syncProducer)
    }
    
    func publish(message string, producer sarama.SyncProducer) {
      // Publish sync
      msg := &sarama.ProducerMessage {
          Topic: "<имя_топика>",
          Value: sarama.StringEncoder(message),
      }
      p, o, err := producer.SendMessage(msg)
      if err != nil {
          fmt.Println("Error publish: ", err.Error())
      }
    
      fmt.Println("Partition: ", p)
      fmt.Println("Offset: ", o)
    }
    
  2. Пример кода для получения сообщений из топика:

    consumer/main.go

    package main
    
    import (
          "fmt"
          "os"
          "os/signal"
          "strings"
    
          "github.com/Shopify/sarama"
    )
    
    func main() {
          brokers := "<FQDN_хоста-брокера>:9092"
          splitBrokers := strings.Split(brokers, ",")
          conf := sarama.NewConfig()
          conf.Producer.RequiredAcks = sarama.WaitForAll
          conf.Version = sarama.V0_10_0_0
          conf.Consumer.Return.Errors = true
          conf.ClientID = "sasl_scram_client"
          conf.Metadata.Full = true
          conf.Net.SASL.Enable = true
          conf.Net.SASL.User =  "<имя_потребителя>"
          conf.Net.SASL.Password = "<пароль_потребителя>"
          conf.Net.SASL.Handshake = true
          conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
          conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
    
          master, err := sarama.NewConsumer(splitBrokers, conf)
          if err != nil {
                  fmt.Println("Coulnd't create consumer: ", err.Error())
                  os.Exit(1)
          }
    
          defer func() {
                  if err := master.Close(); err != nil {
                          panic(err)
                  }
          }()
    
          topic := "<имя_топика>"
    
          consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
          if err != nil {
                  panic(err)
          }
    
          signals := make(chan os.Signal, 1)
          signal.Notify(signals, os.Interrupt)
    
          // Count the number of processed messages
          msgCount := 0
    
          // Get signal to finish
          doneCh := make(chan struct{})
          go func() {
                  for {
                          select {
                          case err := <-consumer.Errors():
                                  fmt.Println(err)
                          case msg := <-consumer.Messages():
                                  msgCount++
                                  fmt.Println("Received messages", string(msg.Key), string(msg.Value))
                          case <-signals:
                                  fmt.Println("Interrupt is detected")
                                  doneCh <- struct{}{}
                          }
                  }
          }()
    
          <-doneCh
          fmt.Println("Processed", msgCount, "messages")
    }
    
  3. Сборка приложений:

    cd ~/go-project/producer && go build && \
    cd ~/go-project/consumer && go build
    
  4. Запуск приложений:

    ~/go-project/consumer/consumer
    
    ~/go-project/producer/producer
    
  1. Пример кода для отправки сообщения в топик:

    producer/main.go

    package main
    
    import (
          "fmt"
          "crypto/tls"
          "crypto/x509"
          "io/ioutil"
          "os"
          "strings"
    
          "github.com/Shopify/sarama"
    )
    
    func main() {
          brokers := "<FQDN_хоста-брокера>:9091"
          splitBrokers := strings.Split(brokers, ",")
          conf := sarama.NewConfig()
          conf.Producer.RequiredAcks = sarama.WaitForAll
          conf.Producer.Return.Successes = true
          conf.Version = sarama.V0_10_0_0
          conf.ClientID = "sasl_scram_client"
          conf.Net.SASL.Enable = true
          conf.Net.SASL.Handshake = true
          conf.Net.SASL.User = "<имя_производителя>"
          conf.Net.SASL.Password = "<пароль_производителя>"
          conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
          conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
    
          certs := x509.NewCertPool()
          pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
          pemData, err := ioutil.ReadFile(pemPath)
          if err != nil {
                  fmt.Println("Couldn't load cert: ", err.Error())
              // Handle the error
          }
          certs.AppendCertsFromPEM(pemData)
    
          conf.Net.TLS.Enable = true
    
          conf.Net.TLS.Config = &tls.Config{
            RootCAs: certs,
          }
    
          syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
          if err != nil {
                  fmt.Println("Couldn't create producer: ", err.Error())
                  os.Exit(0)
          }
          publish("test message", syncProducer)
    
    }
    
    func publish(message string, producer sarama.SyncProducer) {
      // publish sync
      msg := &sarama.ProducerMessage {
          Topic: "<имя_топика>",
          Value: sarama.StringEncoder(message),
      }
      p, o, err := producer.SendMessage(msg)
      if err != nil {
          fmt.Println("Error publish: ", err.Error())
      }
    
      fmt.Println("Partition: ", p)
      fmt.Println("Offset: ", o)
    }
    
  2. Пример кода для получения сообщений из топика:

    consumer/main.go

    package main
    
    import (
          "fmt"
          "crypto/tls"
          "crypto/x509"
          "io/ioutil"
          "os"
          "os/signal"
          "strings"
    
          "github.com/Shopify/sarama"
    )
    
    func main() {
          brokers := "<FQDN_хоста-брокера>:9091"
          splitBrokers := strings.Split(brokers, ",")
          conf := sarama.NewConfig()
          conf.Producer.RequiredAcks = sarama.WaitForAll
          conf.Version = sarama.V0_10_0_0
          conf.Consumer.Return.Errors = true
          conf.ClientID = "sasl_scram_client"
          conf.Metadata.Full = true
          conf.Net.SASL.Enable = true
          conf.Net.SASL.User =  "<имя_потребителя>"
          conf.Net.SASL.Password = "<пароль_потребителя>"
          conf.Net.SASL.Handshake = true
          conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
          conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
    
          certs := x509.NewCertPool()
          pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
          pemData, err := ioutil.ReadFile(pemPath)
          if err != nil {
              fmt.Println("Couldn't load cert: ", err.Error())
                  // Handle the error
          }
          certs.AppendCertsFromPEM(pemData)
    
          conf.Net.TLS.Enable = true
    
          conf.Net.TLS.Config = &tls.Config{
            RootCAs: certs,
          }
    
          master, err := sarama.NewConsumer(splitBrokers, conf)
          if err != nil {
                  fmt.Println("Coulnd't create consumer: ", err.Error())
                  os.Exit(1)
          }
    
          defer func() {
                  if err := master.Close(); err != nil {
                          panic(err)
                  }
          }()
    
          topic := "<имя_топика>"
    
          consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
          if err != nil {
                  panic(err)
          }
    
          signals := make(chan os.Signal, 1)
          signal.Notify(signals, os.Interrupt)
    
          // Count the number of processed messages
          msgCount := 0
    
          // Get signal to finish
          doneCh := make(chan struct{})
          go func() {
                  for {
                          select {
                          case err := <-consumer.Errors():
                                  fmt.Println(err)
                          case msg := <-consumer.Messages():
                                  msgCount++
                                  fmt.Println("Received messages", string(msg.Key), string(msg.Value))
                          case <-signals:
                                  fmt.Println("Interrupt is detected")
                                  doneCh <- struct{}{}
                          }
                  }
          }()
    
          <-doneCh
          fmt.Println("Processed", msgCount, "messages")
    }
    
  3. Сборка приложений:

    cd ~/go-project/producer && go build && \
    cd ~/go-project/consumer && go build
    
  4. Запуск приложений:

    ~/go-project/consumer/consumer
    
    ~/go-project/producer/producer
    

Как получить FQDN хоста-брокера, см. в инструкции.

Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message. Приложение-потребитель отобразит сообщения, отправленные в топик.

JavaJava

Перед подключением:

  1. Установите зависимости:

    sudo apt update && sudo apt install --yes default-jdk maven
    
  2. Создайте директорию для проекта Maven:

    cd ~/ && \
    mkdir --parents project/consumer/src/java/com/example project/producer/src/java/com/example && \
    cd ~/project
    
  3. Создайте конфигурационный файл для Maven:

    pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.example</groupId>
        <artifactId>app</artifactId>
        <packaging>jar</packaging>
        <version>0.1.0</version>
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.30</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.6.0</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>${project.artifactId}-${project.version}</finalName>
            <sourceDirectory>src</sourceDirectory>
            <resources>
                <resource>
                    <directory>src</directory>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <executions>
                        <execution>
                            <goals>
                                <goal>attached</goal>
                            </goals>
                            <phase>package</phase>
                            <configuration>
                                <descriptorRefs>
                                    <descriptorRef>jar-with-dependencies</descriptorRef>
                                </descriptorRefs>
                                <archive>
                                    <manifest>
                                        <mainClass>com.example.App</mainClass>
                                    </manifest>
                                </archive>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.example.App</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    Актуальные версии зависимостей уточняйте на страницах соответствующих проектов в репозитории Maven:

    • kafka-clients;
    • jackson-databind;
    • slf4j-simple.
  4. Скопируйте pom.xml в директории приложения-производителя и приложения-потребителя:

    cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
    
Подключение без SSL
Подключение с SSL
  1. Пример кода для отправки сообщений в топик:

    producer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.clients.producer.*;
    
    public class App {
    
      public static void main(String[] args) {
    
        int MSG_COUNT = 5;
    
        String HOST = "<FQDN_брокера>:9092";
        String TOPIC = "<имя_топика>";
        String USER = "<имя_производителя>";
        String PASS = "<пароль_производителя>";
    
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, USER, PASS);
        String KEY = "key";
    
        String serializer = StringSerializer.class.getName();
        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put("acks", "all");
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);
    
        Producer<String, String> producer = new KafkaProducer<>(props);
    
        try {
         for (int i = 1; i <= MSG_COUNT; i++){
           producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
           System.out.println("Test message " + i);
          }
         producer.flush();
         producer.close();
        } catch (Exception ex) {
            System.out.println(ex);
            producer.close();
        }
      }
    }
    
  2. Пример кода для получения сообщений из топика:

    consumer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.clients.consumer.*;
    
    public class App {
    
      public static void main(String[] args) {
    
        String HOST = "<FQDN_брокера>:9092";
        String TOPIC = "<имя_топика>";
        String USER = "<имя_потребителя>";
        String PASS = "<пароль_потребителя>";
    
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, USER, PASS);
        String GROUP = "demo";
    
        String deserializer = StringDeserializer.class.getName();
        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("group.id", GROUP);
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);
    
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
    
        while(true) {
          ConsumerRecords<String, String> records = consumer.poll(100);
          for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key() + ":" + record.value());
          }
        }
      }
    }
    
  3. Сборка приложений:

    cd ~/project/producer && mvn clean package && \
    cd ~/project/consumer && mvn clean package
    
  4. Запуск приложений:

    java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
    
    java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
    
  1. Перейдите в каталог, где будет располагаться хранилище сертификатов Java:

    cd /etc/security
    
  2. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль не короче 6 символов в параметре -storepass для дополнительной защиты хранилища:

    sudo keytool -importcert \
                 -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
                 -keystore ssl -storepass <пароль_хранилища_сертификатов> \
                 --noprompt
    
  3. Пример кода для отправки сообщений в топик:

    producer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.clients.producer.*;
    
    public class App {
    
      public static void main(String[] args) {
    
        int MSG_COUNT = 5;
    
        String HOST = "<FQDN_брокера>:9091";
        String TOPIC = "<имя_топика>";
        String USER = "<имя_производителя>";
        String PASS = "<пароль_производителя>";
        String TS_FILE = "/etc/security/ssl";
        String TS_PASS = "<пароль_от_хранилища_сертификатов>";
    
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, USER, PASS);
        String KEY = "key";
    
        String serializer = StringSerializer.class.getName();
        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put("acks", "all");
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);
        props.put("ssl.truststore.location", TS_FILE);
        props.put("ssl.truststore.password", TS_PASS);
    
        Producer<String, String> producer = new KafkaProducer<>(props);
    
        try {
         for (int i = 1; i <= MSG_COUNT; i++){
           producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
           System.out.println("Test message " + i);
          }
         producer.flush();
         producer.close();
        } catch (Exception ex) {
            System.out.println(ex);
            producer.close();
        }
      }
    }
    
  4. Пример кода для получения сообщений из топика:

    consumer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.clients.consumer.*;
    
    public class App {
    
      public static void main(String[] args) {
    
        String HOST = "<FQDN_брокера>:9091";
        String TOPIC = "<имя_топика>";
        String USER = "<имя_потребителя>";
        String PASS = "<пароль_потребителя>";
        String TS_FILE = "/etc/security/ssl";
        String TS_PASS = "<пароль_от_хранилища_сертификатов>";
    
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, USER, PASS);
        String GROUP = "demo";
    
        String deserializer = StringDeserializer.class.getName();
        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("group.id", GROUP);
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);
        props.put("ssl.truststore.location", TS_FILE);
        props.put("ssl.truststore.password", TS_PASS);
    
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
    
        while(true) {
          ConsumerRecords<String, String> records = consumer.poll(100);
          for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key() + ":" + record.value());
          }
        }
      }
    }
    
  5. Сборка приложений:

    cd ~/project/producer && mvn clean package && \
    cd ~/project/consumer && mvn clean package
    
  6. Запуск приложений:

    java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
    
    java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
    

Как получить FQDN хоста-брокера, см. в инструкции.

Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message. Приложение-потребитель отобразит сообщения, отправленные в топик.

Node.jsNode.js

Перед подключением установите зависимости:

sudo apt update && sudo apt install -y nodejs npm && \
npm install node-rdkafka
Подключение без SSL
Подключение с SSL
  1. Пример кода для отправки сообщений в топик:

    producer.js

    "use strict"
    const Kafka = require('node-rdkafka');
    
    const MSG_COUNT = 5;
    
    const HOST = "<FQDN_брокера>:9092";
    const TOPIC = "<имя_топика>";
    const USER = "<имя_производителя>";
    const PASS = "<пароль_производителя>";
    
    const producer = new Kafka.Producer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_PLAINTEXT",
      'sasl.mechanism': "SCRAM-SHA-512"
    });
    
    producer.connect();
    
    producer.on('ready', function() {
      try {
        for (let i = 0; i < MSG_COUNT; ++i) {
          producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
          console.log("Produced: test message");
        }
    
        producer.flush(10000, () => {
            producer.disconnect();
          });
      } catch (err) {
        console.error('Error');
        console.error(err);
      }
    });
    
  2. Пример кода для получения сообщений из топика:

    consumer.js

    "use strict"
    const Kafka = require('node-rdkafka');
    
    const MSG_COUNT = 5;
    
    const HOST = "<FQDN_брокера>:9092";
    const TOPIC = "<имя_топика>";
    const USER = "<имя_потребителя>";
    const PASS = "<пароль_потребителя>";
    
    const consumer = new Kafka.Consumer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_PLAINTEXT",
      'sasl.mechanism': "SCRAM-SHA-512",
      'group.id': "demo"
    });
    
    consumer.connect();
    
    consumer
      .on('ready', function() {
        consumer.subscribe([TOPIC]);
        consumer.consume();
      })
      .on('data', function(data) {
        console.log(data.key + ":" + data.value.toString());
      });
    
    process.on('SIGINT', () => {
      console.log('\nDisconnecting consumer ...');
      consumer.disconnect();
    });
    
  3. Запуск приложений:

    node consumer.js
    
    node producer.js
    
  1. Пример кода для отправки сообщений в топик:

    producer.js

    "use strict"
    const Kafka = require('node-rdkafka');
    
    const MSG_COUNT = 5;
    
    const HOST = "<FQDN_брокера>:9091";
    const TOPIC = "<имя_топика>";
    const USER = "<имя_производителя>";
    const PASS = "<пароль_производителя>";
    const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";
    
    const producer = new Kafka.Producer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_SSL",
      'ssl.ca.location': CA_FILE,
      'sasl.mechanism': "SCRAM-SHA-512"
    });
    
    producer.connect();
    
    producer.on('ready', function() {
      try {
        for (let i = 0; i < MSG_COUNT; ++i) {
          producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
          console.log("Produced: test message");
        }
    
        producer.flush(10000, () => {
            producer.disconnect();
          });
      } catch (err) {
        console.error('Error');
        console.error(err);
      }
    });
    
  2. Пример кода для получения сообщений из топика:

    consumer.js

    "use strict"
    const Kafka = require('node-rdkafka');
    
    const MSG_COUNT = 5;
    
    const HOST = "<FQDN_брокера>:9091";
    const TOPIC = "<имя_топика>";
    const USER = "<имя_потребителя>";
    const PASS = "<пароль_потребителя>";
    const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";
    
    const consumer = new Kafka.Consumer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_SSL",
      'ssl.ca.location': CA_FILE,
      'sasl.mechanism': "SCRAM-SHA-512",
      'group.id': "demo"
    });
    
    consumer.connect();
    
    consumer
      .on('ready', function() {
        consumer.subscribe([TOPIC]);
        consumer.consume();
      })
      .on('data', function(data) {
        console.log(data.key + ":" + data.value.toString());
      });
    
    process.on('SIGINT', () => {
      console.log('\nDisconnecting consumer ...');
      consumer.disconnect();
    });
    
  3. Запуск приложений:

    node consumer.js
    
    node producer.js
    

Как получить FQDN хоста-брокера, см. в инструкции.

Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message. Приложение-потребитель отобразит сообщения, отправленные в топик.

Python (kafka-python)Python (kafka-python)

Перед подключением установите зависимости:

sudo apt update && sudo apt install -y python3 python3-pip libsnappy-dev && \
pip3 install kafka-python lz4 python-snappy crc32c
Подключение без SSL
Подключение с SSL
  1. Пример кода для отправки сообщения в топик:

    producer.py

    from kafka import KafkaProducer
    
    producer = KafkaProducer(
        bootstrap_servers='<FQDN_хоста-брокера>:9092',
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username='<имя_производителя>',
        sasl_plain_password='<пароль_производителя>')
    
    producer.send('<имя_топика>', b'test message', b'key')
    producer.flush()
    producer.close()
    
  2. Пример кода для получения сообщений из топика:

    consumer.py

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        '<имя_топика>',
        bootstrap_servers='<FQDN_брокера>:9092',
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username='<имя_потребителя>',
        sasl_plain_password='<пароль_потребителя>')
    
    print("ready")
    
    for msg in consumer:
        print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
    
  3. Запуск приложений:

    python3 producer.py
    
    python3 consumer.py
    
  1. Пример кода для отправки сообщения в топик:

    producer.py

    from kafka import KafkaProducer
    
    producer = KafkaProducer(
        bootstrap_servers='<FQDN_хоста-брокера>:9091',
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username='<имя_производителя>',
        sasl_plain_password='<пароль_производителя>',
        ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt")
    
    producer.send('<имя_топика>', b'test message', b'key')
    producer.flush()
    producer.close()
    
  2. Пример кода для получения сообщений из топика:

    consumer.py

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        '<имя_топика>',
        bootstrap_servers='<FQDN_брокера>:9091',
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username='<имя_потребителя>',
        sasl_plain_password='<пароль_потребителя>',
        ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt")
    
    print("ready")
    
    for msg in consumer:
        print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
    
  3. Запуск приложений:

    python3 consumer.py
    
    python3 producer.py
    

Как получить FQDN хоста-брокера, см. в инструкции.

Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message. Приложение-потребитель отобразит сообщения, отправленные в топик.

Python (confluent-kafka)Python (confluent-kafka)

Перед подключением установите зависимости:

pip install confluent_kafka
Подключение без SSL
Подключение с SSL
  1. Пример кода для отправки сообщения в топик:

    producer.py

    from confluent_kafka import Producer
    
    def error_callback(err):
        print('Something went wrong: {}'.format(err))
    
    params = {
        'bootstrap.servers': '<FQDN_хоста-брокера>:9092',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': '<имя_производителя>',
        'sasl.password': '<пароль_производителя>',
        'error_cb': error_callback,
    }
    
    p = Producer(params)
    p.produce('<имя_топика>', 'some payload1')
    p.flush(10)
    
  2. Пример кода для получения сообщений из топика:

    consumer.py

    from confluent_kafka import Consumer
    
    def error_callback(err):
        print('Something went wrong: {}'.format(err))
    
    params = {
        'bootstrap.servers': '<FQDN_хоста-брокера>:9092',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': '<имя_потребителя>',
        'sasl.password': '<пароль_потребителя>',
        'group.id': 'test-consumer1',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        'error_cb': error_callback,
        'debug': 'all',
    }
    c = Consumer(params)
    c.subscribe(['<имя_топика>'])
    while True:
        msg = c.poll(timeout=3.0)
        if msg:
            val = msg.value().decode()
            print(val)
    
  3. Запуск приложений:

    python3 producer.py
    
    python3 consumer.py
    
  1. Пример кода для отправки сообщения в топик:

    producer.py

    from confluent_kafka import Producer
    
    def error_callback(err):
        print('Something went wrong: {}'.format(err))
    
    params = {
        'bootstrap.servers': '<FQDN_хоста-брокера>:9091',
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': '<имя_производителя>',
        'sasl.password': '<пароль_производителя>',
        'error_cb': error_callback,
    }
    
    p = Producer(params)
    p.produce('<имя_топика>', 'some payload1')
    p.flush(10)
    
  2. Пример кода для получения сообщений из топика:

    consumer.py

    from confluent_kafka import Consumer
    
    def error_callback(err):
        print('Something went wrong: {}'.format(err))
    
    params = {
        'bootstrap.servers': '<FQDN_хоста-брокера>:9091',
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': '<имя_потребителя>',
        'sasl.password': '<пароль_потребителя>',
        'group.id': 'test-consumer1',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        'error_cb': error_callback,
        'debug': 'all',
    }
    c = Consumer(params)
    c.subscribe(['<имя_топика>'])
    while True:
        msg = c.poll(timeout=3.0)
        if msg:
            val = msg.value().decode()
            print(val)
    
  3. Запуск приложений:

    python3 consumer.py
    
    python3 producer.py
    

Как получить FQDN хоста-брокера, см. в инструкции.

Сначала запустите приложение-потребитель, которое будет непрерывно считывать новые сообщения из топика. Затем запустите приложение-производитель, которое отправит в топик одно или несколько сообщений key:test message. Приложение-потребитель отобразит сообщения, отправленные в топик.

Была ли статья полезна?

Предыдущая
Подключение из приложений
Следующая
Управление топиками
Проект Яндекса
© 2025 ООО «Яндекс.Облако»