Yandex Cloud
Search
Contact UsTry it for free
  • Customer Stories
  • Documentation
  • Blog
  • All Services
  • System Status
  • Marketplace
    • Featured
    • Infrastructure & Network
    • Data Platform
    • AI for business
    • Security
    • DevOps tools
    • Serverless
    • Monitoring & Resources
  • All Solutions
    • By industry
    • By use case
    • Economics and Pricing
    • Security
    • Technical Support
    • Start testing with double trial credits
    • Cloud credits to scale your IT product
    • Gateway to Russia
    • Cloud for Startups
    • Center for Technologies and Society
    • Yandex Cloud Partner program
    • Price calculator
    • Pricing plans
  • Customer Stories
  • Documentation
  • Blog
© 2026 Direct Cursus Technology L.L.C.
Tutorials
    • All tutorials
    • Unassisted deployment of the Apache Kafka® web interface
    • Upgrading a Managed Service for Apache Kafka® cluster to migrate from ZooKeeper to KRaft
    • Migrating a database from a third-party Apache Kafka® cluster to Managed Service for Apache Kafka®
    • Moving data between Managed Service for Apache Kafka® clusters using Data Transfer
    • Delivering data from Managed Service for MySQL® to Managed Service for Apache Kafka® using Data Transfer
    • Delivering data from Managed Service for MySQL® to Managed Service for Apache Kafka® using Debezium
    • Delivering data from Managed Service for PostgreSQL to Managed Service for Apache Kafka® using Data Transfer
    • Delivering data from Managed Service for PostgreSQL to Managed Service for Apache Kafka® using Debezium
    • Delivering data from Managed Service for YDB to Managed Service for Apache Kafka® using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Managed Service for ClickHouse® using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Yandex MPP Analytics for PostgreSQL using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Yandex StoreDoc using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Managed Service for MySQL® using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Managed Service for OpenSearch using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Managed Service for PostgreSQL using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Managed Service for YDB using Data Transfer
    • Delivering data from Managed Service for Apache Kafka® to Data Streams using Data Transfer
    • Delivering data from Data Streams to Managed Service for YDB using Data Transfer
    • Delivering data from Data Streams to Managed Service for Apache Kafka® using Data Transfer
    • YDB change data capture and delivery to YDS
    • Configuring Kafka Connect to work with a Managed Service for Apache Kafka® cluster
    • Synchronizing Apache Kafka® topics in Object Storage with no web access
      • Managing data schemas in Managed Service for Apache Kafka®
      • Using Managed Schema Registry with Managed Service for Apache Kafka®
      • Using Managed Schema Registry with Managed Service for Apache Kafka® via REST API
      • Using Confluent Schema Registry with Managed Service for Apache Kafka®
    • Monitoring message loss in an Apache Kafka® topic
    • Automating Query tasks with Managed Service for Apache Airflow™
    • Sending requests to the Yandex Cloud API via the Yandex Cloud Python SDK
    • Configuring an SMTP server to send e-mail notifications
    • Adding data to a ClickHouse® DB
    • Migrating data to Managed Service for ClickHouse® using ClickHouse®
    • Migrating data to Managed Service for ClickHouse® using Data Transfer
    • Delivering data from Managed Service for MySQL® to Managed Service for ClickHouse® using Data Transfer
    • Asynchronously replicating data from PostgreSQL to ClickHouse®
    • Exchanging data between Managed Service for ClickHouse® and Yandex Data Processing
    • Configuring Managed Service for ClickHouse® for Graphite
    • Fetching data from Managed Service for Apache Kafka® to Managed Service for ClickHouse®
    • Fetching data from Managed Service for Apache Kafka® to ksqlDB
    • Fetching data from RabbitMQ to Managed Service for ClickHouse®
    • Saving a data stream from Data Streams to Managed Service for ClickHouse®
    • Asynchronous replication of data from Yandex Metrica to ClickHouse® using Data Transfer
    • Using hybrid storage in Managed Service for ClickHouse®
    • Sharding Managed Service for ClickHouse® tables
    • Loading data from Yandex Direct to a Managed Service for ClickHouse® data mart using Cloud Functions, Object Storage, and Data Transfer
    • Loading data from Object Storage to Managed Service for ClickHouse® using Data Transfer
    • Migrating data from Managed Service for OpenSearch to Managed Service for ClickHouse® with a storage change using Data Transfer
    • Loading data from Managed Service for YDB to Managed Service for ClickHouse® using Data Transfer
    • Yandex Managed Service for ClickHouse® integration with Microsoft SQL Server via ClickHouse® JDBC Bridge
    • Migrating databases from Google BigQuery to Managed Service for ClickHouse®
    • Yandex Managed Service for ClickHouse® integration with Oracle via ClickHouse® JDBC Bridge
    • Configuring Cloud DNS to access a Managed Service for ClickHouse® cluster from other cloud networks
    • Migrating a Yandex Data Processing HDFS cluster to a different availability zone
    • Importing data from Managed Service for MySQL® to Yandex Data Processing using Sqoop
    • Importing data from Managed Service for PostgreSQL to Yandex Data Processing using Sqoop
    • Mounting Object Storage buckets to the file system of Yandex Data Processing hosts
    • Working with Apache Kafka® topics using Yandex Data Processing
    • Automating operations with Yandex Data Processing using Managed Service for Apache Airflow™
    • Shared use of Yandex Data Processing tables through Apache Hive™ Metastore
    • Transferring metadata across Yandex Data Processing clusters using Apache Hive™ Metastore
    • Importing data from Object Storage, processing, and exporting it to Managed Service for ClickHouse®
    • Migrating collections from a third-party MongoDB cluster to Yandex StoreDoc
    • Migrating data to Yandex StoreDoc
    • Migrating Yandex StoreDoc cluster from 4.4 to 6.0
    • Sharding Yandex StoreDoc collections
    • Yandex StoreDoc performance analysis and tuning
    • Managed Service for MySQL® performance analysis and tuning
    • Syncing data from a third-party MySQL® cluster to Managed Service for MySQL® using Data Transfer
    • Migrating a database from Managed Service for MySQL® to a third-party MySQL® cluster
    • Migrating a database from Managed Service for MySQL® to Object Storage using Data Transfer
    • Migrating data from Object Storage to Managed Service for MySQL® via Data Transfer
    • Delivering data from Managed Service for MySQL® to Managed Service for Apache Kafka® using Data Transfer
    • Delivering data from Managed Service for MySQL® to Managed Service for Apache Kafka® using Debezium
    • Migrating a database from Managed Service for MySQL® to Managed Service for YDB using Data Transfer
    • MySQL® change data capture and delivery to YDS
    • Migrating data from Managed Service for MySQL® to Managed Service for PostgreSQL using Data Transfer
    • Migrating data from AWS RDS for PostgreSQL to Managed Service for PostgreSQL using Data Transfer
    • Migrating data from Managed Service for MySQL® to Yandex MPP Analytics for PostgreSQL using Data Transfer
    • Configuring an index policy in Managed Service for OpenSearch
    • Migrating data from a third-party OpenSearch cluster to Managed Service for OpenSearch using Data Transfer
    • Loading data from Managed Service for OpenSearch to Object Storage using Data Transfer
    • Migrating data from Managed Service for OpenSearch to Managed Service for YDB using Data Transfer
    • Copying data from Managed Service for OpenSearch to Yandex MPP Analytics for PostgreSQL using Yandex Data Transfer
    • Migrating data from Managed Service for PostgreSQL to Managed Service for OpenSearch using Data Transfer
    • Authenticating a Managed Service for OpenSearch cluster in OpenSearch Dashboards using Keycloak
    • Using the yandex-lemmer plugin in Managed Service for OpenSearch
    • Creating a PostgreSQL cluster for 1C:Enterprise
    • Searching for the Managed Service for PostgreSQL cluster performance issues
    • Managed Service for PostgreSQL performance analysis and tuning
    • Logical replication in PostgreSQL
    • Migrating a database from a third-party PostgreSQL cluster to Managed Service for PostgreSQL
    • Migrating a database from Managed Service for PostgreSQL
    • Delivering data from Managed Service for PostgreSQL to Managed Service for Apache Kafka® using Data Transfer
    • Delivering data from Managed Service for PostgreSQL to Managed Service for Apache Kafka® using Debezium
    • Delivering data from Managed Service for PostgreSQL to Managed Service for YDB using Data Transfer
    • Migrating a database from Managed Service for PostgreSQL to Object Storage
    • Migrating data from Object Storage to Managed Service for PostgreSQL via Data Transfer
    • PostgreSQL change data capture and delivery to YDS
    • Migrating data from Managed Service for PostgreSQL to Managed Service for MySQL® using Data Transfer
    • Migrating data from Managed Service for PostgreSQL to Managed Service for OpenSearch using Data Transfer
    • Fixing string sorting issues in PostgreSQL after a glibc upgrade
    • Migrating a database from Greenplum® to ClickHouse®
    • Migrating a database from Greenplum® to PostgreSQL
    • Exporting Greenplum® data to a cold storage in Object Storage
    • Loading data from Object Storage to Yandex MPP Analytics for PostgreSQL using Data Transfer
    • Copying data from Managed Service for OpenSearch to Yandex MPP Analytics for PostgreSQL using Yandex Data Transfer
    • Creating an external table from an Object Storage bucket table using a configuration file
    • Getting data from external sources using named queries in Greenplum®
    • Migrating a database from a third-party Valkey™ cluster to Yandex Managed Service for Valkey™
    • Using a Yandex Managed Service for Valkey™ cluster as a PHP session storage
    • Loading data from Object Storage to Managed Service for YDB using Data Transfer
    • Loading data from Managed Service for YDB to Object Storage using Data Transfer
    • Processing Audit Trails events
    • Processing Cloud Logging logs
    • Processing Debezium CDC streams
    • Analyzing data with Jupyter
    • Processing files with usage details in Yandex Cloud Billing
    • Ingesting data into storage systems
    • Smart log processing
    • Data transfer in microservice architectures
    • Migrating data to Object Storage using Data Transfer
    • Migrating data from a third-party Greenplum® or PostgreSQL cluster to Yandex MPP Analytics for PostgreSQL using Data Transfer
    • Migrating Yandex StoreDoc clusters
    • Migrating MySQL® clusters
    • Migrating to a third-party MySQL® cluster
    • Migrating PostgreSQL clusters
    • Creating a schema registry to deliver data in Debezium CDC format from Apache Kafka®
    • Automating operations using Yandex Managed Service for Apache Airflow™
    • Working with an Object Storage table from a PySpark job
    • Integrating Yandex Managed Service for Apache Spark™ with Apache Hive™ Metastore
    • Running a PySpark job using Yandex Managed Service for Apache Airflow™
    • Using Yandex Object Storage in Yandex Managed Service for Apache Spark™

