Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Kafka®
  • Начало работы
    • Все руководства
    • Развертывание веб-интерфейса Apache Kafka®
      • Настройка Kafka Connect для работы с Managed Service for Apache Kafka®
      • Миграция базы данных из стороннего кластера Apache Kafka®
      • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Yandex Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • История изменений
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Настройте виртуальную машину
  • Подготовьте тестовые данные
  • Настройте Kafka Connect
  • Запустите Kafka Connect и проверьте его работу
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Перемещение данных из Apache Kafka®
  3. Настройка Kafka Connect для работы с Managed Service for Apache Kafka®

Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®

Статья создана
Yandex Cloud
Обновлена 8 апреля 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Настройте виртуальную машину
  • Подготовьте тестовые данные
  • Настройте Kafka Connect
  • Запустите Kafka Connect и проверьте его работу
  • Удалите созданные ресурсы

Примечание

Managed Service for Apache Kafka® имеет встроенную поддержку некоторых коннекторов и позволяет управлять ими. Список доступных коннекторов приведен в разделе Коннекторы. Если вам нужны другие коннекторы, или вы хотите управлять работой Kafka Connect вручную, используйте информацию из этого руководства.

Инструмент Kafka Connect предназначен для перемещения данных между Apache Kafka® и другими хранилищами данных.

Работа с данными в Kafka Connect осуществляется с помощью процессов-исполнителей (workers). Инструмент может быть развернут как в виде распределенной системы (distributed mode) с несколькими процессами-исполнителями, так и в виде отдельной инсталляции из одного процесса-исполнителя (standalone mode).

Непосредственно перемещение данных выполняется с помощью коннекторов, которые запускаются в отдельных потоках процесса-исполнителя.

Подробнее о Kafka Connect см. в документации Apache Kafka®.

Далее будет продемонстрировано, как настроить Kafka Connect для взаимодействия с кластером Managed Service for Apache Kafka®. Инструмент будет развернут на виртуальной машине Yandex Cloud в виде отдельной инсталляции. Для защиты подключения будет использоваться SSL-шифрование.

Также будет настроен простой коннектор FileStreamSource, с помощью которого Kafka Connect прочитает данные из тестового JSON-файла и передаст их в топик кластера.

Примечание

Вы можете использовать любой другой коннектор Kafka Connect для взаимодействия с кластером Managed Service for Apache Kafka®.

Чтобы настроить Kafka Connect для работы с кластером Managed Service for Apache Kafka®:

  1. Настройте виртуальную машину.
  2. Подготовьте тестовые данные.
  3. Настройте Kafka Connect.
  4. Запустите Kafka Connect и проверьте его работу.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за использование публичных IP-адресов, если для хостов кластера включен публичный доступ (см. тарифы Virtual Private Cloud).
  • Плата за ВМ: использование вычислительных ресурсов, хранилища и публичного IP-адреса (см. тарифы Compute Cloud).

Перед началом работыПеред началом работы

Вручную
Terraform
  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации.

  2. Создайте топик с именем messages для обмена сообщениями между Kafka Connect и кластером Managed Service for Apache Kafka®.

  3. Создайте пользователя с именем user и выдайте ему права на топик messages:

    • ACCESS_ROLE_CONSUMER,
    • ACCESS_ROLE_PRODUCER.
  4. В той же сети, что и кластер Managed Service for Apache Kafka®, создайте виртуальную машину с Ubuntu 20.04 и публичным IP-адресом.

  5. Если вы используете группы безопасности, настройте их так, чтобы был разрешен весь необходимый трафик между кластером Managed Service for Apache Kafka® и виртуальной машиной.

  1. Если у вас еще нет Terraform, установите его.

  2. Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.

  3. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

  4. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

  5. Скачайте в ту же рабочую директорию файл конфигурации kafka-connect.tf.

    В этом файле описаны:

    • сеть;

    • подсеть;

    • группа безопасности по умолчанию и правила, необходимые для подключения к кластеру и виртуальной машине из интернета;

    • виртуальная машина с Ubuntu 20.04;

    • кластер Managed Service for Apache Kafka® с необходимыми настройками.

  6. Укажите в файле пароль для пользователя user, который будет использоваться для доступа к кластеру Managed Service for Apache Kafka®, а также имя пользователя и публичную часть SSH-ключа для виртуальной машины. Если на виртуальную машину будет установлена Ubuntu 20.04 из рекомендованного списка образов, то указанное здесь имя пользователя игнорируется. В таком случае при подключении используйте имя пользователя ubuntu.

  7. Проверьте корректность файлов конфигурации Terraform с помощью команды:

    terraform validate
    

    Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  8. Создайте необходимую инфраструктуру:

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

    В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

