Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Kafka®
  • Начало работы
    • Все инструкции
      • Предварительная настройка
      • Подключение из приложений
      • Примеры кода
    • Управление топиками
    • Управление пользователями
    • Управление коннекторами
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Yandex Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • История изменений
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • Инструменты командной строки
  • kafkacat
  • Инструменты Apache Kafka® для Linux (Bash)/macOS (Zsh)
  • Инструменты Apache Kafka® для Windows (PowerShell)
  • Подготовка к подключению из Docker-контейнера
  1. Пошаговые инструкции
  2. Подключение
  3. Подключение из приложений

Подключение к кластеру Apache Kafka® из приложений

Статья создана
Yandex Cloud
Обновлена 11 марта 2025 г.
  • Инструменты командной строки
    • kafkacat
    • Инструменты Apache Kafka® для Linux (Bash)/macOS (Zsh)
    • Инструменты Apache Kafka® для Windows (PowerShell)
  • Подготовка к подключению из Docker-контейнера

В этом разделе представлены настройки для подключения к хостам кластера Managed Service for Apache Kafka® с помощью инструментов командной строки и из Docker-контейнера. О подключении из кода вашего приложения см. Примеры кода.

Вы можете подключаться к хостам кластера Apache Kafka® в публичном доступе только с использованием SSL-сертификата. В примерах ниже предполагается, что сертификат YandexInternalRootCA.crt расположен в директории:

  • /usr/local/share/ca-certificates/Yandex/ для Ubuntu;
  • $HOME\.kafka\ для Windows.

Подключение без использования SSL-сертификата поддерживается только для хостов, находящихся не в публичном доступе. В этом случае трафик внутри виртуальной сети при подключении к БД шифроваться не будет.

При необходимости перед подключением настройте группы безопасности кластера.

Примеры для Linux проверялись в следующем окружении:

  • Виртуальная машина в Yandex Cloud с Ubuntu 20.04 LTS.
  • OpenJDK: 11.0.24.
  • Bash: 5.0.16.

Примеры для Windows проверялись в следующем окружении:

  • Виртуальная машина в Yandex Cloud с Windows Server 2019 Datacenter.
  • Microsoft OpenJDK: 11.0.11.
  • PowerShell: 5.1.17763.1490 Desktop.

Инструменты командной строкиИнструменты командной строки

Примеры кода с заполненным FQDN хоста доступны в консоли управления по нажатию кнопки Подключиться на странице кластера.

kafkacatkafkacat

Утилита kafkacat (второе название kcat) — приложение с открытым исходным кодом, которое может работать как универсальный производитель или потребитель данных и не требует установки Java Runtime Environment.

Перед подключением установите зависимости:

sudo apt update && sudo apt install -y kafkacat

Примечание

На Ubuntu 24.04 и выше используйте утилиту kcat.

Подключение без SSL
Подключение с SSL
  1. Запустите команду получения сообщений из топика:

    kafkacat -C \
             -b <FQDN_брокера>:9092 \
             -t <имя_топика> \
             -X security.protocol=SASL_PLAINTEXT \
             -X sasl.mechanism=SCRAM-SHA-512 \
             -X sasl.username="<логин_потребителя>" \
             -X sasl.password="<пароль_потребителя>" -Z
    

    Команда будет непрерывно считывать новые сообщения из топика.

  2. В отдельном терминале запустите команду отправки сообщения в топик:

    echo "test message" | kafkacat -P \
           -b <FQDN_брокера>:9092 \
           -t <имя_топика> \
           -k key \
           -X security.protocol=SASL_PLAINTEXT \
           -X sasl.mechanism=SCRAM-SHA-512 \
           -X sasl.username="<логин_производителя>" \
           -X sasl.password="<пароль_производителя>" -Z
    
  1. Запустите команду получения сообщений из топика:

    kafkacat -C \
             -b <FQDN_брокера>:9091 \
             -t <имя_топика> \
             -X security.protocol=SASL_SSL \
             -X sasl.mechanism=SCRAM-SHA-512 \
             -X sasl.username="<логин_потребителя>" \
             -X sasl.password="<пароль_потребителя>" \
             -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
    

    Команда будет непрерывно считывать новые сообщения из топика.

  2. В отдельном терминале запустите команду отправки сообщения в топик:

    echo "test message" | kafkacat -P \
        -b <FQDN_брокера>:9091 \
        -t <имя_топика> \
        -k key \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanism=SCRAM-SHA-512 \
        -X sasl.username="<логин_производителя>" \
        -X sasl.password="<пароль_производителя>" \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
    

Как получить FQDN хоста-брокера, см. в инструкции.

