Yandex Cloud
Search
Contact UsGet started
  • Blog
  • Pricing
  • Documentation
  • All Services
  • System Status
    • Featured
    • Infrastructure & Network
    • Data Platform
    • Containers
    • Developer tools
    • Serverless
    • Security
    • Monitoring & Resources
    • ML & AI
    • Business tools
  • All Solutions
    • By industry
    • By use case
    • Economics and Pricing
    • Security
    • Technical Support
    • Customer Stories
    • Gateway to Russia
    • Cloud for Startups
    • Education and Science
  • Blog
  • Pricing
  • Documentation
Yandex project
© 2025 Yandex.Cloud LLC
Yandex Managed Service for Apache Kafka®
  • Getting started
    • All tutorials
    • Deploying the Apache Kafka® web interface
      • Managing data schemas in Managed Service for Apache Kafka®
      • Working with the managed schema registry
      • Working with the managed schema registry via the REST API
      • Using Confluent Schema Registry with Managed Service for Apache Kafka®
    • Working with Apache Kafka® topics using Yandex Data Processing
  • Access management
  • Pricing policy
  • Terraform reference
  • Yandex Monitoring metrics
  • Audit Trails events
  • Public materials
  • Release notes
  • FAQ

In this article:

  • Required paid resources
  • Getting started
  • Create a topic for notifications about changes in data format schemas
  • Install and configure Confluent Schema Registry on a VM
  • Create producer and consumer scripts
  • Make sure that Confluent Schema Registry is working correctly
  • Delete the resources you created
  1. Tutorials
  2. Using data format schemas with Managed Service for Apache Kafka®
  3. Using Confluent Schema Registry with Managed Service for Apache Kafka®

Using Confluent Schema Registry with Managed Service for Apache Kafka®

Written by
Yandex Cloud
Updated at April 25, 2025
  • Required paid resources
  • Getting started
  • Create a topic for notifications about changes in data format schemas
  • Install and configure Confluent Schema Registry on a VM
  • Create producer and consumer scripts
  • Make sure that Confluent Schema Registry is working correctly
  • Delete the resources you created

In Managed Service for Apache Kafka®, you can use a built-in Managed Schema Registry data format schema registry. To learn more, see Working with the managed schema registry. If you need Confluent Schema Registry, use the information from this guide.

Note

The guide has been tested on Confluent Schema Registry 6.2 and a VM running Ubuntu 20.04 LTS. We cannot guarantee the performance if newer versions are used.

To use Confluent Schema Registry together with Managed Service for Apache Kafka®:

  1. Create a topic for notifications about changes in data format schemas.
  2. Install and configure Confluent Schema Registry on a VM.
  3. Create producer and consumer scripts.
  4. Make sure that Confluent Schema Registry is working correctly.

If you no longer need the resources you created, delete them.

Required paid resourcesRequired paid resources

The support cost includes:

  • Managed Service for Apache Kafka® cluster fee: Using computing resources allocated to hosts (including ZooKeeper hosts) and disk space (see Apache Kafka® pricing).
  • Fee for using public IP addresses if public access is enabled for cluster hosts (see Virtual Private Cloud pricing).
  • VM fee: using computing resources, storage, and public IP address (see Compute Cloud pricing).

Getting startedGetting started

  1. Create a Managed Service for Apache Kafka® cluster with any suitable configuration.

    1. Create a topic named messages for exchanging messages between the producer and the consumer.
    2. Create a user named user and grant them permissions for the messages topic:
      • ACCESS_ROLE_CONSUMER
      • ACCESS_ROLE_PRODUCER
  2. In the network hosting the Managed Service for Apache Kafka® cluster, create a VM with Ubuntu 20.04 LTS from Cloud Marketplace and with a public IP address.

  3. If you are using security groups, configure them to allow all required traffic between the Managed Service for Apache Kafka® cluster and the VM.

  4. In the VM security group, create a rule for incoming traffic that allows connections via port 8081 which is used by the producer and consumer to access the schema registry:

    • Port range: 8081.
    • Protocol: TCP.
    • Destination name: CIDR.
    • CIDR blocks: 0.0.0.0/0 or address ranges of the subnets where the producer and consumer run.

Create a topic for notifications about changes in data format schemasCreate a topic for notifications about changes in data format schemas

  1. Create a service topic named _schemas with the following settings:

    • Number of partitions: 1
    • Cleanup policy: Compact

    Warning

    The specified settings of the Number of partitions and Cleanup policy values are necessary for Confluent Schema Registry to work.

  2. Create a user named registry and grant them permissions for the _schemas topic:

    • ACCESS_ROLE_CONSUMER
    • ACCESS_ROLE_PRODUCER

    Confluent Schema Registry will interact with _schemas as this user.

