Поставка данных в ksqlDB
- Перед началом работы
- Настройте интеграцию с Apache Kafka® для базы ksqlDB
- Изучите формат данных, поступающих от Managed Service for Apache Kafka®
- Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®
- Получите тестовые данные из кластера Managed Service for Apache Kafka®
- Запишите тестовые данные в ksqlDB
- Проверьте наличие записей в топиках Apache Kafka®
- Удалите созданные ресурсы
ksqlDB — это база данных, которая предназначена для потоковой обработки сообщений, поступающих из топиков Apache Kafka®. Работа с потоком сообщений в ksqlDB похожа на работу с таблицами в обычной базе данных. Таблица ksqlDB автоматически пополняется данными, поступающими из топика, а данные, которые вы добавите в таблицу ksqlDB, отправляются в топик Apache Kafka®. Подробнее см. в документации ksqlDB
Чтобы настроить поставку данных из Managed Service for Apache Kafka® в ksqlDB:
- Настройте интеграцию с Apache Kafka® для базы ksqlDB.
- Изучите формат данных, поступающих от Managed Service for Apache Kafka®.
- Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®.
- Получите тестовые данные из кластера Managed Service for Apache Kafka®
- Запишите тестовые данные в ksqlDB.
- Проверьте наличие тестовых данных в топике Apache Kafka®.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей вам конфигурации:
-
Если сервер ksqlDB размещен в интернете, создайте кластер Managed Service for Apache Kafka® с публичным доступом.
-
Если сервер ksqlDB размещен в Yandex Cloud, создайте кластер Managed Service for Apache Kafka® в той же облачной сети, где находится ksqlDB.
-
-
Создайте топики в кластере Managed Service for Apache Kafka®:
- Служебный топик
_confluent-ksql-default__command_topic
с настройками:- Фактор репликации —
1
. - Количество разделов —
1
. - Политика очистки лога —
Delete
. - Время жизни сегмента лога, мс —
-1
. - Минимальное число синхронных реплик —
1
.
- Фактор репликации —
- Служебный топик
default_ksql_processing_log
для записи логов ksqlDB. Настройки топика могут быть любыми. - Топик для хранения данных
locations
. Настройки топика могут быть любыми.
- Служебный топик
-
Создайте пользователя с именем
ksql
и назначьте ему рольACCESS_ROLE_ADMIN
для всех топиков. -
Убедитесь, что вы можете подключиться к серверу ksqlDB.
-
Установите утилиту
kafkacat
на сервер ksqlDB и убедитесь, что можете с ее помощью подключиться к кластеру Managed Service for Apache Kafka® через SSL. -
Установите утилиту для потоковой обработки JSON-файлов jq
на сервер ksqlDB.
Настройте интеграцию с Apache Kafka® для базы ksqlDB
-
Подключитесь к серверу ksqlDB.
-
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store), чтобы ksqlDB мог использовать этот сертификат при защищенном подключении к хостам кластера. При этом задайте пароль в параметре
-storepass
для дополнительной защиты хранилища:cd /etc/ksqldb && \ sudo keytool -importcert -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -keystore ssl -storepass <пароль_хранилища_сертификатов> \ --noprompt
-
Укажите в файле конфигурации ksqlDB
/etc/ksqldb/ksql-server.properties
данные для аутентификации в кластере Managed Service for Apache Kafka®:bootstrap.servers=<FQDN_брокера_1>:9091,...,<FQDN_брокера_N>:9091 sasl.mechanism=SCRAM-SHA-512 security.protocol=SASL_SSL ssl.truststore.location=/etc/ksqldb/ssl ssl.truststore.password=<пароль_хранилища_сертификатов> sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ksql" password="<пароль_пользователя_ksql>";
Как получить FQDN хоста-брокера, см. в инструкции.
Имя кластера можно запросить со списком кластеров в каталоге.
-
Укажите в файле параметров логирования ksqlDB
/etc/ksqldb/log4j.properties
настройки записи логов в топик кластера Managed Service for Apache Kafka®:log4j.appender.kafka_appender=org.apache.kafka.log4jappender.KafkaLog4jAppender log4j.appender.kafka_appender.layout=io.confluent.common.logging.log4j.StructuredJsonLayout log4j.appender.kafka_appender.BrokerList=<FQDN_брокера_1>:9091,...,<FQDN_брокера_N>:9091 log4j.appender.kafka_appender.Topic=default_ksql_processing_log log4j.logger.io.confluent.ksql=INFO,kafka_appender log4j.appender.kafka_appender.clientJaasConf=org.apache.kafka.common.security.scram.ScramLoginModule required username="ksql" password="<пароль_пользователя_ksql>"; log4j.appender.kafka_appender.SecurityProtocol=SASL_SSL log4j.appender.kafka_appender.SaslMechanism=SCRAM-SHA-512 log4j.appender.kafka_appender.SslTruststoreLocation=/etc/ksqldb/ssl log4j.appender.kafka_appender.SslTruststorePassword=<пароль_хранилища_сертификатов>
-
Перезапустите сервис ksqlDB командой:
sudo systemctl restart confluent-ksqldb.service
Изучите формат данных, поступающих от Managed Service for Apache Kafka®
Обработка потока данных из Managed Service for Apache Kafka® зависит от формата представления в сообщении Apache Kafka®.
В примере в топик Apache Kafka® locations
будут записываться геоданные в формате JSON:
- идентификатор
profileId
; - широта
latitude
; - долгота
longitude
;
Эти данные будут передаваться в виде сообщений Apache Kafka®. Каждое такое сообщение будет содержать JSON-объект как строку следующего вида:
{"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}
База ksqlDB будет использовать таблицу из трех столбцов, в которой хранятся значения соответствующих параметров из сообщений Apache Kafka®.
Далее выполним настройку полей потоковой таблицы в базе ksqlDB.
Создайте в ksqlDB таблицу для записи потока данных из топика Apache Kafka®
Чтобы записывать информацию из топика Apache Kafka®, создайте в базе ksqlDB таблицу. Структура таблицы соответствует формату данных, которые поступают из Managed Service for Apache Kafka®:
-
Подключитесь к серверу ksqlDB.
-
Запустите клиент
ksql
командой:ksql http://0.0.0.0:8088
-
Выполните запрос:
CREATE STREAM riderLocations ( profileId VARCHAR, latitude DOUBLE, longitude DOUBLE ) WITH ( kafka_topic='locations', value_format='json', partitions=<количество_разделов_топика_"locations"> );
Эта потоковая таблица будет автоматически наполняться сообщениями из топика
locations
кластера Managed Service for Apache Kafka®. Для чтения сообщений ksqlDB использует настройки пользователяksql
.Подробнее о создании потоковой таблицы на движке ksqlDB см. в документации ksqlDB
. -
Выполните запрос:
SELECT * FROM riderLocations WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
Запрос ожидает появления данных в таблице в реальном времени.
Получите тестовые данные из кластера Managed Service for Apache Kafka®
-
Подключитесь к серверу ksqlDB.
-
Создайте файл
sample.json
со следующими тестовыми данными:{ "profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205 } { "profileId": "4ab5cbad", "latitude": 37.3952, "longitude": -122.0813 } { "profileId": "4a7c7b41", "latitude": 37.4049, "longitude": -122.0822 }
-
Отправьте файл
sample.json
в топикlocations
кластера Managed Service for Apache Kafka® с помощьюjq
иkafkacat
:jq -rc . sample.json | kafkacat -P \ -b <FQDN_брокера_1>:9091,...,<FQDN_брокера_N>:9091> \ -t locations \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=ksql \ -X sasl.password="<пароль_пользователя_ksql>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
Информация отправляется с помощью пользователя
ksql
. Подробнее о настройке SSL-сертификата и работе сkafkacat
см. в разделе Подключение к кластеру Apache Kafka® из приложений. -
Убедитесь, что в сессии отобразились данные, которые были отправлены в топик:
+--------------------------+--------------------------+------------------------+ |PROFILEID |LATITUDE |LONGITUDE | +--------------------------+--------------------------+------------------------+ |4ab5cbad |37.3952 |-122.0813 | |4a7c7b41 |37.4049 |-122.0822 |
Данные считываются с помощью пользователя ksql
.
Запишите тестовые данные в ksqlDB
-
Подключитесь к серверу ksqlDB.
-
Запустите клиент
ksql
командой:ksql http://0.0.0.0:8088
-
Вставьте тестовые данные в таблицу
riderLocations
:INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643); INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813); INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
Эти данные синхронно отправляются в топик Apache Kafka®
locations
с помощью пользователяksql
.
Проверьте наличие записей в топиках Apache Kafka®
-
Проверьте сообщения в топике
locations
кластера Managed Service for Apache Kafka® с помощьюkafkacat
и пользователяksql
:kafkacat -C \ -b <FQDN_брокера_1>:9091,...,<FQDN_брокера_N>:9091 \ -t locations \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=ksql \ -X sasl.password="<пароль_пользователя_ksql>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
-
Убедитесь, что в консоли отображаются сообщения, которые вы записали в таблицу.
-
Проверьте сообщения в топике
default_ksql_processing_log
кластера Managed Service for Apache Kafka® с помощьюkafkacat
и пользователяksql
:kafkacat -C \ -b <FQDN_брокера_1>:9091,...,<FQDN_брокера_N>:9091 \ -t default_ksql_processing_log \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=ksql \ -X sasl.password="<пароль_пользователя_ksql>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z -K:
-
Убедитесь, что в консоли отображаются записи лога ksqlDB.
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
- Удалите виртуальную машину.
- Если вы зарезервировали для виртуальной машины публичный статический IP-адрес, удалите его.
- Удалите кластер Managed Service for Apache Kafka®.