In this article:

  • Required paid resources
  • Getting started
  • Create a topic for notifications about data format schema changes
  • Install and configure Confluent Schema Registry on your VM
  • Create producer and consumer scripts
  • Make sure Confluent Schema Registry works correctly
  • Delete the resources you created
  1. Building a data platform
  2. Using data format schemas with Managed Service for Apache Kafka®
  3. Using Confluent Schema Registry with Managed Service for Apache Kafka®

Using Confluent Schema Registry with Yandex Managed Service for Apache Kafka®

Written by
Yandex Cloud
Updated at February 6, 2026
  • Required paid resources
  • Getting started
  • Create a topic for notifications about data format schema changes
  • Install and configure Confluent Schema Registry on your VM
  • Create producer and consumer scripts
  • Make sure Confluent Schema Registry works correctly
  • Delete the resources you created

In Managed Service for Apache Kafka®, you can use the integrated Managed Schema Registry. For more information, see Working with the managed schema registry. To use Confluent Schema Registry, follow this tutorial.

Note

We tested this tutorial with Confluent Schema Registry 6.2 and a VM running Ubuntu 20.04 LTS. We do not guarantee support for newer versions.

To use Confluent Schema Registry with Managed Service for Apache Kafka®:

  1. Create a topic for notifications about data format schema changes.
  2. Install and configure Confluent Schema Registry on your VM.
  3. Create producer and consumer scripts.
  4. Make sure Confluent Schema Registry works correctly.

