Working with the managed schema registry via the REST API
In Managed Service for Apache Kafka® clusters, you can work with Managed Schema Registry either using Apache Kafka® clients for various programming languages or via the REST API.
Managed Service for Apache Kafka® also provides the REST API for Apache Kafka®. Among other things, this API enables you to send and receive messages without using third-party producers and consumers. This tutorial will also demonstrate these features.
To explore the REST API features for Managed Schema Registry and Apache Kafka®:
- Create data format schemas.
- Send messages to a topic.
- Read messages from the topic.
- Delete the resources you created.
Required paid resources
The support cost for this solution includes:
- Managed Service for Apache Kafka® cluster fee, which covers the use of computing resources allocated to hosts (including ZooKeeper hosts) and disk space (see Apache Kafka® pricing).
- Fee for using public IP addresses (see Virtual Private Cloud pricing).
Getting started
Set up your infrastructure
- Create a Managed Service for Apache Kafka® cluster of any suitable configuration.

  When creating the cluster, enable the following options:

  - Schema registry.

    The cluster will deploy a Managed Schema Registry schema registry, making the REST API for Managed Schema Registry available.

  - Kafka Rest API.

    The REST API for Apache Kafka® will become available in the cluster.

  - Public access.

    Broker hosts will become available from the internet.

- Create a topic named `messages` for exchanging messages between the producer and the consumer.

- Create a user named `user1` and grant them the following permissions for the `messages` topic:

  - `ACCESS_ROLE_CONSUMER`
  - `ACCESS_ROLE_PRODUCER`

  This user will be able to send and receive messages in the topic, as well as perform any operations on Managed Schema Registry subjects associated with the topic.

- Complete all pre-configuration steps to connect to the cluster.
Install utilities
- Install cURL:

  ```bash
  sudo apt install curl -y
  ```

  It will be used to send API requests.

  For convenience, this tutorial uses the `--user` cURL option when sending requests to the API. With this option, cURL automatically adds the `Authorization` HTTP header with the value required for authorization to the request.

  Tip

  You can build the `Authorization` header yourself, e.g., if you are not using cURL.

- Install jq:

  ```bash
  sudo apt install jq -y
  ```

  You will use it to convert schema descriptions to the required format.

  When using the REST API for Managed Schema Registry, you need to provide schema descriptions as an escaped character string, e.g.:

  ```json
  "schema": "{\"type\": \"record\", \"name\": \"Obj\", \"fields\":[...]}"
  ```

  For convenience, this tutorial shows schemas as JSON documents with indentation and line breaks. When sending API requests, schemas are formatted as required using jq.

  Tip

  When sending a REST API request via cURL, the server response comes as a single JSON string. You can pipe the command output from this tutorial through jq to make the server response more readable.
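If you are not using cURL, you can build the `Authorization` header yourself: the value cURL derives from `--user` is standard HTTP Basic authentication. A minimal Python sketch (the user name `user1` comes from this tutorial; the password value is a placeholder you replace with your own):

```python
import base64

def basic_auth_header(user: str, password: str) -> str:
    """Build the HTTP Basic Authorization header value, i.e. what
    cURL's --user option adds to the request automatically."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"

# Placeholder password shown for illustration; use your real user1 password.
print(basic_auth_header("user1", "<user_password>"))
```

Any HTTP client can then send this string as the `Authorization` header with each request.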
Create data format schemas
Note
This tutorial uses the Avro schema type. You can use other schema types supported in Managed Schema Registry.
Let's assume an Apache Kafka® message in the `messages` topic must consist of a key and a value in the following format:

| Key | Value |
|-----|-------|
| `{"id": <integer>, "sid": <string>}` | `{"name": <string>, "city": <string>, "age": <integer>}` |
Create the relevant data format schemas:
- Create a file named `schema-key.json` containing the data format schema for the Apache Kafka® message key:

  schema-key.json

  ```json
  {
    "type": "record",
    "name": "my_key",
    "fields": [
      { "name": "id", "type": "int" },
      { "name": "sid", "type": "string" }
    ]
  }
  ```

- Create a data format schema for the Apache Kafka® message key.

  The subject name for the schema must consist of the name of the topic the schema will be used in (`messages`) and the `-key` suffix.

  Call the POST /subjects/(subject)/versions REST API method for Managed Schema Registry, e.g., via the following request:

  ```bash
  jq \
    -n --slurpfile data schema-key.json \
    '{
      "schemaType": "AVRO",
      "schema": "\($data)"
    }' \
  | curl \
    --request POST \
    --url 'https://<broker_host_FQDN>:443/subjects/messages-key/versions' \
    --user user1:<user_password> \
    --header 'Content-Type: application/vnd.schemaregistry.v1+json' \
    --data "@-"
  ```

  The response will contain the new schema ID, e.g., `{"id":1}`.

- Create a file named `schema-value.json` containing the data format schema for the Apache Kafka® message value:

  schema-value.json

  ```json
  {
    "type": "record",
    "name": "my_value",
    "fields": [
      { "name": "name", "type": "string" },
      { "name": "city", "type": "string" },
      { "name": "age", "type": "int" }
    ]
  }
  ```

- Create a data format schema for the Apache Kafka® message value.

  The subject name for the schema must consist of the name of the topic the schema will be used in (`messages`) and the `-value` suffix.

  Call the POST /subjects/(subject)/versions REST API method for Managed Schema Registry, e.g., via the following request:

  ```bash
  jq \
    -n --slurpfile data schema-value.json \
    '{
      "schemaType": "AVRO",
      "schema": "\($data)"
    }' \
  | curl \
    --request POST \
    --url 'https://<broker_host_FQDN>:443/subjects/messages-value/versions' \
    --user user1:<user_password> \
    --header 'Content-Type: application/vnd.schemaregistry.v1+json' \
    --data "@-"
  ```

  The response will contain the new schema ID, e.g., `{"id":2}`.
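The jq pipeline in the requests above exists mainly to escape the schema into a single JSON string. As an illustrative sketch, the same request body can be built in Python with two nested `json.dumps` calls (this only constructs the body; it does not send the request):

```python
import json

# Avro schema for the message key, with the same content as schema-key.json.
key_schema = {
    "type": "record",
    "name": "my_key",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "sid", "type": "string"},
    ],
}

# The schema registry expects the schema as an escaped JSON string inside
# the request body; applying json.dumps twice produces exactly that nesting.
body = json.dumps({
    "schemaType": "AVRO",
    "schema": json.dumps(key_schema),
})
print(body)
```

The printed `body` can be passed to any HTTP client as the payload for the POST /subjects/(subject)/versions request.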
Send messages to a topic
- Get the data format schema IDs for the key and the value.

  Call the GET /schemas REST API method for Managed Schema Registry, e.g., via the following request:

  ```bash
  curl \
    --request GET \
    --url 'https://<broker_host_FQDN>:443/schemas' \
    --user user1:<user_password> \
    --header 'Accept: application/vnd.schemaregistry.v1+json'
  ```

  The response contains the data format schema IDs (`id`). You will need these IDs later.

  Response example (for brevity, the JSON strings with the data format schemas in the `schema` fields are not shown):

  ```json
  [
    {
      "id": 1,
      "schema": "<data_format_schema>",
      "schemaType": "AVRO",
      "subject": "messages-key",
      "version": 1
    },
    {
      "id": 2,
      "schema": "<data_format_schema>",
      "schemaType": "AVRO",
      "subject": "messages-value",
      "version": 1
    }
  ]
  ```

- Create a file named `message-list.json` containing two messages. For each message, the key and the value are specified according to the data format schemas created earlier:

  message-list.json

  ```json
  [
    {
      "key": { "id": 1111, "sid": "AAAAA-BBBBB-CCCCC" },
      "value": { "name": "Anna", "city": "Moscow", "age": 44 }
    },
    {
      "key": { "id": 2222, "sid": "DDDDD-EEEEE-FFFFF" },
      "value": { "name": "Alex", "city": "London", "age": 32 }
    }
  ]
  ```

- Send messages to the `messages` topic.

  Call the POST /topics/(topic) REST API method for Apache Kafka®, e.g., via the following request:

  ```bash
  jq \
    -n --slurpfile data message-list.json \
    '{
      "key_schema_id": <messages-key_schema_ID>,
      "value_schema_id": <messages-value_schema_ID>,
      "records": $data.[]
    }' \
  | curl \
    --request POST \
    --url 'https://<broker_host_FQDN>:443/topics/messages' \
    --user user1:<user_password> \
    --header 'Content-Type: application/vnd.kafka.avro.v2+json' \
    --header 'Accept: application/vnd.kafka.v2+json' \
    --data "@-"
  ```

  Substitute the schema ID values obtained earlier from the GET /schemas response.

  Response example:

  ```json
  {
    "key_schema_id": 1,
    "offsets": [
      { "offset": 0, "partition": 0 },
      { "offset": 0, "partition": 1 }
    ],
    "value_schema_id": 2
  }
  ```
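For clarity, here is a minimal Python sketch of how the produce request body for POST /topics/(topic) is assembled from the schema IDs and the messages above. The IDs 1 and 2 are the example values returned by GET /schemas; the sketch only builds the JSON body, it does not send it:

```python
import json

# The same two messages as in message-list.json, inlined so the
# example is self-contained.
records = [
    {"key": {"id": 1111, "sid": "AAAAA-BBBBB-CCCCC"},
     "value": {"name": "Anna", "city": "Moscow", "age": 44}},
    {"key": {"id": 2222, "sid": "DDDDD-EEEEE-FFFFF"},
     "value": {"name": "Alex", "city": "London", "age": 32}},
]

def build_produce_payload(key_schema_id: int, value_schema_id: int, msgs: list) -> str:
    """Build the JSON body for POST /topics/(topic): the schema IDs
    obtained from GET /schemas plus the records to produce."""
    return json.dumps({
        "key_schema_id": key_schema_id,
        "value_schema_id": value_schema_id,
        "records": msgs,
    })

# 1 and 2 are the example schema IDs from the GET /schemas response.
payload = build_produce_payload(1, 2, records)
print(payload)
```

This is what the jq invocation in the request above produces before piping it to cURL.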
Read messages from the topic
- Create a consumer named `my-consumer` in the consumer group named `my-group`.

  Call the POST /consumers/(group) REST API method for Apache Kafka®, e.g., via the following request:

  ```bash
  curl \
    --request POST \
    --url 'https://<broker_host_FQDN>:443/consumers/my-group' \
    --user user1:<user_password> \
    --header 'Content-Type: application/vnd.kafka.v2+json' \
    --header 'Accept: application/vnd.kafka.v2+json' \
    --data '{
      "name": "my-consumer",
      "format": "avro",
      "auto.offset.reset": "earliest"
    }'
  ```

  Response example:

  ```json
  {
    "base_uri": "https://<broker_host_FQDN>:443/consumers/my-group/instances/my-consumer",
    "instance_id": "my-consumer"
  }
  ```

- Subscribe to the `messages` topic as `my-consumer` from the `my-group` consumer group.

  Call the POST /consumers/(group)/instances/(instance)/subscription REST API method for Apache Kafka®, e.g., via the following request:

  ```bash
  curl \
    --request POST \
    --url 'https://<broker_host_FQDN>:443/consumers/my-group/instances/my-consumer/subscription' \
    --user user1:<user_password> \
    --header 'Content-Type: application/vnd.kafka.v2+json' \
    --header 'Accept: application/vnd.kafka.v2+json' \
    --data '{"topics": ["messages"]}'
  ```

  The API server does not return a response body for this request, only an HTTP status code.
- Read all messages from the `messages` topic as `my-consumer` from the `my-group` consumer group.

  Call the GET /consumers/(group)/instances/(instance)/records REST API method for Apache Kafka®, e.g., via the following request:

  ```bash
  curl \
    --request GET \
    --url 'https://<broker_host_FQDN>:443/consumers/my-group/instances/my-consumer/records' \
    --user user1:<user_password> \
    --header 'Accept: application/vnd.kafka.avro.v2+json'
  ```

  If the response contains the messages sent earlier, the producer and consumer are successfully interpreting the messages in accordance with the specified data format schemas.

  Response example:

  ```json
  [
    {
      "key": { "id": 2222, "sid": "DDDDD-EEEEE-FFFFF" },
      "offset": 0,
      "partition": 1,
      "timestamp": 1726031054186,
      "topic": "messages",
      "value": { "age": 32, "city": "London", "name": "Alex" }
    },
    {
      "key": { "id": 1111, "sid": "AAAAA-BBBBB-CCCCC" },
      "offset": 0,
      "partition": 0,
      "timestamp": 1726031054186,
      "topic": "messages",
      "value": { "age": 44, "city": "Moscow", "name": "Anna" }
    }
  ]
  ```
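To check that the round trip preserved the data, you can compare the consumed records with what was produced. A small Python sketch over the example consumer response (inlined so the snippet is self-contained; partitions may return records in any order, so the comparison is keyed by `id`):

```python
import json

# Example consumer response from the tutorial, inlined as a string.
response_body = """
[
  {"key": {"id": 2222, "sid": "DDDDD-EEEEE-FFFFF"}, "offset": 0, "partition": 1,
   "timestamp": 1726031054186, "topic": "messages",
   "value": {"age": 32, "city": "London", "name": "Alex"}},
  {"key": {"id": 1111, "sid": "AAAAA-BBBBB-CCCCC"}, "offset": 0, "partition": 0,
   "timestamp": 1726031054186, "topic": "messages",
   "value": {"age": 44, "city": "Moscow", "name": "Anna"}}
]
"""

consumed = json.loads(response_body)

# Records can arrive in any partition order, so index values by key id.
by_id = {rec["key"]["id"]: rec["value"] for rec in consumed}

# Every produced message should have been read back.
assert set(by_id) == {1111, 2222}
print(sorted(by_id[i]["name"] for i in by_id))
```

In a real pipeline, `response_body` would be the output of the GET .../records request above rather than a hardcoded string.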
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
- Delete the Managed Service for Apache Kafka® cluster.
- If you reserved public static IP addresses, release and delete them.