Настройте виртуальную машинуНастройте виртуальную машину

  1. Подключитесь к виртуальной машине по SSH.

  2. Установите JDK и утилиту kcat:

    sudo apt update && \
    sudo apt install default-jdk --yes && \
    sudo apt install kafkacat
    

    Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

  3. Скачайте и распакуйте архив с Apache Kafka®:

    wget https://downloads.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz && tar -xvf kafka_2.12-3.1.0.tgz --strip 1 --directory /opt/kafka/
    

    В данном примере используется Apache Kafka® версии 3.1.0.

  4. Получите SSL-сертификат.

  5. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы драйвер Apache Kafka® мог использовать этот сертификат при защищенном подключении к хостам кластера. Задайте пароль не короче 6 символов в параметре -storepass для дополнительной защиты хранилища:

    sudo keytool -importcert \
                 -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
                 -keystore ssl -storepass <пароль_хранилища_сертификатов> \
                 --noprompt
    
  6. Создайте каталог с настройками процесса-исполнителя и скопируйте туда хранилище:

    sudo mkdir --parents /etc/kafka-connect-worker && \
    sudo cp ssl /etc/kafka-connect-worker/client.truststore.jks
    

Подготовьте тестовые данныеПодготовьте тестовые данные

Создайте файл /var/log/sample.json с тестовыми данными. В этом файле приведены данные от сенсоров нескольких автомобилей в формате JSON:

sample.json
{"device_id":"iv9a94th6rzt********","datetime":"2020-06-05 17:27:00","latitude":55.70329032,"longitude":37.65472196,"altitude":427.5,"speed":0,"battery_voltage":23.5,"cabin_temperature":17,"fuel_level":null}
{"device_id":"rhibbh3y08qm********","datetime":"2020-06-06 09:49:54","latitude":55.71294467,"longitude":37.66542005,"altitude":429.13,"speed":55.5,"battery_voltage":null,"cabin_temperature":18,"fuel_level":32}
{"device_id":"iv9a94th6rzt********","datetime":"2020-06-07 15:00:10","latitude":55.70985913,"longitude":37.62141918,"altitude":417,"speed":15.7,"battery_voltage":10.3,"cabin_temperature":17,"fuel_level":null}

Настройте Kafka ConnectНастройте Kafka Connect

  1. Создайте файл настроек процесса-исполнителя /etc/kafka-connect-worker/worker.properties:

    # AdminAPI connect properties
    bootstrap.servers=<FQDN_хоста-брокера>:9091
    sasl.mechanism=SCRAM-SHA-512
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks
    ssl.truststore.password=<пароль_к_хранилищу_сертификата>
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<пароль_пользователя_user>";
    
    # Producer connect properties
    producer.sasl.mechanism=SCRAM-SHA-512
    producer.security.protocol=SASL_SSL
    producer.ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks
    producer.ssl.truststore.password=<пароль_к_хранилищу_сертификата>
    producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<пароль_пользователя_user>";
    
    # Worker properties
    plugin.path=/etc/kafka-connect-worker/plugins
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/etc/kafka-connect-worker/worker.offset
    

    Kafka Connect будет подключаться к кластеру Managed Service for Apache Kafka® от имени пользователя user, созданного ранее.

    FQDN хостов-брокеров можно запросить со списком хостов в кластере.

  2. Создайте файл настроек коннектора /etc/kafka-connect-worker/file-connector.properties:

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/var/log/sample.json
    topic=messages
    

    Где:

    • file — имя файла, из которого коннектор будет читать данные.
    • topic — имя топика в кластере Managed Service for Apache Kafka®, куда коннектор будет передавать данные.

Запустите Kafka Connect и проверьте его работуЗапустите Kafka Connect и проверьте его работу

  1. Чтобы отправить тестовые данные в кластер, запустите процесс-исполнитель на виртуальной машине:

    cd ~/opt/kafka/bin/ && \
    sudo ./connect-standalone.sh \
         /etc/kafka-connect-worker/worker.properties \
         /etc/kafka-connect-worker/file-connector.properties
    
  2. Подключитесь к кластеру с помощью kcat и получите данные из топика кластера:

    kafkacat -C \
        -b <FQDN_хоста-брокера>:9091 \
        -t messages \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=user \
        -X sasl.password="<пароль_учетной_записи_user>" \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
    

    FQDN хостов-брокеров можно запросить со списком хостов в кластере.

    В выводе команды вы увидите содержимое тестового файла /var/log/sample.json, переданное на предыдущем шаге.

Удалите созданные ресурсыУдалите созданные ресурсы

Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:

Вручную
Terraform
  1. Удалите виртуальную машину.
  2. Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.
  3. Удалите кластер Managed Service for Apache Kafka®.
  1. В терминале перейдите в директорию с планом инфраструктуры.

    Важно

    Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

  2. Удалите ресурсы:

    1. Выполните команду:

      terraform destroy
      
    2. Подтвердите удаление ресурсов и дождитесь завершения операции.

    Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

Была ли статья полезна?

Предыдущая
Развертывание веб-интерфейса Apache Kafka®
Следующая
Миграция базы данных из стороннего кластера Apache Kafka®
Проект Яндекса
© 2025 ООО «Яндекс.Облако»