Yandex Managed Service for Apache Kafka®


Code examples for connecting to an Apache Kafka® cluster

Written by
Yandex Cloud
Updated on October 29, 2024
  • C#
  • Go
  • Java
  • Node.js
  • Python (kafka-python)
  • Python (confluent-kafka)

You can connect to the public hosts of an Apache Kafka® cluster only if you use an SSL certificate. The examples below assume that the YandexInternalRootCA.crt certificate is located in the following directory:

  • /usr/local/share/ca-certificates/Yandex/ for Ubuntu.
  • $HOME\.kafka\ for Windows.
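
Before running the examples, you can confirm that the certificate is actually in place. A minimal sketch for the Ubuntu path; the openssl check is optional:

```shell
# Check that the CA certificate exists at the expected Ubuntu location.
CA_FILE="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
if [ -r "$CA_FILE" ]; then
    # Print the subject and expiry date as a quick sanity check.
    openssl x509 -in "$CA_FILE" -noout -subject -enddate
else
    echo "CA certificate not found at $CA_FILE"
fi
```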

Connecting without an SSL certificate is only supported for non-public hosts. In this case, traffic to the cluster inside the virtual network is not encrypted.

Before connecting, configure security groups for the cluster, if required.

To see code examples with the host FQDN filled in, open the cluster page in the management console and click Connect.

Examples were tested in the following environment:

  • Yandex Cloud virtual machine running Ubuntu 20.04 LTS.
  • Bash: 5.0.16.
  • Python: 3.8.2, pip3: 20.0.2.
  • Node.js: 10.19.0, npm: 6.14.4.
  • OpenJDK: 11.0.8, Maven: 3.6.3.
  • Go: 1.13.8.
  • mono-complete: 6.8.0.105.

C#

Before connecting:

  1. Install the dependencies:

    sudo apt-get update && \
    sudo apt-get install -y apt-transport-https dotnet-sdk-6.0
    
  2. Create a directory for the project:

    cd ~/ && mkdir cs-project && cd cs-project && mkdir -p consumer producer && cd ~/cs-project
    
  3. Create a configuration file:

    App.csproj

    <Project Sdk="Microsoft.NET.Sdk">
      <PropertyGroup>
        <OutputType>Exe</OutputType>
        <TargetFramework>net6.0</TargetFramework>
      </PropertyGroup>
    
      <ItemGroup>
        <PackageReference Include="Confluent.Kafka" Version="2.2.0" />
      </ItemGroup>
    </Project>
    
  4. Copy App.csproj to the producer and consumer application directories:

    cp App.csproj producer/App.csproj && cp App.csproj consumer/App.csproj
    
Connecting without SSL
Connecting via SSL
  1. Code example for delivering messages to a topic:

    cs-project/producer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace App
    {
        class Program
        {
            public static void Main(string[] args)
            {
                int MSG_COUNT = 5;
    
                string HOST = "<broker_host_FQDN>:9092";
                string TOPIC = "<topic_name>";
                string USER = "<producer_name>";
                string PASS = "<producer_password>";
    
                var producerConfig = new ProducerConfig(
                    new Dictionary<string,string>{
                        {"bootstrap.servers", HOST},
                        {"security.protocol", "SASL_PLAINTEXT"},
                        {"sasl.mechanism", "SCRAM-SHA-512"},
                        {"sasl.username", USER},
                        {"sasl.password", PASS}
                    }
                );
    
                var producer = new ProducerBuilder<string, string>(producerConfig).Build();
    
                for(int i=0; i<MSG_COUNT; i++)
                {
                    producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                    (deliveryReport) =>
                    {
                        if (deliveryReport.Error.Code != ErrorCode.NoError)
                        {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        }
                        else
                        {
                            Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                        }
                    });
                }
                producer.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
    
  2. Code example for getting messages from a topic:

    cs-project/consumer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace CCloud
    {
        class Program
        {
            public static void Main(string[] args)
            {
                string HOST = "<broker_host_FQDN>:9092";
                string TOPIC = "<topic_name>";
                string USER = "<consumer_name>";
                string PASS = "<consumer_password>";
    
                var consumerConfig = new ConsumerConfig(
                    new Dictionary<string,string>{
                        {"bootstrap.servers", HOST},
                        {"security.protocol", "SASL_PLAINTEXT"},
                        {"sasl.mechanism", "SCRAM-SHA-512"},
                        {"sasl.username", USER},
                        {"sasl.password", PASS},
                        {"group.id", "demo"}
                    }
                );
    
                var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
                consumer.Subscribe(TOPIC);
                try
                {
                    while (true)
                    {
                        var cr = consumer.Consume();
                        Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}");
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ctrl-C was pressed.
                }
                finally
                {
                    consumer.Close();
                }
            }
        }
    }
    
  3. Building and launching applications:

    cd ~/cs-project/consumer && dotnet run

    cd ~/cs-project/producer && dotnet run
    
  1. Code example for delivering messages to a topic:

    cs-project/producer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace App
    {
        class Program
        {
            public static void Main(string[] args)
            {
                int MSG_COUNT = 5;
    
                string HOST = "<broker_host_FQDN>:9091";
                string TOPIC = "<topic_name>";
                string USER = "<producer_name>";
                string PASS = "<producer_password>";
                string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";
    
                var producerConfig = new ProducerConfig(
                    new Dictionary<string,string>{
                        {"bootstrap.servers", HOST},
                        {"security.protocol", "SASL_SSL"},
                        {"ssl.ca.location", CA_FILE},
                        {"sasl.mechanism", "SCRAM-SHA-512"},
                        {"sasl.username", USER},
                        {"sasl.password", PASS}
                    }
                );
    
                var producer = new ProducerBuilder<string, string>(producerConfig).Build();
    
                for(int i=0; i<MSG_COUNT; i++)
                {
                    producer.Produce(TOPIC, new Message<string, string> { Key = "key", Value = "test message" },
                    (deliveryReport) =>
                        {
                            if (deliveryReport.Error.Code != ErrorCode.NoError)
                            {
                                Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                            }
                            else
                            {
                                Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                            }
                    });
                }
                producer.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
    
  2. Code example for getting messages from a topic:

    cs-project/consumer/Program.cs

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    
    namespace CCloud
    {
        class Program
        {
            public static void Main(string[] args)
            {
                string HOST = "<broker_host_FQDN>:9091";
                string TOPIC = "<topic_name>";
                string USER = "<consumer_name>";
                string PASS = "<consumer_password>";
                string CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";
    
                var consumerConfig = new ConsumerConfig(
                    new Dictionary<string,string>{
                        {"bootstrap.servers", HOST},
                        {"security.protocol", "SASL_SSL"},
                        {"ssl.ca.location", CA_FILE},
                        {"sasl.mechanism", "SCRAM-SHA-512"},
                        {"sasl.username", USER},
                        {"sasl.password", PASS},
                        {"group.id", "demo"}
                    }
                );
    
                var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
                consumer.Subscribe(TOPIC);
                try
                {
                    while (true)
                    {
                        var cr = consumer.Consume();
                        Console.WriteLine($"{cr.Message.Key}:{cr.Message.Value}");
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ctrl-C was pressed.
                }
                finally
                {
                    consumer.Close();
                }
            }
        }
    }
    
  3. Building and launching applications:

    cd ~/cs-project/consumer && dotnet run

    cd ~/cs-project/producer && dotnet run
    

For info on how to get a broker host's FQDN, see this guide.

First, launch the consumer application, which continuously reads new messages from the topic. Then launch the producer application, which sends one or more key:test message messages to the topic. The consumer application displays the messages it receives.

Go

Before connecting:

  1. Install the dependencies:

    sudo apt update && sudo apt install -y golang git && \
    go mod init example && \
    go get github.com/Shopify/sarama@v1.38.1 && \
    go get github.com/xdg-go/scram@v1.1.2
    
  2. Create a directory for the project:

    cd ~/ && mkdir go-project && cd go-project && mkdir -p consumer producer
    
  3. Create the scram.go file with the SCRAM code, which is the same for the producer and consumer applications:

    scram.go
    package main
    
    import (
      "crypto/sha256"
      "crypto/sha512"
      "hash"
    
      "github.com/xdg-go/scram"
    )
    
    var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
    var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
    
    type XDGSCRAMClient struct {
      *scram.Client
      *scram.ClientConversation
      scram.HashGeneratorFcn
    }
    
    func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
        x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
        if err != nil {
          return err
        }
        x.ClientConversation = x.Client.NewConversation()
        return nil
    }
    
    func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
        response, err = x.ClientConversation.Step(challenge)
        return
    }
    
    func (x *XDGSCRAMClient) Done() bool {
        return x.ClientConversation.Done()
    }
    
  4. Copy scram.go to the producer and consumer application directories:

    cp scram.go producer/scram.go && cp scram.go consumer/scram.go
    
Connecting without SSL
Connecting via SSL
  1. Code example for delivering a message to a topic:

    producer/main.go

    package main
    
    import (
          "fmt"
          "os"
          "strings"
    
          "github.com/Shopify/sarama"
    )
    
    func main() {
          brokers := "<broker_host_FQDN>:9092"
          splitBrokers := strings.Split(brokers, ",")
          conf := sarama.NewConfig()
          conf.Producer.RequiredAcks = sarama.WaitForAll
          conf.Producer.Return.Successes = true
          conf.Version = sarama.V0_10_0_0
          conf.ClientID = "sasl_scram_client"
          conf.Net.SASL.Enable = true
          conf.Net.SASL.Handshake = true
          conf.Net.SASL.User = "<producer_name>"
          conf.Net.SASL.Password = "<producer_password>"
          conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
          conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
    
          syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
          if err != nil {
                fmt.Println("Couldn't create producer: ", err.Error())
                os.Exit(1)
          }
          publish("test message", syncProducer)
    }
    
    func publish(message string, producer sarama.SyncProducer) {
      // Publish sync
      msg := &sarama.ProducerMessage {
          Topic: "<topic_name>",
          Value: sarama.StringEncoder(message),
      }
      p, o, err := producer.SendMessage(msg)
      if err != nil {
          fmt.Println("Error publish: ", err.Error())
      }
    
      fmt.Println("Partition: ", p)
      fmt.Println("Offset: ", o)
    }
    
  2. Code example for getting messages from a topic:

    consumer/main.go

    package main
    
    import (
          "fmt"
          "os"
          "os/signal"
          "strings"
    
          "github.com/Shopify/sarama"
    )
    
    func main() {
          brokers := "<broker_host_FQDN>:9092"
          splitBrokers := strings.Split(brokers, ",")
          conf := sarama.NewConfig()
          conf.Producer.RequiredAcks = sarama.WaitForAll
          conf.Version = sarama.V0_10_0_0
          conf.Consumer.Return.Errors = true
          conf.ClientID = "sasl_scram_client"
          conf.Metadata.Full = true
          conf.Net.SASL.Enable = true
          conf.Net.SASL.User = "<consumer_name>"
          conf.Net.SASL.Password = "<consumer_password>"
          conf.Net.SASL.Handshake = true
          conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
          conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
    
          master, err := sarama.NewConsumer(splitBrokers, conf)
          if err != nil {
                  fmt.Println("Couldn't create consumer: ", err.Error())
                  os.Exit(1)
          }
    
          defer func() {
                  if err := master.Close(); err != nil {
                          panic(err)
                  }
          }()
    
          topic := "<topic_name>"
    
          consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
          if err != nil {
                  panic(err)
          }
    
          signals := make(chan os.Signal, 1)
          signal.Notify(signals, os.Interrupt)
    
          // Count the number of processed messages
          msgCount := 0
    
          // Get signal to finish
          doneCh := make(chan struct{})
          go func() {
                  for {
                          select {
                          case err := <-consumer.Errors():
                                  fmt.Println(err)
                          case msg := <-consumer.Messages():
                                  msgCount++
                                  fmt.Println("Received messages", string(msg.Key), string(msg.Value))
                          case <-signals:
                                  fmt.Println("Interrupt is detected")
                                  doneCh <- struct{}{}
                          }
                  }
          }()
    
          <-doneCh
          fmt.Println("Processed", msgCount, "messages")
    }
    
  3. Building applications:

    cd ~/go-project/producer && go build && \
    cd ~/go-project/consumer && go build
    
  4. Running applications:

    ~/go-project/consumer/consumer
    
    ~/go-project/producer/producer
    
  1. Code example for delivering a message to a topic:

    producer/main.go

    package main
    
    import (
          "fmt"
          "crypto/tls"
          "crypto/x509"
          "io/ioutil"
          "os"
          "strings"
    
          "github.com/Shopify/sarama"
    )
    
    func main() {
          brokers := "<broker_host_FQDN>:9091"
          splitBrokers := strings.Split(brokers, ",")
          conf := sarama.NewConfig()
          conf.Producer.RequiredAcks = sarama.WaitForAll
          conf.Producer.Return.Successes = true
          conf.Version = sarama.V0_10_0_0
          conf.ClientID = "sasl_scram_client"
          conf.Net.SASL.Enable = true
          conf.Net.SASL.Handshake = true
          conf.Net.SASL.User = "<producer_name>"
          conf.Net.SASL.Password = "<producer_password>"
          conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
          conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
    
          certs := x509.NewCertPool()
          pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
          pemData, err := ioutil.ReadFile(pemPath)
          if err != nil {
                  fmt.Println("Couldn't load cert: ", err.Error())
                  os.Exit(1)
          }
          certs.AppendCertsFromPEM(pemData)
    
          conf.Net.TLS.Enable = true
    
          conf.Net.TLS.Config = &tls.Config{
            RootCAs: certs,
          }
    
          syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
          if err != nil {
                  fmt.Println("Couldn't create producer: ", err.Error())
                  os.Exit(1)
          }
          publish("test message", syncProducer)
    
    }
    
    func publish(message string, producer sarama.SyncProducer) {
      // publish sync
      msg := &sarama.ProducerMessage {
          Topic: "<topic_name>",
          Value: sarama.StringEncoder(message),
      }
      p, o, err := producer.SendMessage(msg)
      if err != nil {
          fmt.Println("Error publish: ", err.Error())
      }
    
      fmt.Println("Partition: ", p)
      fmt.Println("Offset: ", o)
    }
    
  2. Code example for getting messages from a topic:

    consumer/main.go

    package main
    
    import (
          "fmt"
          "crypto/tls"
          "crypto/x509"
          "io/ioutil"
          "os"
          "os/signal"
          "strings"
    
          "github.com/Shopify/sarama"
    )
    
    func main() {
          brokers := "<broker_host_FQDN>:9091"
          splitBrokers := strings.Split(brokers, ",")
          conf := sarama.NewConfig()
          conf.Producer.RequiredAcks = sarama.WaitForAll
          conf.Version = sarama.V0_10_0_0
          conf.Consumer.Return.Errors = true
          conf.ClientID = "sasl_scram_client"
          conf.Metadata.Full = true
          conf.Net.SASL.Enable = true
          conf.Net.SASL.User = "<consumer_name>"
          conf.Net.SASL.Password = "<consumer_password>"
          conf.Net.SASL.Handshake = true
          conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
          conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
    
          certs := x509.NewCertPool()
          pemPath := "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt"
          pemData, err := ioutil.ReadFile(pemPath)
          if err != nil {
                  fmt.Println("Couldn't load cert: ", err.Error())
                  os.Exit(1)
          }
          certs.AppendCertsFromPEM(pemData)
    
          conf.Net.TLS.Enable = true
    
          conf.Net.TLS.Config = &tls.Config{
            RootCAs: certs,
          }
    
          master, err := sarama.NewConsumer(splitBrokers, conf)
          if err != nil {
                  fmt.Println("Couldn't create consumer: ", err.Error())
                  os.Exit(1)
          }
    
          defer func() {
                  if err := master.Close(); err != nil {
                          panic(err)
                  }
          }()
    
          topic := "<topic_name>"
    
          consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetOldest)
          if err != nil {
                  panic(err)
          }
    
          signals := make(chan os.Signal, 1)
          signal.Notify(signals, os.Interrupt)
    
          // Count the number of processed messages
          msgCount := 0
    
          // Get signal to finish
          doneCh := make(chan struct{})
          go func() {
                  for {
                          select {
                          case err := <-consumer.Errors():
                                  fmt.Println(err)
                          case msg := <-consumer.Messages():
                                  msgCount++
                                  fmt.Println("Received messages", string(msg.Key), string(msg.Value))
                          case <-signals:
                                  fmt.Println("Interrupt is detected")
                                  doneCh <- struct{}{}
                          }
                  }
          }()
    
          <-doneCh
          fmt.Println("Processed", msgCount, "messages")
    }
    
  3. Building applications:

    cd ~/go-project/producer && go build && \
    cd ~/go-project/consumer && go build
    
  4. Running applications:

    ~/go-project/consumer/consumer
    
    ~/go-project/producer/producer
    

For info on how to get a broker host's FQDN, see this guide.

First, launch the consumer application, which continuously reads new messages from the topic. Then launch the producer application, which sends one or more key:test message messages to the topic. The consumer application displays the messages it receives.

Java

Before connecting:

  1. Install the dependencies:

    sudo apt update && sudo apt install --yes default-jdk maven
    
  2. Create a folder for the Maven project:

    cd ~/ && \
    mkdir --parents project/consumer/src/java/com/example project/producer/src/java/com/example && \
    cd ~/project
    
  3. Create a configuration file for Maven:

    pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.example</groupId>
        <artifactId>app</artifactId>
        <packaging>jar</packaging>
        <version>0.1.0</version>
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.30</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.6.0</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>${project.artifactId}-${project.version}</finalName>
            <sourceDirectory>src</sourceDirectory>
            <resources>
                <resource>
                    <directory>src</directory>
                </resource>
            </resources>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <executions>
                        <execution>
                            <goals>
                                <goal>attached</goal>
                            </goals>
                            <phase>package</phase>
                            <configuration>
                                <descriptorRefs>
                                    <descriptorRef>jar-with-dependencies</descriptorRef>
                                </descriptorRefs>
                                <archive>
                                    <manifest>
                                        <mainClass>com.example.App</mainClass>
                                    </manifest>
                                </archive>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.example.App</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    Refer to the relevant project pages in the Maven repository for up-to-date versions of the dependencies:

    • kafka-clients
    • jackson-databind
    • slf4j-simple
  4. Copy pom.xml to the producer and consumer application directories:

    cp pom.xml producer/pom.xml && cp pom.xml consumer/pom.xml
    
Connecting without SSL
Connecting via SSL
  1. Code example for delivering messages to a topic:

    producer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.clients.producer.*;
    
    public class App {
    
      public static void main(String[] args) {
    
        int MSG_COUNT = 5;
    
        String HOST = "<broker_FQDN>:9092";
        String TOPIC = "<topic_name>";
        String USER = "<producer_name>";
        String PASS = "<producer_password>";
    
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, USER, PASS);
        String KEY = "key";
    
        String serializer = StringSerializer.class.getName();
        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put("acks", "all");
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);
    
        Producer<String, String> producer = new KafkaProducer<>(props);
    
        try {
          for (int i = 1; i <= MSG_COUNT; i++) {
            producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
            System.out.println("Test message " + i);
          }
          producer.flush();
          producer.close();
        } catch (Exception ex) {
          System.out.println(ex);
          producer.close();
        }
      }
    }
    
  2. Code example for getting messages from a topic:

    consumer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.clients.consumer.*;
    
    public class App {
    
      public static void main(String[] args) {
    
        String HOST = "<broker_FQDN>:9092";
        String TOPIC = "<topic_name>";
        String USER = "<consumer_name>";
        String PASS = "<consumer_password>";
    
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, USER, PASS);
        String GROUP = "demo";
    
        String deserializer = StringDeserializer.class.getName();
        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("group.id", GROUP);
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);
    
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
    
        while(true) {
          ConsumerRecords<String, String> records = consumer.poll(100);
          for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key() + ":" + record.value());
          }
        }
      }
    }
    
  3. Building applications:

    cd ~/project/producer && mvn clean package && \
    cd ~/project/consumer && mvn clean package
    
  4. Running applications:

    java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
    
    java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
    
  1. Go to the folder where the Java certificate store will be located:

    cd /etc/security
    
  2. Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password of at least 6 characters using the -storepass parameter for additional storage protection:

    sudo keytool -importcert \
                 -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
                 -keystore ssl -storepass <certificate_store_password> \
                 --noprompt
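
    To confirm the import, list the store contents. A minimal sketch reusing the store path from this step; <certificate_store_password> is the password you chose above:

```shell
# List the trusted certificates in the Java Key Store created above;
# the YandexCA alias should appear in the output.
sudo keytool -list \
             -keystore /etc/security/ssl \
             -storepass <certificate_store_password>
```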
    
  3. Code example for delivering messages to a topic:

    producer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.clients.producer.*;
    
    public class App {
    
      public static void main(String[] args) {
    
        int MSG_COUNT = 5;
    
        String HOST = "<broker_FQDN>:9091";
        String TOPIC = "<topic_name>";
        String USER = "<producer_name>";
        String PASS = "<producer_password>";
        String TS_FILE = "/etc/security/ssl";
        String TS_PASS = "<certificate_store_password>";
    
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, USER, PASS);
        String KEY = "key";
    
        String serializer = StringSerializer.class.getName();
        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put("acks", "all");
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);
        props.put("ssl.truststore.location", TS_FILE);
        props.put("ssl.truststore.password", TS_PASS);
    
        Producer<String, String> producer = new KafkaProducer<>(props);
    
        try {
          for (int i = 1; i <= MSG_COUNT; i++) {
            producer.send(new ProducerRecord<String, String>(TOPIC, KEY, "test message")).get();
            System.out.println("Test message " + i);
          }
          producer.flush();
          producer.close();
        } catch (Exception ex) {
          System.out.println(ex);
          producer.close();
        }
      }
    }
    
  4. Code example for getting messages from a topic:

    consumer/src/java/com/example/App.java

    package com.example;
    
    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.clients.consumer.*;
    
    public class App {
    
      public static void main(String[] args) {
    
        String HOST = "<broker_FQDN>:9091";
        String TOPIC = "<topic_name>";
        String USER = "<consumer_name>";
        String PASS = "<consumer_password>";
        String TS_FILE = "/etc/security/ssl";
        String TS_PASS = "<certificate_store_password>";
    
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, USER, PASS);
        String GROUP = "demo";
    
        String deserializer = StringDeserializer.class.getName();
        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("group.id", GROUP);
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);
        props.put("ssl.truststore.location", TS_FILE);
        props.put("ssl.truststore.password", TS_PASS);
    
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
    
        while(true) {
          ConsumerRecords<String, String> records = consumer.poll(100);
          for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key() + ":" + record.value());
          }
        }
      }
    }
    
  5. Building applications:

    cd ~/project/producer && mvn clean package && \
    cd ~/project/consumer && mvn clean package
    
  6. Running applications:

    java -jar ~/project/consumer/target/app-0.1.0-jar-with-dependencies.jar
    
    java -jar ~/project/producer/target/app-0.1.0-jar-with-dependencies.jar
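
To check that the import at step 2 succeeded, you can list the store contents with the same keytool utility; it should print an entry for the YandexCA alias:

```bash
keytool -list -alias YandexCA \
        -keystore /etc/security/ssl -storepass <certificate_store_password>
```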
    

For info on how to get a broker host's FQDN, see this guide.

First, launch the consumer application; it will continuously read new messages from the topic. Then launch the producer application, which sends one or more messages of the form key:test message to the topic. The consumer application prints each message it receives.
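All the examples in this article authenticate with SCRAM-SHA-512. As background, the salted password that SCRAM derives on the client side is plain PBKDF2 over HMAC-SHA-512. A standalone illustration with the Python standard library (the salt and iteration count below are made up; during a real handshake the broker sends its own values):

```python
import hashlib

# SCRAM-SHA-512 derives SaltedPassword = PBKDF2-HMAC-SHA-512(password, salt, iterations).
# The salt and iteration count here are illustrative only; a real broker supplies
# its own values in the server-first message of the SCRAM exchange.
salted = hashlib.pbkdf2_hmac("sha512", b"example-password", b"example-salt", 4096)
print(len(salted))  # 64: the SHA-512 digest size
```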

Node.js

Before connecting, install the following dependencies:

sudo apt update && sudo apt install -y nodejs npm && \
npm install node-rdkafka
Connecting without SSL
Connecting via SSL
  1. Code example for delivering messages to a topic:

    producer.js

    "use strict"
    const Kafka = require('node-rdkafka');
    
    const MSG_COUNT = 5;
    
    const HOST = "<broker_FQDN>:9092";
    const TOPIC = "<topic_name>";
    const USER = "<producer_name>";
    const PASS = "<producer_password>";
    
    const producer = new Kafka.Producer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_PLAINTEXT",
      'sasl.mechanism': "SCRAM-SHA-512"
    });
    
    producer.connect();
    
    producer.on('ready', function() {
      try {
        for (let i = 0; i < MSG_COUNT; ++i) {
          producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
          console.log("Produced: test message");
        }
    
        producer.flush(10000, () => {
            producer.disconnect();
          });
      } catch (err) {
        console.error('Error');
        console.error(err);
      }
    });
    
  2. Code example for getting messages from a topic:

    consumer.js

    "use strict"
    const Kafka = require('node-rdkafka');
    
    const HOST = "<broker_FQDN>:9092";
    const TOPIC = "<topic_name>";
    const USER = "<consumer_name>";
    const PASS = "<consumer_password>";
    
    const consumer = new Kafka.Consumer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_PLAINTEXT",
      'sasl.mechanism': "SCRAM-SHA-512",
      'group.id': "demo"
    });
    
    consumer.connect();
    
    consumer
      .on('ready', function() {
        consumer.subscribe([TOPIC]);
        consumer.consume();
      })
      .on('data', function(data) {
        console.log(data.key + ":" + data.value.toString());
      });
    
    process.on('SIGINT', () => {
      console.log('\nDisconnecting consumer ...');
      consumer.disconnect();
    });
    
  3. Running applications:

    node consumer.js
    
    node producer.js
    
  1. Code example for delivering messages to a topic:

    producer.js

    "use strict"
    const Kafka = require('node-rdkafka');
    
    const MSG_COUNT = 5;
    
    const HOST = "<broker_FQDN>:9091";
    const TOPIC = "<topic_name>";
    const USER = "<producer_name>";
    const PASS = "<producer_password>";
    const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";
    
    const producer = new Kafka.Producer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_SSL",
      'ssl.ca.location': CA_FILE,
      'sasl.mechanism': "SCRAM-SHA-512"
    });
    
    producer.connect();
    
    producer.on('ready', function() {
      try {
        for (let i = 0; i < MSG_COUNT; ++i) {
          producer.produce(TOPIC, -1, Buffer.from("test message"), "key");
          console.log("Produced: test message");
        }
    
        producer.flush(10000, () => {
            producer.disconnect();
          });
      } catch (err) {
        console.error('Error');
        console.error(err);
      }
    });
    
  2. Code example for getting messages from a topic:

    consumer.js

    "use strict"
    const Kafka = require('node-rdkafka');
    
    const HOST = "<broker_FQDN>:9091";
    const TOPIC = "<topic_name>";
    const USER = "<consumer_name>";
    const PASS = "<consumer_password>";
    const CA_FILE = "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt";
    
    const consumer = new Kafka.Consumer({
      'bootstrap.servers': HOST,
      'sasl.username': USER,
      'sasl.password': PASS,
      'security.protocol': "SASL_SSL",
      'ssl.ca.location': CA_FILE,
      'sasl.mechanism': "SCRAM-SHA-512",
      'group.id': "demo"
    });
    
    consumer.connect();
    
    consumer
      .on('ready', function() {
        consumer.subscribe([TOPIC]);
        consumer.consume();
      })
      .on('data', function(data) {
        console.log(data.key + ":" + data.value.toString());
      });
    
    process.on('SIGINT', () => {
      console.log('\nDisconnecting consumer ...');
      consumer.disconnect();
    });
    
  3. Running applications:

    node consumer.js
    
    node producer.js
    

For info on how to get a broker host's FQDN, see this guide.

First, launch the consumer application; it will continuously read new messages from the topic. Then launch the producer application, which sends one or more messages of the form key:test message to the topic. The consumer application prints each message it receives.

Python (kafka-python)

Before connecting, install the following dependencies:

sudo apt update && sudo apt install -y python3 python3-pip libsnappy-dev && \
pip3 install kafka-python lz4 python-snappy crc32c
Connecting without SSL
Connecting via SSL
  1. Code example for delivering a message to a topic:

    producer.py

    from kafka import KafkaProducer
    
    producer = KafkaProducer(
        bootstrap_servers='<broker_host_FQDN>:9092',
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username='<producer_name>',
        sasl_plain_password='<producer_password>')
    
    producer.send('<topic_name>', b'test message', b'key')
    producer.flush()
    producer.close()
    
  2. Code example for getting messages from a topic:

    consumer.py

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        '<topic_name>',
        bootstrap_servers='<broker_FQDN>:9092',
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username='<consumer_name>',
        sasl_plain_password='<consumer_password>')
    
    print("ready")
    
    for msg in consumer:
        print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
    
  3. Running applications:

    python3 consumer.py
    
    python3 producer.py
    
  1. Code example for delivering a message to a topic:

    producer.py

    from kafka import KafkaProducer
    
    producer = KafkaProducer(
        bootstrap_servers='<broker_host_FQDN>:9091',
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username='<producer_name>',
        sasl_plain_password='<producer_password>',
        ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt")
    
    producer.send('<topic_name>', b'test message', b'key')
    producer.flush()
    producer.close()
    
  2. Code example for getting messages from a topic:

    consumer.py

    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        '<topic_name>',
        bootstrap_servers='<broker_FQDN>:9091',
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username='<consumer_name>',
        sasl_plain_password='<consumer_password>',
        ssl_cafile="/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt")
    
    print("ready")
    
    for msg in consumer:
        print(msg.key.decode("utf-8") + ":" + msg.value.decode("utf-8"))
    
  3. Running applications:

    python3 consumer.py
    
    python3 producer.py
    

For info on how to get a broker host's FQDN, see this guide.

First, launch the consumer application; it will continuously read new messages from the topic. Then launch the producer application, which sends one or more messages of the form key:test message to the topic. The consumer application prints each message it receives.
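The kafka-python examples above pass keys and values as raw bytes (b'key', b'test message'). If your application works with strings or JSON, KafkaProducer also accepts key_serializer and value_serializer callables that must return bytes. A minimal sketch of such serializers, runnable without a broker (the function names are illustrative):

```python
import json

# Callables in the shape kafka-python's KafkaProducer accepts via
# key_serializer=/value_serializer=: they receive the object passed
# to send() and must return bytes.
def key_serializer(key):
    return key.encode("utf-8")

def value_serializer(value):
    return json.dumps(value).encode("utf-8")

print(key_serializer("key"))  # b'key'
print(value_serializer({"text": "test message"}))
```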

Python (confluent-kafka)

Before connecting, install the following dependencies:

pip install confluent_kafka
Connecting without SSL
Connecting via SSL
  1. Code example for delivering a message to a topic:

    producer.py

    from confluent_kafka import Producer
    
    def error_callback(err):
        print('Something went wrong: {}'.format(err))
    
    params = {
        'bootstrap.servers': '<broker_host_FQDN>:9092',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': '<producer_name>',
        'sasl.password': '<producer_password>',
        'error_cb': error_callback,
    }
    
    p = Producer(params)
    p.produce('<topic_name>', 'some payload1')
    p.flush(10)
    
  2. Code example for getting messages from a topic:

    consumer.py

    from confluent_kafka import Consumer
    
    def error_callback(err):
        print('Something went wrong: {}'.format(err))
    
    params = {
        'bootstrap.servers': '<broker_host_FQDN>:9092',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': '<consumer_name>',
        'sasl.password': '<consumer_password>',
        'group.id': 'test-consumer1',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        'error_cb': error_callback,
        'debug': 'all',
    }
    c = Consumer(params)
    c.subscribe(['<topic_name>'])
    while True:
        msg = c.poll(timeout=3.0)
        if msg is None:
            continue
        if msg.error():
            error_callback(msg.error())
            continue
        val = msg.value().decode()
        print(val)
    
  3. Running applications:

    python3 consumer.py
    
    python3 producer.py
    
  1. Code example for delivering a message to a topic:

    producer.py

    from confluent_kafka import Producer
    
    def error_callback(err):
        print('Something went wrong: {}'.format(err))
    
    params = {
        'bootstrap.servers': '<broker_host_FQDN>:9091',
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': '<producer_name>',
        'sasl.password': '<producer_password>',
        'error_cb': error_callback,
    }
    
    p = Producer(params)
    p.produce('<topic_name>', 'some payload1')
    p.flush(10)
    
  2. Code example for getting messages from a topic:

    consumer.py

    from confluent_kafka import Consumer
    
    def error_callback(err):
        print('Something went wrong: {}'.format(err))
    
    params = {
        'bootstrap.servers': '<broker_host_FQDN>:9091',
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': '<consumer_name>',
        'sasl.password': '<consumer_password>',
        'group.id': 'test-consumer1',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        'error_cb': error_callback,
        'debug': 'all',
    }
    c = Consumer(params)
    c.subscribe(['<topic_name>'])
    while True:
        msg = c.poll(timeout=3.0)
        if msg is None:
            continue
        if msg.error():
            error_callback(msg.error())
            continue
        val = msg.value().decode()
        print(val)
    
  3. Running applications:

    python3 consumer.py
    
    python3 producer.py
    

For info on how to get a broker host's FQDN, see this guide.

First, launch the consumer application; it will continuously read new messages from the topic. Then launch the producer application, which sends one or more test messages to the topic. The consumer application prints the value of each message it receives.
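The producer and consumer examples above repeat the same connection settings. Because librdkafka-style configs are flat dicts with string keys, a shared base dict can be extended per role and then passed to Producer()/Consumer() as in the examples. A stdlib-only sketch (placeholder values kept from the examples above):

```python
# Shared connection settings (flat string keys, librdkafka style).
base = {
    'bootstrap.servers': '<broker_host_FQDN>:9091',
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': '/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt',
    'sasl.mechanism': 'SCRAM-SHA-512',
}

producer_params = {**base,
                   'sasl.username': '<producer_name>',
                   'sasl.password': '<producer_password>'}

consumer_params = {**base,
                   'sasl.username': '<consumer_name>',
                   'sasl.password': '<consumer_password>',
                   'group.id': 'test-consumer1',
                   'auto.offset.reset': 'earliest',
                   'enable.auto.commit': False}

print('group.id' in producer_params, 'group.id' in consumer_params)  # False True
```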