Убедитесь, что в первом терминале отобразилось сообщение key:test message, отправленное во втором.

Инструменты Apache Kafka® для Linux (Bash)/macOS (Zsh)Инструменты Apache Kafka® для Linux (Bash)/macOS (Zsh)

В состав архивов с бинарными файлами Apache Kafka® включен набор инструментов, который позволяет управлять кластером Apache Kafka® и сущностями в нем. Далее на примере будет показано, как указать реквизиты пользователя для подключения и использовать инструменты с этими реквизитами:

  • С помощью kafka-console-producer будет отправлено сообщение в топик.
  • С помощью kafka-console-consumer будет получено сообщение из топика.

Перед подключением:

  1. Установите OpenJDK:

    sudo apt update && sudo apt install --yes default-jdk
    
  2. Загрузите архив с бинарными файлами для версии Apache Kafka®, которая используется в кластере. Версия Scala неважна.

  3. Распакуйте архив.

Подключение без SSL
Подключение с SSL
  1. Создайте файлы с параметрами для подключения к кластеру: файл для производителя и файл для потребителя.

    Эти файлы имеют одинаковое содержимое, различаются только реквизиты пользователя:

    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="<логин_производителя_или_потребителя>" \
      password="<пароль_производителя_или_потребителя>";
    security.protocol=SASL_PLAINTEXT
    
  2. Запустите команду получения сообщений из топика:

    <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-consumer.sh \
      --consumer.config <путь_к_файлу_с_параметрами_для_потребителя> \
      --bootstrap-server <FQDN_брокера>:9092 \
      --topic <имя_топика> \
      --property print.key=true \
      --property key.separator=":"
    

    Команда будет непрерывно считывать новые сообщения из топика.

  3. В отдельном терминале запустите команду отправки сообщения в топик:

    echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-producer.sh \
      --producer.config <путь_к_файлу_с_параметрами_для_производителя> \
      --bootstrap-server <FQDN_брокера>:9092 \
      --topic <имя_топика> \
      --property parse.key=true \
      --property key.separator=":"
    
  1. Перейдите в каталог, где будет располагаться хранилище сертификатов Java:

    cd /etc/security
    
  2. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль не короче 6 символов в параметре -storepass для дополнительной защиты хранилища:

    sudo keytool -importcert \
                 -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
                 -keystore ssl -storepass <пароль_хранилища_сертификатов> \
                 --noprompt
    
  3. Создайте файлы с параметрами для подключения к кластеру: файл для производителя и файл для потребителя.

    Эти файлы имеют одинаковое содержимое, различаются только реквизиты пользователя:

    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="<логин_производителя_или_потребителя>" \
      password="<пароль_производителя_или_потребителя>";
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/security/ssl
    ssl.truststore.password=<пароль_хранилища_сертификатов>
    
  4. Запустите команду получения сообщений из топика:

    <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-consumer.sh \
      --consumer.config <путь_к_файлу_с_параметрами_для_потребителя> \
      --bootstrap-server <FQDN_брокера>:9091 \
      --topic <имя_топика> \
      --property print.key=true \
      --property key.separator=":"
    

    Команда будет непрерывно считывать новые сообщения из топика.

  5. В отдельном терминале запустите команду отправки сообщения в топик:

    echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>/bin/kafka-console-producer.sh \
      --producer.config <путь_к_файлу_с_параметрами_для_производителя> \
      --bootstrap-server <FQDN_брокера>:9091 \
      --topic <имя_топика> \
      --property parse.key=true \
      --property key.separator=":"
    

Как получить FQDN хоста-брокера, см. в инструкции.

Убедитесь, что в первом терминале отобразилось сообщение key:test message, отправленное во втором.

Инструменты Apache Kafka® для Windows (PowerShell)Инструменты Apache Kafka® для Windows (PowerShell)

В состав архивов с бинарными файлами Apache Kafka® включен набор инструментов, который позволяет управлять кластером Apache Kafka® и сущностями в нем. Далее на примере будет показано, как указать реквизиты пользователя для подключения и использовать инструменты с этими реквизитами:

  • С помощью kafka-console-producer будет отправлено сообщение в топик.
  • С помощью kafka-console-consumer будет получено сообщение из топика.

Хотя документация по инструментам содержит упоминание скриптов .sh, она актуальна и при работе в Windows. Сами инструменты одинаковы для любой платформы, различаются лишь скрипты, которые запускают их, например:

  • bin/kafka-console-producer.sh для Linux (Bash)/macOS (Zsh).
  • bin\windows\kafka-console-producer.bat для Windows (PowerShell).

