Работа с управляемым реестром схем формата данных с помощью REST API
В кластерах Managed Service for Apache Kafka® можно работать с Managed Schema Registry либо с помощью клиентов Apache Kafka® для различных языков программирования, либо с помощью REST API.
Также Managed Service for Apache Kafka® предоставляет REST API для Apache Kafka®. В том числе, с помощью этого API можно отправлять и получать сообщения без использования сторонних производителей и потребителей. Эти возможности также будут продемонстрированы в этом практическом руководстве.
Чтобы познакомиться с возможностями REST API для Managed Schema Registry и Apache Kafka®:
- Создайте схемы формата данных.
- Отправьте сообщения в топик.
- Получите сообщения из топика.
- Удалите созданные ресурсы.
Перед началом работы
Подготовьте инфраструктуру
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации.
При создании кластера включите опции:
-
Реестр схем данных.
В кластере будет развернут реестр схем формата данных Managed Schema Registry и станет доступен REST API для Managed Schema Registry.
-
Kafka Rest API.
В кластере станет доступен REST API для Apache Kafka®.
-
Публичный доступ.
Хосты-брокеры станут доступны из интернета.
-
-
Создайте топик с именем
messages
для обмена сообщениями между производителем и потребителем. -
Создайте пользователя с именем
user1
и выдайте ему права на топикmessages
:ACCESS_ROLE_CONSUMER
,ACCESS_ROLE_PRODUCER
.
Этот пользователь сможет отправлять и получать сообщения в рамках топика, а также выполнять любые операции над субъектами в Managed Schema Registry, которые связаны с топиком.
-
Выполните все шаги по предварительной настройке для подключения к кластеру.
Установите утилиты
-
Установите утилиту cURL
:sudo apt install curl -y
С ее помощью будут выполняться запросы к API.
Для удобства в этом практическом руководстве при выполнении запросов к API будет использоваться опция cURL --user
. При указанной опции cURL сам добавит в запрос HTTP-заголовок Authorization с необходимым значением для авторизации.Совет
Вы можете самостоятельно сконструировать заголовок
Authorization
, например, если вы не используете cURL. -
Установите утилиту jq
:sudo apt install jq -y
С ее помощью описания схем будут приводиться к нужному формату.
При использовании REST API для Managed Schema Registry описания схем необходимо передавать в виде строки с экранированными символами, например:
"schema": "{\"type\": \"record\", \"name\": \"Obj\", \"fields\":[...]}"
Для удобства в этом практическом руководстве схемы представлены в виде JSON-документов c отступами и переносами строк, а при выполнении запросов к API схемы приводятся к нужному формату с помощью
jq
.Совет
После выполнения запроса к REST API с помощью cURL, ответ сервера представляется в виде одной JSON-строки.
Вы можете дополнительно обработать вывод команд из этого практического руководства с помощью
jq
, чтобы сделать ответ сервера удобочитаемым.
Создайте схемы формата данных
Примечание
В этом практическом руководстве используются схемы типа Avro
Вы можете использовать другие типы схем, которые поддерживаются в Managed Schema Registry.
Пусть сообщение Apache Kafka® в топике messages
должно состоять из ключа и значения в следующем формате:
Ключ |
Значение |
|
|
Создайте соответствующие схемы формата данных:
-
Создайте файл
schema-key.json
, который содержит схему формата данных для ключа сообщения Apache Kafka®.schema-key.json
{ "type": "record", "name": "my_key", "fields": [ { "name": "id", "type": "int" }, { "name": "sid", "type": "string" } ] }
-
Создайте схему формата данных для ключа сообщения Apache Kafka®.
Имя субъекта для схемы должно состоять из имени топика, в котором будет использоваться эта схема (
messages
) и суффикса-key
.Воспользуйтесь методом POST /subjects/(subject)/versions
REST API для Managed Schema Registry и выполните запрос:jq \ -n --slurpfile data schema-key.json \ '{ "schemaType": "AVRO", "schema": "\($data)" }' \ | curl \ --request POST \ --url 'https://<FQDN_хоста-брокера>:443/subjects/messages-key/versions' \ --user user1:<пароль_пользователя> \ --header 'Content-Type: application/vnd.schemaregistry.v1+json' \ --data "@-"
В ответе на запрос будет возвращен идентификатор созданной схемы, например,
{"id":1}
. -
Создайте файл
schema-value.json
, который содержит схему формата данных для значения сообщения Apache Kafka®.schema-value.json
{ "type": "record", "name": "my_value", "fields": [ { "name": "name", "type": "string" }, { "name": "city", "type": "string" }, { "name": "age", "type": "int" } ] }
-
Создайте схему формата данных для значения сообщения Apache Kafka®.
Имя субъекта для схемы должно состоять из имени топика, в котором будет использоваться эта схема (
messages
) и суффикса-value
.Воспользуйтесь методом POST /subjects/(subject)/versions
REST API для Managed Schema Registry и выполните запрос:jq \ -n --slurpfile data schema-value.json \ '{ "schemaType": "AVRO", "schema": "\($data)" }' \ | curl \ --request POST \ --url 'https://<FQDN_хоста-брокера>:443/subjects/messages-value/versions' \ --user user1:<пароль_пользователя> \ --header 'Content-Type: application/vnd.schemaregistry.v1+json' \ --data "@-"
В ответе на запрос будет возвращен идентификатор созданной схемы, например,
{"id":2}
.
Отправьте сообщения в топик
-
Получите идентификаторы схем формата данных для ключа и значения.
Воспользуйтесь методом
GET /schemas
REST API для Managed Schema Registry и выполните запрос:curl \ --request GET \ --url 'https://<FQDN_хоста-брокера>:443/schemas' \ --user user1:<пароль_пользователя> \ --header 'Accept: application/vnd.schemaregistry.v1+json'
В ответе на запрос содержатся идентификаторы схем формата данных (
id
). Эти идентификаторы будут использоваться позднее.Пример ответа на запрос
Для краткости схема формата данных
schema
в виде JSON-строк не приводится.[ { "id": 1, "schema": "<схема_формата_данных>", "schemaType": "AVRO", "subject": "messages-key", "version": 1 }, { "id": 2, "schema": "<схема_формата_данных>", "schemaType": "AVRO", "subject": "messages-value", "version": 1 } ]
-
Создайте файл
message-list.json
, который содержит два сообщения. Для каждого сообщения указаны ключ и значение в соответствии с созданными ранее схемами формата данных.message-list.json
[ { "key": { "id": 1111, "sid": "AAAAA-BBBBB-CCCCC" }, "value": { "name": "Anna", "city": "Moscow", "age": 44 } }, { "key": { "id": 2222, "sid": "DDDDD-EEEEE-FFFFF" }, "value": { "name": "Alex", "city": "London", "age": 32 } } ]
-
Отправьте сообщения в топик
messages
.Воспользуйтесь методом POST /topics/(topic)
REST API для Apache Kafka® и выполните запрос:jq \ -n --slurpfile data message-list.json \ '{ "key_schema_id": <идентификатор_схемы_messages-key>, "value_schema_id": <идентификатор_схемы_messages-value>, "records": $data.[] }' \ | curl \ --request POST \ --url 'https://<FQDN_хоста-брокера>:443/topics/messages' \ --user user1:<пароль_пользователя> \ --header 'Content-Type: application/vnd.kafka.avro.v2+json' \ --header 'Accept: application/vnd.kafka.v2+json' \ --data "@-"
Значения идентификаторов схем были получены ранее с помощью запроса к эндпоинту
GET /schemas
.Пример ответа на запрос
{ "key_schema_id": 1, "offsets": [ { "offset": 0, "partition": 0 }, { "offset": 0, "partition": 1 } ], "value_schema_id": 2 }
Получите сообщения из топика
-
Создайте потребителя
my-consumer
в группе потребителейmy-group
.Воспользуйтесь методом POST /consumers/(group)
REST API для Apache Kafka® и выполните запрос:curl \ --request POST \ --url 'https://<FQDN_хоста-брокера>:443/consumers/my-group' \ --user user1:<пароль_пользователя> \ --header 'Content-Type: application/vnd.kafka.v2+json' \ --header 'Accept: application/vnd.kafka.v2+json' \ --data '{ "name": "my-consumer", "format": "avro", "auto.offset.reset": "earliest" }'
Пример ответа на запрос
{ "base_uri": "https://<FQDN_хоста-брокера>:443/consumers/my-group/instances/my-consumer", "instance_id": "my-consumer" }
-
Подпишитесь на топик
messages
для потребителяmy-consumer
из группы потребителейmy-group
.Воспользуйтесь методом POST /consumers/(group)/instances/(instance)/subscription
REST API для Apache Kafka® и выполните запрос:curl \ --request POST \ --url 'https://<FQDN_хоста-брокера>:443/consumers/my-group/instances/my-consumer/subscription' \ --user user1:<пароль_пользователя> \ --header 'Content-Type: application/vnd.kafka.v2+json' \ --header 'Accept: application/vnd.kafka.v2+json' \ --data '{"topics": ["messages"]}'
Сервер API не возвращает ответа на этот запрос, только HTTP-статус.
-
Получите все сообщения из топика
messages
для потребителяmy-consumer
из группы потребителейmy-group
.Воспользуйтесь методом GET /consumers/(group)/instances/(instance)/records
REST API для Apache Kafka® и выполните запрос:curl \ --request GET \ --url 'https://<FQDN_хоста-брокера>:443/consumers/my-group/instances/my-consumer/records' \ --user user1:<пароль_пользователя> \ --header 'Accept: application/vnd.kafka.avro.v2+json'
Если в ответе на запрос содержатся отправленные ранее сообщения, то это значит, что производитель и потребитель успешно интерпретируют сообщения в соответствии с заданными схемами формата данных.
Пример ответа на запрос
[ { "key": { "id": 2222, "sid": "DDDDD-EEEEE-FFFFF" }, "offset": 0, "partition": 1, "timestamp": 1726031054186, "topic": "messages", "value": { "age": 32, "city": "London", "name": "Alex" } }, { "key": { "id": 1111, "sid": "AAAAA-BBBBB-CCCCC" }, "offset": 0, "partition": 0, "timestamp": 1726031054186, "topic": "messages", "value": { "age": 44, "city": "Moscow", "name": "Anna" } } ]
Удалите созданные ресурсы
Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:
- Удалите кластер Managed Service for Apache Kafka®.
- Если вы зарезервировали публичные статические IP-адреса, освободите и удалите их.