If you no longer need the resources you created, delete them.

Required paid resourcesRequired paid resources

The support cost for this solution includes:

  • Managed Service for Apache Kafka® cluster fee, which covers the use of computing resources allocated to hosts (including ZooKeeper hosts) and disk space (see Apache Kafka® pricing).
  • Fee for public IP addresses if public access is enabled for cluster hosts (see Virtual Private Cloud pricing).
  • VM fee, which covers the use of computing resources, storage, and public IP address (see Compute Cloud pricing).

Getting startedGetting started

  1. Create a Managed Service for Apache Kafka® cluster of any suitable configuration.

    1. Create a topic named messages for exchanging messages between the producer and the consumer.
    2. Create a user named user and grant them permissions for the messages topic:
      • ACCESS_ROLE_CONSUMER
      • ACCESS_ROLE_PRODUCER
  2. In the network hosting the Managed Service for Apache Kafka® cluster, create a VM running Ubuntu 20.04 LTS from Cloud Marketplace with a public IP address.

  3. If using security groups, configure them to allow all required traffic between your Managed Service for Apache Kafka® cluster and VM.

  4. In the VM security group, create an inbound rule that allows connections via port 8081 which is used by the producer and consumer to access the schema registry:

    • Port range: 8081.
    • Protocol: TCP.
    • Destination name: CIDR.
    • CIDR blocks: 0.0.0.0/0 or address ranges of the subnets used by the producer and the consumer.

