Yandex Cloud
Search
Contact UsGet started
  • Blog
  • Pricing
  • Documentation
  • All Services
  • System Status
    • Featured
    • Infrastructure & Network
    • Data Platform
    • Containers
    • Developer tools
    • Serverless
    • Security
    • Monitoring & Resources
    • ML & AI
    • Business tools
  • All Solutions
    • By industry
    • By use case
    • Economics and Pricing
    • Security
    • Technical Support
    • Customer Stories
    • Gateway to Russia
    • Cloud for Startups
    • Education and Science
  • Blog
  • Pricing
  • Documentation
Yandex project
© 2025 Yandex.Cloud LLC
Yandex MetaData Hub
    • All tutorials
      • Using Schema Registry with Managed Service for Apache Kafka®
      • Delivering data in Debezium CDC format from Apache Kafka®
  • Audit Trails events
  • Pricing policy
  • Troubleshooting
  • Public materials
  • Release notes

In this article:

  • Getting started
  • Create producer and consumer scripts
  • Check that Managed Schema Registry runs correctly
  • Delete the resources you created
  1. Tutorials
  2. Schema Registry
  3. Using Schema Registry with Managed Service for Apache Kafka®

Using Schema Registry with Yandex Managed Service for Apache Kafka®

Written by
Yandex Cloud
Updated at December 10, 2024
  • Getting started
  • Create producer and consumer scripts
  • Check that Managed Schema Registry runs correctly
  • Delete the resources you created

To use Managed Schema Registry with Managed Service for Apache Kafka®:

  1. Create the producer and consumer scripts on the local machine.
  2. Check that Managed Schema Registry runs correctly.
  3. Delete the resources you created.

This tutorial describes how to register a single data schema. For information on how to register multiple data schemas, see the Confluent Schema Registry documentation.

Getting startedGetting started

  1. Create a Managed Service for Apache Kafka® cluster with any suitable configuration. When creating a cluster, enable Schema registry and Public access.

    1. Create a topic named messages for exchanging messages between the producer and the consumer.
    2. Create a user named user and grant them permissions for the messages topic:
      • ACCESS_ROLE_CONSUMER
      • ACCESS_ROLE_PRODUCER
  2. In the network hosting the Managed Service for Apache Kafka® cluster, create a VM with Ubuntu 20.04 and a public IP address.

  3. If you are using security groups, configure them to allow all required traffic between the Managed Service for Apache Kafka® cluster and the VM.

Create producer and consumer scriptsCreate producer and consumer scripts

The above scripts send and receive messages in the messages topic as a key:value pair. In the example, data format schemas are described in Avro format.

Note

Python scripts are provided for demonstration. You can prepare and send data format schemas and the data itself by creating a similar script in another language.

  1. Connect to the VM over SSH.

  2. Install the necessary Python packages:

    sudo apt-get update && \
    sudo pip3 install avro confluent_kafka
    
  3. To use an encrypted connection, install an SSL certificate.

    sudo mkdir -p /usr/share/ca-certificates && \
    sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
              -O /usr/share/ca-certificates/YandexInternalRootCA.crt && \
    sudo chmod 655 /usr/share/ca-certificates/YandexInternalRootCA.crt
    
  4. Create a Python script for the consumer.

    The script works as follows:

    1. Connect to the messages topic and Confluent Schema Registry.
    2. In a continuous cycle, read messages sent to the messages topic.
    3. When receiving a message, request the necessary schemas in Confluent Schema Registry to parse the message.
    4. Parse binary data from the message according to the schemas for the key and value and display the result on the screen.

    consumer.py

    #!/usr/bin/python3
    
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError
    
    
    c = AvroConsumer(
        {
            "bootstrap.servers": ','.join([
            "<broker_host_1_FQDN>:9091",
            ...
            "<broker_host_N_FQDN>:9091",
            ]),
            "group.id": "avro-consumer",
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<user_password>",
            "schema.registry.url": "https://<Managed_Schema_Registry_server_FQDN_or_IP_address>:443",
            "schema.registry.basic.auth.credentials.source": "SASL_INHERIT",
            "schema.registry.ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt",
            "auto.offset.reset": "earliest"
        }
    )
    
    c.subscribe(["messages"])
    
    while True:
        try:
            msg = c.poll(10)
    
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            break
    
        if msg is None:
            continue
    
        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue
    
        print(msg.value())
    
    c.close()
    
  5. Create a Python script for the producer.

    The script works as follows:

    1. Connect to the schema registry and provide to it the data format schemas for the key and value.
    2. Generate the key and value based on the schemas provided.
    3. Send a message consisting of the key:meaning pair to the messages topic. The schema versions are added to the message automatically.

    producer.py

    #!/usr/bin/python3
    
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    
    
    value_schema_str = """
    {
        "namespace": "my.test",
        "name": "value",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    key_schema_str = """
    {
        "namespace": "my.test",
        "name": "key",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    value = {"name": "Value"}
    key = {"name": "Key"}
    
    
    def delivery_report(err, msg):
        """Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush()."""
        if err is not None:
            print("Message delivery failed: {}".format(err))
        else:
            print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))
    
    
    avroProducer = AvroProducer(
        {
            "bootstrap.servers": ','.join([
                "<broker_host_1_FQDN>:9091",
                ...
                "<broker_host_N_FQDN>:9091",
            ]),
            "security.protocol": 'SASL_SSL',
            "ssl.ca.location": '/usr/share/ca-certificates/YandexInternalRootCA.crt',
            "sasl.mechanism": 'SCRAM-SHA-512',
            "sasl.username": 'user',
            "sasl.password": '<user_password>',
            "on_delivery": delivery_report,
            "schema.registry.basic.auth.credentials.source": 'SASL_INHERIT',
            "schema.registry.url": 'https://<Managed_Schema_Registry_server_FQDN_or_IP_address>:443',
            "schema.registry.ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt"
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema
    )
    
    avroProducer.produce(topic="messages", key=key, value=value)
    avroProducer.flush()
    

Check that Managed Schema Registry runs correctlyCheck that Managed Schema Registry runs correctly

  1. Start the consumer:

    python3 ./consumer.py
    
  2. In a separate terminal, start the producer:

    python3 ./producer.py
    
  3. Make sure that the data sent by the producer is received and correctly interpreted by the consumer:

    {'name': 'Value'}
    

Delete the resources you createdDelete the resources you created

Delete the resources you no longer need to avoid paying for them:

  • Delete the Managed Service for Apache Kafka® cluster.
  • Delete the virtual machine.
  • If you reserved public static IP addresses, release and delete them.

Was the article helpful?

Previous
Transferring metadata between Yandex Data Processing clusters using Metastore
Next
Delivering data in Debezium CDC format from Apache Kafka®
Yandex project
© 2025 Yandex.Cloud LLC