Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Kafka®
  • Начало работы
    • Все руководства
    • Развертывание веб-интерфейса Apache Kafka®
      • Поставка из PostgreSQL с помощью Debezium
      • Поставка из MySQL® с помощью Debezium
      • Поставка в ClickHouse®
      • Поставка в ksqlDB
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Yandex Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • История изменений
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Настройте интеграцию с Apache Kafka® для базы ksqlDB
  • Изучите формат данных, поступающих от Managed Service for Apache Kafka®
  • Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®
  • Получите тестовые данные из кластера Managed Service for Apache Kafka®
  • Запишите тестовые данные в ksqlDB
  • Проверьте наличие записей в топиках Apache Kafka®
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Поставка данных другими способами
  3. Поставка в ksqlDB

Поставка данных в ksqlDB

Статья создана
Yandex Cloud
Обновлена 8 апреля 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Настройте интеграцию с Apache Kafka® для базы ksqlDB
  • Изучите формат данных, поступающих от Managed Service for Apache Kafka®
  • Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®
  • Получите тестовые данные из кластера Managed Service for Apache Kafka®
  • Запишите тестовые данные в ksqlDB
  • Проверьте наличие записей в топиках Apache Kafka®
  • Удалите созданные ресурсы

ksqlDB — это база данных, которая предназначена для потоковой обработки сообщений, поступающих из топиков Apache Kafka®. Работа с потоком сообщений в ksqlDB похожа на работу с таблицами в обычной базе данных. Таблица ksqlDB автоматически пополняется данными, поступающими из топика, а данные, которые вы добавите в таблицу ksqlDB, отправляются в топик Apache Kafka®. Подробнее см. в документации ksqlDB.

Чтобы настроить поставку данных из Managed Service for Apache Kafka® в ksqlDB:

  1. Настройте интеграцию с Apache Kafka® для базы ksqlDB.
  2. Изучите формат данных, поступающих от Managed Service for Apache Kafka®.
  3. Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®.
  4. Получите тестовые данные из кластера Managed Service for Apache Kafka®
  5. Запишите тестовые данные в ksqlDB.
  6. Проверьте наличие тестовых данных в топике Apache Kafka®.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за использование публичных IP-адресов, если для хостов кластера включен публичный доступ (см. тарифы Virtual Private Cloud).

Перед началом работыПеред началом работы

  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей вам конфигурации:

    • Если сервер ksqlDB размещен в интернете, создайте кластер Managed Service for Apache Kafka® с публичным доступом.

    • Если сервер ksqlDB размещен в Yandex Cloud, создайте кластер Managed Service for Apache Kafka® в той же облачной сети, где находится ksqlDB.

  2. Создайте топики в кластере Managed Service for Apache Kafka®:

    1. Служебный топик _confluent-ksql-default__command_topic с настройками:
      • Фактор репликации — 1.
      • Количество разделов — 1.
      • Политика очистки лога — Delete.
      • Время жизни сегмента лога, мс — -1.
      • Минимальное число синхронных реплик — 1.
    2. Служебный топик default_ksql_processing_log для записи логов ksqlDB. Настройки топика могут быть любыми.
    3. Топик для хранения данных locations. Настройки топика могут быть любыми.
  3. Создайте пользователя с именем ksql и назначьте ему роль ACCESS_ROLE_ADMIN для всех топиков.

  4. Убедитесь, что вы можете подключиться к серверу ksqlDB.

  5. Установите утилиту kafkacat на сервер ksqlDB и убедитесь, что можете с ее помощью подключиться к кластеру Managed Service for Apache Kafka® через SSL.

  6. Установите утилиту для потоковой обработки JSON-файлов jq на сервер ksqlDB.