Create a topic for notifications about data format schema changesCreate a topic for notifications about data format schema changes

  1. Create a service topic named _schemas with the following settings:

    • Number of partitions: 1.
    • Cleanup policy: Compact.

    Warning

    These values for Number of partitions and Cleanup policy are required for Confluent Schema Registry to run correctly.

  2. Create a user named registry and grant them permissions for the _schemas topic:

    • ACCESS_ROLE_CONSUMER
    • ACCESS_ROLE_PRODUCER

    Confluent Schema Registry will use this account to work with _schemas.

Install and configure Confluent Schema Registry on your VMInstall and configure Confluent Schema Registry on your VM

  1. Connect to the VM over SSH.

  2. Add the Confluent Schema Registry repository:

    wget -qO - https://packages.confluent.io/deb/6.2/archive.key | sudo apt-key add - && \
    sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/6.2 stable main"
    
  3. Install the packages:

    sudo apt-get update && \
    sudo apt-get install \
         confluent-schema-registry \
         openjdk-11-jre-headless \
         python3-pip --yes
    
  4. Get an SSL certificate.

  5. Create a secure store for the certificate:

    sudo keytool \
         -keystore /etc/schema-registry/client.truststore.jks \
         -alias CARoot \
         -import -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
         -storepass <secure_certificate_storage_password> \
         --noprompt
    
  6. Create a file named /etc/schema-registry/jaas.conf with settings for connecting to the cluster:

    KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="registry"
      password="<registry_user_password>";
    };
    
  7. Edit the /etc/schema-registry/schema-registry.properties file containing Confluent Schema Registry settings:

    1. Comment out the line as follows:

      kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
      
    2. Uncomment the line with the listeners parameter. It configures the network address and port that Confluent Schema Registry listens to. The default port for all network interfaces is 8081:

      listeners=http://0.0.0.0:8081
      
    3. Add the following lines at the end of the file:

      kafkastore.bootstrap.servers=SASL_SSL://<broker_host_1_FQDN:9091>,<broker_host_2_FQDN:9091>,...,<broker_host_N_FQDN:9091>
      kafkastore.ssl.truststore.location=/etc/schema-registry/client.truststore.jks
      kafkastore.ssl.truststore.password=<secure_certificate_storage_password>
      kafkastore.sasl.mechanism=SCRAM-SHA-512
      kafkastore.security.protocol=SASL_SSL
      

      You can get a list of broker hosts with a list of cluster hosts.

  8. Edit the /lib/systemd/system/confluent-schema-registry.service file which describes the systemd module.

    1. Go to the [Service] section.

    2. Add the Environment parameter with Java settings:

      ...
      
      [Service]
      Type=simple
      User=cp-schema-registry
      Group=confluent
      Environment="LOG_DIR=/var/log/confluent/schema-registry"
      Environment="_JAVA_OPTIONS='-Djava.security.auth.login.config=/etc/schema-registry/jaas.conf'"
      ...
      
  9. Update the systemd module details:

    sudo systemctl daemon-reload
    
  10. Start the Confluent Schema Registry service:

    sudo systemctl start confluent-schema-registry.service
    
  11. Set Confluent Schema Registry to start automatically after a system reboot:

    sudo systemctl enable confluent-schema-registry.service
    

