Аутентификация и соединение с БД по Kafka API
Эндпоинт
Эндпоинт Kafka API отображается в консоли управления
Эндпоинт имеет следующий вид: <FQDN_YDB>:PORT. Например, ydb-01.serverless.yandexcloud.net:9093.
Предварительные требования
Для аутентификации требуется:
- Создать сервисный аккаунт.
- Назначить сервисному аккаунту роли:
- Для чтения из потока данных:
ydb.kafkaApi.clientиydb.viewer. - Для записи в поток данных:
ydb.kafkaApi.clientиydb.editor.
- Для чтения из потока данных:
- Создать API-ключ c областью действия
yc.ydb.topics.manage.
Аутентификация
В Kafka API аутентификация выполняется через механизм SASL_SSL/PLAIN
Для этого необходимы:
-
<database>— путь базы данных. Путь базы данных отображается в консоли управления , на странице потока данных, на вкладке Обзор, в поле Эндпоинт послеdatabase=.Например, если эндпоинт —
grpcs://ydb.serverless.yandexcloud.net:2135/?database=/ru-central1/b1gia87mbaom********/etnudu2n9ri3********, то путь базы данных —/ru-central1/b1gia87mbaom********/etnudu2n9ri3********. -
<api-key>— API-ключ.
Эти параметры будут использоваться для аутентификации при чтении и записи сообщений:
<sasl.username>=@<database>(обратите внимание, что перед путем к базе данных необходимо поставить символ@)<sasl.password>=<api-key>
Пример записи и чтения сообщения
В примере используются:
<kafka-api-endpoint>— эндпоинт.<stream-name>— имя потока данных.
-
Установите SSL-сертификат, если используете Dedicated базу:
sudo mkdir -p /usr/local/share/ca-certificates/Yandex/ && \ wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \ --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt && \ sudo chmod 0655 /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crtСертификат будет сохранен в файле
/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt. -
Установите
kcat— приложение с открытым исходным кодом, которое может работать как универсальный производитель или потребитель данных:sudo apt-get install kafkacat -
Запустите команду получения сообщений из потока:
Serverless базаDedicated базаkcat -C \ -b <kafka-api-endpoint> \ -t <stream-name> \ -X security.protocol=SASL_SSL \ -X sasl.mechanism=PLAIN \ -X sasl.username="<sasl.username>" \ -X sasl.password="<sasl.password>"kcat -C \ -b <kafka-api-endpoint> \ -t <stream-name> \ -X security.protocol=SASL_SSL \ -X sasl.mechanism=PLAIN \ -X sasl.username="<sasl.username>" \ -X sasl.password="<sasl.password>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crtКоманда будет непрерывно считывать новые сообщения из потока.
-
В отдельном терминале запустите команду отправки сообщения в поток:
Serverless базаDedicated базаecho "test message" | kcat -P \ -b <kafka-api-endpoint> \ -t <stream-name> \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanism=PLAIN \ -X sasl.username="<sasl.username>" \ -X sasl.password="<sasl.password>"echo "test message" | kcat -P \ -b <kafka-api-endpoint> \ -t <stream-name> \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanism=PLAIN \ -X sasl.username="<sasl.username>" \ -X sasl.password="<sasl.password>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt
Основную документацию по работе с Data Streams через Kafka API и больше примеров см. в документации YDB