Install and configure Confluent Schema Registry on a VMInstall and configure Confluent Schema Registry on a VM

  1. Connect to the virtual machine over SSH.

  2. Add the Confluent Schema Registry repository:

    wget -qO - https://packages.confluent.io/deb/6.2/archive.key | sudo apt-key add - && \
    sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/6.2 stable main"
    
  3. Install the packages:

    sudo apt-get update && \
    sudo apt-get install \
         confluent-schema-registry \
         openjdk-11-jre-headless \
         python3-pip --yes
    
  4. Get an SSL certificate.

  5. Create secure storage for the certificate:

    sudo keytool \
         -keystore /etc/schema-registry/client.truststore.jks \
         -alias CARoot \
         -import -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
         -storepass <secure_certificate_storage_password> \
         --noprompt
    
  6. Create a file named /etc/schema-registry/jaas.conf with settings for connecting to the cluster:

    KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="registry"
      password="<registry_user_password>";
    };
    
  7. Edit the /etc/schema-registry/schema-registry.properties file with Confluent Schema Registry settings:

    1. Comment out the line:

      kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
      
    2. Uncomment the line with the listeners parameter. It configures the network address and port that Confluent Schema Registry listens to. The default port for all network interfaces is 8081:

      listeners=http://0.0.0.0:8081
      
    3. Add the following lines at the end of the file:

      kafkastore.bootstrap.servers=SASL_SSL://<broker_host_1_FQDN:9091>,<broker_host_2_FQDN:9091>,...,<broker_host_N_FQDN:9091>
      kafkastore.ssl.truststore.location=/etc/schema-registry/client.truststore.jks
      kafkastore.ssl.truststore.password=<secure_certificate_storage_password>
      kafkastore.sasl.mechanism=SCRAM-SHA-512
      kafkastore.security.protocol=SASL_SSL
      

      You can get a list of broker hosts with a list of cluster hosts.

  8. Edit the /lib/systemd/system/confluent-schema-registry.service file which describes the systemd module.

    1. Go to the [Service] section.

    2. Add the Environment parameter with Java settings:

      ...
      
      [Service]
      Type=simple
      User=cp-schema-registry
      Group=confluent
      Environment="LOG_DIR=/var/log/confluent/schema-registry"
      Environment="_JAVA_OPTIONS='-Djava.security.auth.login.config=/etc/schema-registry/jaas.conf'"
      ...
      
  9. Update the details about the systemd modules:

    sudo systemctl daemon-reload
    
  10. Start the Confluent Schema Registry service:

    sudo systemctl start confluent-schema-registry.service
    
  11. Enable automatic start of Confluent Schema Registry after OS restart:

    sudo systemctl enable confluent-schema-registry.service
    

Create producer and consumer scriptsCreate producer and consumer scripts

The above scripts send and receive messages in the messages topic as a key:value pair. In the example, data format schemas are described in Avro format.

Note

Python scripts are provided for demonstration. You can prepare and send data format schemas and the data itself by creating a similar script in another language.

  1. Install the necessary Python packages:

    sudo pip3 install avro confluent_kafka
    
  2. Create a Python script for the consumer.

    The script works as follows:

    1. Connect to the messages topic and Confluent Schema Registry.
    2. In a continuous cycle, read messages sent to the messages topic.
    3. When receiving a message, request the necessary schemas in Confluent Schema Registry to parse the message.
    4. Parse binary data from the message according to the schemas for the key and value and display the result on the screen.

    consumer.py

    #!/usr/bin/python3
    
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError
    
    
    c = AvroConsumer(
        {
            "bootstrap.servers": ','.join([
                "<broker_host_1_FQDN>:9091",
                ...
                "<broker_host_N_FQDN>:9091",
            ]),
            "group.id": "avro-consumer",
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<user_password>",
            "schema.registry.url": "http://<Confluent_Schema_Registry_server_FQDN_or_IP_address>:8081",
        }
    )
    
    c.subscribe(["messages"])
    
    while True:
        try:
            msg = c.poll(10)
    
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            break
    
        if msg is None:
            continue
    
        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue
    
        print(msg.value())
    
    c.close()
    
  3. Create a Python script for the producer.

    The script works as follows:

    1. Connect to the schema registry and provide to it the data format schemas for the key and value.
    2. Generate the key and value based on the schemas provided.
    3. Send a message consisting of the key:meaning pair to the messages topic. The schema versions are added to the message automatically.

    producer.py

    #!/usr/bin/python3
    
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    
    
    value_schema_str = """
    {
        "namespace": "my.test",
        "name": "value",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    key_schema_str = """
    {
        "namespace": "my.test",
        "name": "key",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    value = {"name": "Value"}
    key = {"name": "Key"}
    
    
    def delivery_report(err, msg):
        """Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush()."""
        if err is not None:
            print("Message delivery failed: {}".format(err))
        else:
            print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))
    
    
    avroProducer = AvroProducer(
        {
            "bootstrap.servers": ','.join([
                "<broker_host_1_FQDN>:9091",
                ...
                "<broker_host_N_FQDN>:9091",
            ]),
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<user_password>",
            "on_delivery": delivery_report,
            "schema.registry.url": "http://<Schema_Registry_server_FQDN_or_IP_address>:8081",
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema,
    )
    
    avroProducer.produce(topic="messages", key=key, value=value)
    avroProducer.flush()
    

Make sure that Confluent Schema Registry is working correctlyMake sure that Confluent Schema Registry is working correctly

  1. Start the consumer:

    python3 ./consumer.py
    
  2. In a separate terminal, start the producer:

    python3 ./producer.py
    
  3. Make sure that the data sent by the producer is received and correctly interpreted by the consumer:

    {'name': 'Value'}
    

Delete the resources you createdDelete the resources you created

Delete the resources you no longer need to avoid paying for them:

  • Delete the Managed Service for Apache Kafka® cluster.
  • Delete the virtual machine.
  • If you reserved public static IP addresses, release and delete them.

Was the article helpful?

Previous
Working with the managed schema registry via the REST API
Next
Delivering data from PostgreSQL to Apache Kafka®
Yandex project
© 2025 Yandex.Cloud LLC