Перед подключением:

  1. Установите последнюю доступную версию Microsoft OpenJDK.

  2. Загрузите архив с бинарными файлами для версии Apache Kafka®, которая используется в кластере. Версия Scala неважна.

  3. Распакуйте архив.

    Совет

    Распаковывайте файлы Apache Kafka® в корневой каталог диска, например, C:\kafka_2.12-2.6.0\.

    Если путь к исполняемым и пакетным файлам Apache Kafka® будет слишком длинным, то при попытке запустить их возникнет ошибка The input line is too long.

Подключение без SSL
Подключение с SSL
  1. Создайте файлы с параметрами для подключения к кластеру: файл для производителя и файл для потребителя.

    Эти файлы имеют одинаковое содержимое, различаются только реквизиты пользователя:

    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="<логин_производителя_или_потребителя>" \
      password="<пароль_производителя_или_потребителя>";
    security.protocol=SASL_PLAINTEXT
    
  2. Запустите команду получения сообщений из топика:

    <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-consumer.bat `
        --consumer.config <путь_к_файлу_с_параметрами_для_потребителя> `
        --bootstrap-server <FQDN_брокера>:9092 `
        --topic <имя_топика> `
        --property print.key=true `
        --property key.separator=":"
    

    Команда будет непрерывно считывать новые сообщения из топика.

  3. В отдельном терминале запустите команду отправки сообщения в топик:

    echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-producer.bat `
        --producer.config <путь_к_файлу_с_параметрами_для_производителя> `
        --bootstrap-server <FQDN_брокера>:9092 `
        --topic <имя_топика> `
        --property parse.key=true `
        --property key.separator=":"
    
  1. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль в параметре --storepass для дополнительной защиты хранилища:

    keytool.exe -importcert -alias YandexCA `
      --file $HOME\.kafka\YandexInternalRootCA.crt `
      --keystore $HOME\.kafka\ssl `
      --storepass <пароль_хранилища_сертификатов> `
      --noprompt
    
  2. Создайте файлы с параметрами для подключения к кластеру: файл для производителя и файл для потребителя.

    Эти файлы имеют одинаковое содержимое, различаются только реквизиты пользователя:

    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="<логин_производителя_или_потребителя>" \
      password="<пароль_производителя_или_потребителя>";
    security.protocol=SASL_SSL
    ssl.truststore.location=<значение_переменной_$HOME>\\.kafka\\ssl
    ssl.truststore.password=<пароль_хранилища_сертификатов>
    

    В качестве значения параметра ssl.truststore.location укажите полный путь к хранилищу сертификатов, например:

    ssl.truststore.location=C:\\Users\\Administrator\\.kafka\\ssl
    

    Хранилище сертификатов расположено по пути $HOME\.kafka\ssl, но в значении нельзя использовать переменные среды окружения. Чтобы раскрыть переменную, выполните команду:

    echo $HOME
    

    Важно

    Используйте \\ вместо \ при указании значения параметра ssl.truststore.location, иначе при запуске команд не удастся получить доступ к хранилищу сертификатов.

  3. Запустите команду получения сообщений из топика:

    <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-consumer.bat `
        --consumer.config <путь_к_файлу_с_параметрами_для_потребителя> `
        --bootstrap-server <FQDN_брокера>:9091 `
        --topic <имя_топика> `
        --property print.key=true `
        --property key.separator=":"
    

    Команда будет непрерывно считывать новые сообщения из топика.

  4. В отдельном терминале запустите команду отправки сообщения в топик:

    echo "key:test message" | <путь_к_директории_с_файлами_Apache_Kafka>\bin\windows\kafka-console-producer.bat `
        --producer.config <путь_к_файлу_с_параметрами_для_производителя> `
        --bootstrap-server <FQDN_брокера>:9091 `
        --topic <имя_топика> `
        --property parse.key=true `
        --property key.separator=":"
    

Как получить FQDN хоста-брокера, см. в инструкции.

Убедитесь, что в первом терминале отобразилось сообщение key:test message, отправленное во втором.

Подготовка к подключению из Docker-контейнераПодготовка к подключению из Docker-контейнера

Чтобы подключаться к кластеру Managed Service for Apache Kafka® из Docker-контейнера, добавьте в Dockerfile строки:

Подключение без SSL
Подключение с SSL
RUN apt-get update && \
    apt-get install kafkacat --yes
RUN apt-get update && \
    apt-get install wget kafkacat --yes && \
    mkdir --parents /usr/local/share/ca-certificates/Yandex/ && \
    wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
         --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \
    chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt

Была ли статья полезна?

Предыдущая
Предварительная настройка
Следующая
Примеры кода
Проект Яндекса
© 2025 ООО «Яндекс.Облако»