Настройте интеграцию с Apache Kafka® для базы ksqlDBНастройте интеграцию с Apache Kafka® для базы ksqlDB

  1. Подключитесь к серверу ksqlDB.

  2. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы ksqlDB мог использовать этот сертификат при защищенном подключении к хостам кластера. При этом задайте пароль в параметре -storepass для дополнительной защиты хранилища:

    cd /etc/ksqldb && \
    sudo keytool -importcert -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
    -keystore ssl -storepass <пароль_хранилища_сертификатов> \
    --noprompt
    
  3. Укажите в файле конфигурации ksqlDB /etc/ksqldb/ksql-server.properties данные для аутентификации в кластере Managed Service for Apache Kafka®:

    bootstrap.servers=<FQDN_брокера_1>:9091,...,<FQDN_брокера_N>:9091
    sasl.mechanism=SCRAM-SHA-512
    security.protocol=SASL_SSL
    ssl.truststore.location=/etc/ksqldb/ssl
    ssl.truststore.password=<пароль_хранилища_сертификатов>
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ksql" password="<пароль_пользователя_ksql>";
    

    Как получить FQDN хоста-брокера, см. в инструкции.

    Имя кластера можно запросить со списком кластеров в каталоге.

  4. Укажите в файле параметров логирования ksqlDB /etc/ksqldb/log4j.properties настройки записи логов в топик кластера Managed Service for Apache Kafka®:

    log4j.appender.kafka_appender=org.apache.kafka.log4jappender.KafkaLog4jAppender
    log4j.appender.kafka_appender.layout=io.confluent.common.logging.log4j.StructuredJsonLayout
    log4j.appender.kafka_appender.BrokerList=<FQDN_брокера_1>:9091,...,<FQDN_брокера_N>:9091
    log4j.appender.kafka_appender.Topic=default_ksql_processing_log
    log4j.logger.io.confluent.ksql=INFO,kafka_appender
    
    log4j.appender.kafka_appender.clientJaasConf=org.apache.kafka.common.security.scram.ScramLoginModule required username="ksql" password="<пароль_пользователя_ksql>";
    log4j.appender.kafka_appender.SecurityProtocol=SASL_SSL
    log4j.appender.kafka_appender.SaslMechanism=SCRAM-SHA-512
    log4j.appender.kafka_appender.SslTruststoreLocation=/etc/ksqldb/ssl
    log4j.appender.kafka_appender.SslTruststorePassword=<пароль_хранилища_сертификатов>
    
  5. Перезапустите сервис ksqlDB командой:

    sudo systemctl restart confluent-ksqldb.service
    

Изучите формат данных, поступающих от Managed Service for Apache Kafka®Изучите формат данных, поступающих от Managed Service for Apache Kafka®

Обработка потока данных из Managed Service for Apache Kafka® зависит от формата представления в сообщении Apache Kafka®.

В примере в топик Apache Kafka® locations будут записываться геоданные в формате JSON:

  • идентификатор profileId;
  • широта latitude;
  • долгота longitude;

Эти данные будут передаваться в виде сообщений Apache Kafka®. Каждое такое сообщение будет содержать JSON-объект как строку следующего вида:

