Working with the managed schema registry
To use Managed Schema Registry with Managed Service for Apache Kafka®:
- Create producer and consumer scripts on your local machine.
- Check that Managed Schema Registry runs correctly.
- Delete the resources you created.
This tutorial describes how to register a single data schema. For information on how to register multiple data schemas, see this Confluent Schema Registry article.
Required paid resources
The infrastructure support cost includes:
- Fee for the Managed Service for Apache Kafka® cluster computing resources and storage (see Managed Service for Apache Kafka® pricing).
- Fee for VM computing resources and disks (see Yandex Compute Cloud pricing).
- Fee for a public IP address (see Yandex Virtual Private Cloud pricing).
Getting started
- Create a Managed Service for Apache Kafka® cluster of any suitable configuration. When creating a cluster, enable Schema registry and Public access.
  - Create a topic named messages for exchanging messages between the producer and the consumer.
  - Create a user named user and grant them the following permissions for the messages topic: ACCESS_ROLE_CONSUMER and ACCESS_ROLE_PRODUCER.
- In the network hosting the Managed Service for Apache Kafka® cluster, create a VM running Ubuntu 20.04 with a public IP address.
- If using security groups, configure them to allow all required traffic between your Managed Service for Apache Kafka® cluster and VM.
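Before moving on, it can help to confirm from the VM that the security groups actually let traffic through. The following is a minimal sketch using only the Python standard library; the function name can_connect is hypothetical, and the ports 9091 (brokers) and 443 (schema registry) are the ones used by the scripts later in this tutorial. A successful TCP connection only shows that the port is reachable, not that authentication will succeed.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


# Example calls (replace the placeholders with your own host names):
# can_connect("<broker_host_1_FQDN>", 9091)
# can_connect("<Managed_Schema_Registry_server_FQDN_or_IP_address>", 443)
```

If either call returns False, review the security group rules before troubleshooting the scripts themselves.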
Create producer and consumer scripts
These scripts send and receive messages in the messages topic as key:value pairs. In this example, the data format schemas are described in Avro.
Note
Python scripts are provided for demonstration only. You can prepare and send data format schemas and the data itself by creating a similar script in another language.
- Connect to the VM over SSH.
- Install the required Python packages:

    sudo apt-get update && \
    sudo pip3 install avro confluent_kafka

- To use an encrypted connection, install an SSL certificate:

    sudo mkdir -p /usr/share/ca-certificates && \
    sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
         -O /usr/share/ca-certificates/YandexInternalRootCA.crt && \
    sudo chmod 655 /usr/share/ca-certificates/YandexInternalRootCA.crt

- Create a Python script for the consumer.
Here is how the script works:
- Connect to the messages topic and Confluent Schema Registry.
- Continuously read messages arriving in the messages topic.
- When receiving a message, request the required schemas from Confluent Schema Registry to parse the message.
- Parse the message binary data based on the key and value schemas and display the result.

consumer.py

    #!/usr/bin/python3

    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError

    # Kafka connection settings plus the schema registry settings.
    c = AvroConsumer(
        {
            "bootstrap.servers": ",".join([
                "<broker_host_1_FQDN>:9091",
                ...
                "<broker_host_N_FQDN>:9091",
            ]),
            "group.id": "avro-consumer",
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<user_password>",
            "schema.registry.url": "https://<Managed_Schema_Registry_server_FQDN_or_IP_address>:443",
            "schema.registry.basic.auth.credentials.source": "SASL_INHERIT",
            "schema.registry.ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt",
            "auto.offset.reset": "earliest"
        }
    )

    c.subscribe(["messages"])

    while True:
        # Reset msg so the error handler below never sees an unbound name.
        msg = None
        try:
            # Wait up to 10 seconds for the next message.
            msg = c.poll(10)
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            break

        if msg is None:
            continue

        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue

        # The value is already decoded using the schema from the registry.
        print(msg.value())

    c.close()
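The consumer knows which schema to request because Confluent serializers prefix each serialized key and value with a short header: one magic byte equal to 0, followed by a 4-byte big-endian schema ID. AvroConsumer handles this internally, so the sketch below is for illustration only and is not part of consumer.py. The function name split_confluent_frame and the sample frame (with schema ID 1) are hypothetical.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of a Confluent-framed key or value


def split_confluent_frame(raw: bytes) -> Tuple[int, bytes]:
    """Split a framed key or value into (schema_id, avro_payload)."""
    if len(raw) < 5:
        raise ValueError("message is too short to carry a schema ID")
    # ">bI": big-endian, 1-byte magic marker, 4-byte unsigned schema ID.
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: {}".format(magic))
    return schema_id, raw[5:]


# Hand-built sample: header for schema ID 1 plus the Avro encoding of
# the record {"name": "Value"} (zigzag length 0x0a, then the UTF-8 text).
frame = b"\x00" + (1).to_bytes(4, "big") + b"\x0aValue"
schema_id, payload = split_confluent_frame(frame)
print(schema_id, payload)  # prints: 1 b'\nValue'
```

The schema ID extracted this way is what the client uses to look up the schema in the registry before decoding the remaining bytes.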
-
Create a Python script for the producer.
Here is how the script works:
- Connect to the schema registry and send the key and value data format schemas.
- Generate the key and value based on the schemas you sent.
- Send a message containing a key:value pair to the messages topic. The system will automatically add the schema versions to your message.

producer.py

    #!/usr/bin/python3

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    value_schema_str = """
    {
        "namespace": "my.test",
        "name": "value",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """

    key_schema_str = """
    {
        "namespace": "my.test",
        "name": "key",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """

    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    value = {"name": "Value"}
    key = {"name": "Key"}


    def delivery_report(err, msg):
        """Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush()."""
        if err is not None:
            print("Message delivery failed: {}".format(err))
        else:
            print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))


    avroProducer = AvroProducer(
        {
            "bootstrap.servers": ",".join([
                "<broker_host_1_FQDN>:9091",
                ...
                "<broker_host_N_FQDN>:9091",
            ]),
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<user_password>",
            "on_delivery": delivery_report,
            "schema.registry.basic.auth.credentials.source": "SASL_INHERIT",
            "schema.registry.url": "https://<Managed_Schema_Registry_server_FQDN_or_IP_address>:443",
            "schema.registry.ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt"
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema
    )

    avroProducer.produce(topic="messages", key=key, value=value)
    avroProducer.flush()
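Once the producer has run, it may be useful to look at what it registered. AvroProducer registers the key and value schemas under subjects named after the topic, with the suffixes -key and -value. The sketch below builds those names and requests the latest schema version over HTTPS. It assumes that the managed registry exposes the Confluent-compatible REST API and accepts HTTP basic authentication with the Kafka user's credentials, as the SASL_INHERIT setting in the scripts suggests. All function names here are hypothetical helpers, not part of any library.

```python
import base64
import json
import ssl
import urllib.parse
import urllib.request

CA_FILE = "/usr/share/ca-certificates/YandexInternalRootCA.crt"


def subject_names(topic):
    """Return the (key_subject, value_subject) pair AvroProducer uses for a topic."""
    return "{}-key".format(topic), "{}-value".format(topic)


def latest_schema_url(registry_url, subject):
    """Build the REST URL for the latest registered version of a subject."""
    quoted = urllib.parse.quote(subject, safe="")
    return "{}/subjects/{}/versions/latest".format(registry_url.rstrip("/"), quoted)


def fetch_latest_schema(registry_url, subject, username, password):
    """Fetch the latest schema record as a dict (assumes basic auth is accepted)."""
    request = urllib.request.Request(latest_schema_url(registry_url, subject))
    token = base64.b64encode("{}:{}".format(username, password).encode()).decode()
    request.add_header("Authorization", "Basic {}".format(token))
    # Trust the CA certificate installed earlier in this tutorial.
    context = ssl.create_default_context(cafile=CA_FILE)
    with urllib.request.urlopen(request, context=context, timeout=10) as response:
        return json.load(response)


# Example call (replace the placeholders with your own values):
# key_subject, value_subject = subject_names("messages")
# fetch_latest_schema(
#     "https://<Managed_Schema_Registry_server_FQDN_or_IP_address>:443",
#     value_subject, "user", "<user_password>")
```

If the assumptions hold, the response for messages-value should contain the value schema defined in producer.py along with its ID and version.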
Check that Managed Schema Registry runs correctly
- Start the consumer:

    python3 ./consumer.py

- In a separate terminal, start the producer:

    python3 ./producer.py

- Make sure the data sent by the producer is received and correctly interpreted by the consumer:

    {'name': 'Value'}
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
- Delete the Managed Service for Apache Kafka® cluster.
- Delete the VM.
- If you reserved public static IP addresses, release and delete them.