Using Managed Schema Registry with Yandex Managed Service for Apache Kafka®
To use Managed Schema Registry with Managed Service for Apache Kafka®:
- Create the producer and consumer scripts on the local machine.
- Check that Managed Schema Registry runs correctly.
- Delete the resources you created.
Getting started
-
Create a Managed Service for Apache Kafka® cluster with any suitable configuration. When creating a cluster, enable Schema registry and Public access.
- Create a topic named
messages
for exchanging messages between the producer and the consumer. - Create a user named
user
and grant them permission for themessages
topic:ACCESS_ROLE_CONSUMER
ACCESS_ROLE_PRODUCER
- Create a topic named
-
In the network hosting the Managed Service for Apache Kafka® cluster, create a VM with Ubuntu 20.04 and a public IP address.
-
If you are using security groups, configure them to allow all required traffic between the Managed Service for Apache Kafka® cluster and the VM.
Create producer and consumer scripts
The above scripts send and receive messages
in the messages topic as a key:value
pair. In the example, data format schemas are described in Avro
Note
Python scripts are provided for demonstration. You can prepare and send data format schemas and the data itself by creating a similar script in another language.
-
Connect to the VM over SSH.
-
Install the necessary Python packages:
sudo apt-get update && \ sudo pip3 install avro confluent_kafka
-
To use an encrypted connection, install an SSL certificate.
sudo mkdir -p /usr/share/ca-certificates && \ sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \ -O /usr/share/ca-certificates/YandexInternalRootCA.crt && \ sudo chmod 655 /usr/share/ca-certificates/YandexInternalRootCA.crt
-
Create a Python script for the consumer.
The script works as follows:
- Connect to the
messages
topic and Confluent Schema Registry. - In a continuous cycle, read messages sent to the
messages
topic. - When receiving a message, request the necessary schemas in Confluent Schema Registry to parse the message.
- Parse binary data from the message according to the schemas for the key and value and display the result on the screen.
consumer.py
#!/usr/bin/python3 from confluent_kafka.avro import AvroConsumer from confluent_kafka.avro.serializer import SerializerError c = AvroConsumer( { "bootstrap.servers": ','.join([ "<FQDN_of_broker_host_1>:9091", ... "<FQDN_of_broker_host_N>:9091", ]), "group.id": "avro-consumer", "security.protocol": "SASL_SSL", "ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt", "sasl.mechanism": "SCRAM-SHA-512", "sasl.username": "user", "sasl.password": "<password_of_the_user_named_user>", "schema.registry.url": "https://<FQDN_or_IP_of_Managed_Schema_Registry_server>:443", "schema.registry.basic.auth.credentials.source": "SASL_INHERIT", "schema.registry.ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt", "auto.offset.reset": "earliest" } ) c.subscribe(["messages"]) while True: try: msg = c.poll(10) except SerializerError as e: print("Message deserialization failed for {}: {}".format(msg, e)) break if msg is None: continue if msg.error(): print("AvroConsumer error: {}".format(msg.error())) continue print(msg.value()) c.close()
- Connect to the
-
Create a Python script for the producer.
The script works as follows:
- Connect to the schema registry and provide to it the data format schemas for the key and value.
- Generate the key and value based on the schemas provided.
- Send a
message
consisting of thekey:meaning
pair to the messages topic. The schema versions are added to the message automatically.
producer.py
#!/usr/bin/python3 from confluent_kafka import avro from confluent_kafka.avro import AvroProducer value_schema_str = """ { "namespace": "my.test", "name": "value", "type": "record", "fields": [ { "name": "name", "type": "string" } ] } """ key_schema_str = """ { "namespace": "my.test", "name": "key", "type": "record", "fields": [ { "name": "name", "type": "string" } ] } """ value_schema = avro.loads(value_schema_str) key_schema = avro.loads(key_schema_str) value = {"name": "Value"} key = {"name": "Key"} def delivery_report(err, msg): """Called once for each message produced to indicate delivery result. Triggered by poll() or flush().""" if err is not None: print("Message delivery failed: {}".format(err)) else: print("Message delivered to {} [{}]".format(msg.topic(), msg.partition())) avroProducer = AvroProducer( { "bootstrap.servers": ','.join([ "<FQDN_of_broker_host_1>:9091", ... "<FQDN_of_broker_host_N>:9091", ]), "security.protocol": 'SASL_SSL', "ssl.ca.location": '/usr/share/ca-certificates/YandexInternalRootCA.crt', "sasl.mechanism": 'SCRAM-SHA-512', "sasl.username": 'user', "sasl.password": '<password_of_the_user_named_user>', "on_delivery": delivery_report, "schema.registry.basic.auth.credentials.source": 'SASL_INHERIT', "schema.registry.url": 'https://<FQDN_or_IP_of_Managed_Schema_Registry_server>:443', "schema.registry.ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt" }, default_key_schema=key_schema, default_value_schema=value_schema ) avroProducer.produce(topic="messages", key=key, value=value) avroProducer.flush()
Check that Managed Schema Registry runs correctly
-
Start the consumer:
python3 ./consumer.py
-
In a separate terminal, start the producer:
python3 ./producer.py
-
Make sure that the data sent by the producer is received and correctly interpreted by the consumer:
{'name': 'Value'}
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
- Delete the Managed Service for Apache Kafka® cluster.
- Delete the virtual machine.
- If you reserved public static IP addresses, release and delete them.