{"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}

База ksqlDB будет использовать таблицу из трех столбцов, в которой хранятся значения соответствующих параметров из сообщений Apache Kafka®.

Далее выполним настройку полей потоковой таблицы в базе ksqlDB.

Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®

Чтобы записывать информацию из топика Apache Kafka®, создайте в базе ksqlDB таблицу. Структура таблицы соответствует формату данных, которые поступают из Managed Service for Apache Kafka®:

  1. Подключитесь к серверу ksqlDB.

  2. Запустите клиент ksql командой:

    ksql http://0.0.0.0:8088
    
  3. Выполните запрос:

    CREATE STREAM riderLocations 
    (
      profileId VARCHAR,
      latitude DOUBLE,
      longitude DOUBLE
    ) WITH 
    (
      kafka_topic='locations', 
      value_format='json', 
      partitions=<количество_разделов_топика_"locations">
    );
    

    Эта потоковая таблица будет автоматически наполняться сообщениями из топика locations кластера Managed Service for Apache Kafka®. Для чтения сообщений ksqlDB использует настройки пользователя ksql.

    Подробнее о создании потоковой таблицы на движке ksqlDB см. в документации ksqlDB.

  4. Выполните запрос:

    SELECT * FROM riderLocations WHERE 
             GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 
             EMIT CHANGES;
    

    Запрос ожидает появления данных в таблице в реальном времени.

Получите тестовые данные из кластера Managed Service for Apache Kafka®Получите тестовые данные из кластера Managed Service for Apache Kafka®

  1. Подключитесь к серверу ksqlDB.

  2. Создайте файл sample.json со следующими тестовыми данными:

    {
      "profileId": "c2309eec", 
      "latitude": 37.7877,
      "longitude": -122.4205
    }
    
    {
      "profileId": "4ab5cbad", 
      "latitude": 37.3952,
      "longitude": -122.0813
    }
    
    {
      "profileId": "4a7c7b41", 
      "latitude": 37.4049,
      "longitude": -122.0822
    }   
    
  3. Отправьте файл sample.json в топик locations кластера Managed Service for Apache Kafka® с помощью jq и kafkacat:

    jq -rc . sample.json | kafkacat -P \
       -b <FQDN_брокера_1>:9091,...,<FQDN_брокера_N>:9091> \
       -t locations \
       -X security.protocol=SASL_SSL \
       -X sasl.mechanisms=SCRAM-SHA-512 \
       -X sasl.username=ksql \
       -X sasl.password="<пароль_пользователя_ksql>" \
       -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
    

    Информация отправляется с помощью пользователя ksql. Подробнее о настройке SSL-сертификата и работе с kafkacat см. в разделе Подключение к кластеру Apache Kafka® из приложений.

  4. Убедитесь, что в сессии отобразились данные, которые были отправлены в топик:

    +--------------------------+--------------------------+------------------------+
    |PROFILEID                 |LATITUDE                  |LONGITUDE               |
    +--------------------------+--------------------------+------------------------+
    |4ab5cbad                  |37.3952                   |-122.0813               | 
    |4a7c7b41                  |37.4049                   |-122.0822               |
    

Данные считываются с помощью пользователя ksql.

Запишите тестовые данные в ksqlDBЗапишите тестовые данные в ksqlDB

  1. Подключитесь к серверу ksqlDB.

  2. Запустите клиент ksql командой:

    ksql http://0.0.0.0:8088
    
  3. Вставьте тестовые данные в таблицу riderLocations:

    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
    

    Эти данные синхронно отправляются в топик Apache Kafka® locations с помощью пользователя ksql.

Проверьте наличие записей в топиках Apache Kafka®Проверьте наличие записей в топиках Apache Kafka®

  1. Проверьте сообщения в топике locations кластера Managed Service for Apache Kafka® с помощью kafkacat и пользователя ksql:

    kafkacat -C \
     -b <FQDN_брокера_1>:9091,...,<FQDN_брокера_N>:9091 \
     -t locations \
     -X security.protocol=SASL_SSL \
     -X sasl.mechanisms=SCRAM-SHA-512 \
     -X sasl.username=ksql \
     -X sasl.password="<пароль_пользователя_ksql>" \
     -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
    
  2. Убедитесь, что в консоли отображаются сообщения, которые вы записали в таблицу.

  3. Проверьте сообщения в топике default_ksql_processing_log кластера Managed Service for Apache Kafka® с помощью kafkacat и пользователя ksql:

    kafkacat -C \
     -b <FQDN_брокера_1>:9091,...,<FQDN_брокера_N>:9091 \
     -t default_ksql_processing_log \
     -X security.protocol=SASL_SSL \
     -X sasl.mechanisms=SCRAM-SHA-512 \
     -X sasl.username=ksql \
     -X sasl.password="<пароль_пользователя_ksql>" \
     -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
    
  4. Убедитесь, что в консоли отображаются записи лога ksqlDB.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  • Удалите виртуальную машину.
  • Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.
  • Удалите кластер Managed Service for Apache Kafka®.

Была ли статья полезна?

Предыдущая
Поставка в ClickHouse®
Следующая
Работа с топиками Apache Kafka® с помощью Yandex Data Processing
Проект Яндекса
© 2025 ООО «Яндекс.Облако»