Create producer and consumer scriptsCreate producer and consumer scripts

These scripts send and receive messages in the messages topic as a key:value pair. This example shows the data format schemas in Avro format.

Note

Python scripts are provided for demonstration only. You can prepare and send data format schemas and the data itself by creating a similar script in another language.

  1. Install the required Python packages:

    sudo pip3 install avro confluent_kafka
    
  2. Create a Python script for the consumer.

    Here is how the script works:

    1. Connect to the messages topic and Confluent Schema Registry.
    2. Continuously read messages arriving in the messages topic.
    3. When receiving a message, request the required schemas from Confluent Schema Registry to parse the message.
    4. Parse the message binary data based on the key and value schemas and display the result.

    consumer.py

    #!/usr/bin/python3
    
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError
    
    
    c = AvroConsumer(
        {
            "bootstrap.servers": ','.join([
                "<broker_host_1_FQDN>:9091",
                ...
                "<broker_host_N_FQDN>:9091",
            ]),
            "group.id": "avro-consumer",
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<user_password>",
            "schema.registry.url": "http://<Confluent_Schema_Registry_server_FQDN_or_IP_address>:8081",
        }
    )
    
    c.subscribe(["messages"])
    
    while True:
        try:
            msg = c.poll(10)
    
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            break
    
        if msg is None:
            continue
    
        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue
    
        print(msg.value())
    
    c.close()
    
  3. Create a Python script for the producer.

    Here is how the script works:

    1. Connect to the schema registry and send the key and value data format schemas.
    2. Generate the key and value based on the schemas you sent.
    3. Send a message containing a key:value pair to the messages topic. The system will automatically add the schema versions to your message.

    producer.py

    #!/usr/bin/python3
    
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    
    
    value_schema_str = """
    {
        "namespace": "my.test",
        "name": "value",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    key_schema_str = """
    {
        "namespace": "my.test",
        "name": "key",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    value = {"name": "Value"}
    key = {"name": "Key"}
    
    
    def delivery_report(err, msg):
        """Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush()."""
        if err is not None:
            print("Message delivery failed: {}".format(err))
        else:
            print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))
    
    
    avroProducer = AvroProducer(
        {
            "bootstrap.servers": ','.join([
                "<broker_host_1_FQDN>:9091",
                ...
                "<broker_host_N_FQDN>:9091",
            ]),
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<user_password>",
            "on_delivery": delivery_report,
            "schema.registry.url": "http://<Schema_Registry_server_FQDN_or_IP_address>:8081",
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema,
    )
    
    avroProducer.produce(topic="messages", key=key, value=value)
    avroProducer.flush()
    

Make sure Confluent Schema Registry works correctlyMake sure Confluent Schema Registry works correctly

  1. Start the consumer:

    python3 ./consumer.py
    
  2. In a separate terminal, start the producer:

    python3 ./producer.py
    
  3. Make sure the data sent by the producer is received and correctly interpreted by the consumer:

    {'name': 'Value'}
    

Delete the resources you createdDelete the resources you created

Delete the resources you no longer need to avoid paying for them:

  • Delete the Managed Service for Apache Kafka® cluster.
  • Delete the VM.
  • If you reserved public static IP addresses, release and delete them.

Was the article helpful?

Previous
Using Managed Schema Registry with Managed Service for Apache Kafka® via REST API
Next
Monitoring message loss in an Apache Kafka® topic
© 2026 Direct Cursus Technology L.L.C.