Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Практические руководства
    • Все руководства
    • Развертывание веб-интерфейса Apache Kafka®
    • Миграция БД из стороннего кластера Apache Kafka® в Managed Service for Apache Kafka®
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for YDB в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for Greenplum® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MongoDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MySQL® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for OpenSearch с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for PostgreSQL с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Data Streams с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for Apache Kafka® с помощью Data Transfer
    • Захват изменений YDB и поставка в YDS
    • Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
      • Управление схемами данных в Managed Service for Apache Kafka®
      • Использование Managed Schema Registry с Managed Service for Apache Kafka®
      • Использование Managed Schema Registry с Managed Service for Apache Kafka® с помощью REST API
      • Использование Confluent Schema Registry с Managed Service for Apache Kafka®
    • Автоматизация задач Query с помощью Managed Service for Apache Airflow™
    • Отправка запросов к API Yandex Cloud через Yandex Cloud Python SDK
    • Настройка SMTP-сервера для отправки уведомлений по электронной почте
    • Добавление данных в БД ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® средствами ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® при помощи Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for ClickHouse® с помощью Data Transfer
    • Асинхронная репликация данных из PostgreSQL в ClickHouse®
    • Обмен данными между Managed Service for ClickHouse® и Yandex Data Processing
    • Настройка Managed Service for ClickHouse® для Graphite
    • Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®
    • Получение данных из Managed Service for Apache Kafka® в ksqlDB
    • Получение данных из RabbitMQ в Managed Service for ClickHouse®
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse®
    • Асинхронная репликация данных из Яндекс Метрика в ClickHouse® с помощью Data Transfer
    • Использование гибридного хранилища в Managed Service for ClickHouse®
    • Шардирование таблиц Managed Service for ClickHouse®
    • Перешардирование данных в кластере Managed Service for ClickHouse®
    • Загрузка данных из Яндекс Директ в витрину Managed Service for ClickHouse® с использованием Cloud Functions, Object Storage и Data Transfer
    • Загрузка данных из Object Storage в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция данных со сменой хранилища из Managed Service for OpenSearch в Managed Service for ClickHouse® с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция базы данных из Google BigQuery в Managed Service for ClickHouse®
    • Настройка Cloud DNS для доступа к кластеру Managed Service for ClickHouse® из других облачных сетей
    • Миграция кластера Yandex Data Processing с HDFS в другую зону доступности
    • Импорт данных из Managed Service for MySQL® в Yandex Data Processing с помощью Sqoop
    • Импорт данных из Managed Service for PostgreSQL в Yandex Data Processing с помощью Sqoop
    • Монтирование бакетов Object Storage к файловой системе хостов Yandex Data Processing
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
    • Автоматизация работы с Yandex Data Processing с помощью Managed Service for Apache Airflow™
    • Совместная работа с таблицами Yandex Data Processing с использованием Metastore
    • Перенос метаданных между кластерами Yandex Data Processing с помощью Metastore
    • Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
    • Миграция в Managed Service for Elasticsearch с помощью снапшотов
    • Миграция коллекций из стороннего кластера MongoDB в Managed Service for MongoDB
    • Миграция данных в Managed Service for MongoDB
    • Миграция кластера Managed Service for MongoDB с версии 4.4 на 6.0
    • Шардирование коллекций MongoDB
    • Анализ производительности и оптимизация MongoDB
    • Миграция БД из стороннего кластера MySQL® в кластер Managed Service for MySQL®
    • Анализ производительности и оптимизация Managed Service for MySQL®
    • Синхронизация данных из стороннего кластера MySQL® в Managed Service for MySQL® с помощью Data Transfer
    • Миграция БД из Managed Service for MySQL® в сторонний кластер MySQL®
    • Миграция БД из Managed Service for MySQL® в Object Storage с помощью Data Transfer
    • Перенос данных из Object Storage в Managed Service for MySQL® с использованием Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Миграция БД из Managed Service for MySQL® в Managed Service for YDB с помощью Data Transfer
    • Захват изменений MySQL® и поставка в YDS
    • Миграция данных из Managed Service for MySQL® в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из AWS RDS for PostgreSQL в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из Managed Service for MySQL® в Managed Service for Greenplum® с помощью Data Transfer
    • Настройка политики индексов в Managed Service for OpenSearch
    • Миграция данных из Elasticsearch в Managed Service for OpenSearch
    • Миграция данных в Managed Service for OpenSearch из стороннего кластера OpenSearch с помощью Data Transfer
    • Загрузка данных из Managed Service for OpenSearch в Object Storage с помощью Data Transfer
    • Миграция данных из Managed Service for OpenSearch в Managed Service for YDB с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Аутентификация в OpenSearch Dashboards кластера Managed Service for OpenSearch с помощью Keycloak
    • Использование плагина yandex-lemmer в Managed Service for OpenSearch
    • Создание кластера PostgreSQL для «1С:Предприятия»
    • Поиск проблем с производительностью кластера Managed Service for PostgreSQL
    • Анализ производительности и оптимизация Managed Service for PostgreSQL
    • Логическая репликация PostgreSQL
    • Миграция БД из стороннего кластера PostgreSQL в Managed Service for PostgreSQL
    • Миграция БД из Managed Service for PostgreSQL
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for YDB с помощью Data Transfer
    • Миграция БД из Managed Service for PostgreSQL в Object Storage
    • Перенос данных из Object Storage в Managed Service for PostgreSQL с использованием Data Transfer
    • Захват изменений PostgreSQL и поставка в YDS
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for MySQL® с помощью Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Решение проблем с сортировкой строк в PostgreSQL после обновления glibc
    • Миграция БД из Greenplum® в ClickHouse®
    • Миграция БД из Greenplum® в PostgreSQL
    • Выгрузка данных Greenplum® в холодное хранилище Object Storage
    • Загрузка данных из Object Storage в Managed Service for Greenplum® с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Создание внешней таблицы на базе таблицы из бакета Object Storage с помощью конфигурационного файла
    • Миграция БД из стороннего кластера Valkey™ в Yandex Managed Service for Valkey™
    • Использование кластера Yandex Managed Service for Valkey™ в качестве хранилища сессий PHP
    • Загрузка данных из Object Storage в Managed Service for YDB с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Object Storage с помощью Data Transfer
    • Обработка аудитных логов Audit Trails
    • Обработка логов Cloud Logging
    • Обработка потока изменений Debezium
    • Анализ данных с помощью Jupyter
    • Обработка файлов детализации в сервисе Yandex Cloud Billing
    • Ввод данных в системы хранения
    • Умная обработка логов
    • Передача данных в микросервисных архитектурах
    • Миграция данных в Object Storage с помощью Data Transfer
    • Миграция данных из стороннего кластера Greenplum® или PostgreSQL в Managed Service for Greenplum® с помощью Data Transfer
    • Миграция кластера Managed Service for MongoDB
    • Миграция кластера MySQL®
    • Миграция на сторонний кластер MySQL®
    • Миграция кластера PostgreSQL
    • Создание реестра схем для поставки данных в формате Debezium CDC из Apache Kafka®

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Создайте скрипты производителя и потребителя
  • Проверьте правильность работы Managed Schema Registry
  • Удалите созданные ресурсы
  1. Построение Data Platform
  2. Использование схем формата данных с Managed Service for Apache Kafka®
  3. Использование Managed Schema Registry с Managed Service for Apache Kafka®

Использование Managed Schema Registry с Yandex Managed Service for Apache Kafka®

Статья создана
Yandex Cloud
Обновлена 10 марта 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Создайте скрипты производителя и потребителя
  • Проверьте правильность работы Managed Schema Registry
  • Удалите созданные ресурсы

Чтобы использовать Managed Schema Registry совместно с Managed Service for Apache Kafka®:

  1. Создайте скрипты производителя и потребителя на локальной машине.
  2. Проверьте правильность работы Managed Schema Registry.
  3. Удалите созданные ресурсы.

В этом руководстве описана регистрация одной схемы данных. Подробнее о том, как зарегистрировать несколько схем данных, см. в документации Confluent Schema Registry.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки инфраструктуры входит:

  • плата за вычислительные ресурсы кластера Managed Service for Apache Kafka® и объем хранилища (см. тарифы Managed Service for Apache Kafka®);
  • плата за вычислительные ресурсы и диски ВМ (см. тарифы Yandex Compute Cloud);
  • плата за использование публичного IP-адреса (см. тарифы Yandex Virtual Private Cloud).

Перед началом работыПеред началом работы

  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации. При создании кластера включите опции Реестр схем данных и Публичный доступ.

    1. Создайте топик с именем messages для обмена сообщениями между производителем и потребителем.
    2. Создайте пользователя с именем user и выдайте ему права на топик messages:
      • ACCESS_ROLE_CONSUMER,
      • ACCESS_ROLE_PRODUCER.
  2. В той же сети, что и кластер Managed Service for Apache Kafka®, создайте виртуальную машину с Ubuntu 20.04 и публичным IP-адресом.

  3. Если вы используете группы безопасности, настройте их так, чтобы был разрешен весь необходимый трафик между кластером Managed Service for Apache Kafka® и виртуальной машиной.

Создайте скрипты производителя и потребителяСоздайте скрипты производителя и потребителя

Приведенные скрипты отправляют и принимают сообщения в топике messages в виде пары ключ:значение. В качестве примера схемы форматов данных описаны в формате Avro.

Примечание

Скрипты на Python приведены в демонстрационных целях. Вы можете подготовить и передать схемы форматов данных и сами данные, создав аналогичный скрипт на другом языке программирования.

  1. Подключитесь к виртуальной машине по SSH.

  2. Установите необходимые пакеты Python:

    sudo apt-get update && \
    sudo pip3 install avro confluent_kafka
    
  3. Чтобы использовать шифрованное соединение, установите SSL-сертификат:

    sudo mkdir -p /usr/share/ca-certificates && \
    sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
              -O /usr/share/ca-certificates/YandexInternalRootCA.crt && \
    sudo chmod 655 /usr/share/ca-certificates/YandexInternalRootCA.crt
    
  4. Создайте Python-скрипт потребителя.

    Алгоритм работы скрипта:

    1. Подключиться к топику messages и реестру схем Confluent Schema Registry.
    2. В непрерывном цикле считывать поступающие в топик messages сообщения.
    3. При получении сообщения запросить в реестре схем Confluent Schema Registry нужные схемы для разбора сообщения.
    4. Разобрать бинарные данные из сообщения в соответствии со схемами для ключа и значения и вывести результат на экран.

    consumer.py

    #!/usr/bin/python3
    
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError
    
    
    c = AvroConsumer(
        {
            "bootstrap.servers": ','.join([
            "<FQDN_хоста-брокера_1>:9091",
            ...
            "<FQDN_хоста-брокера_N>:9091",
            ]),
            "group.id": "avro-consumer",
            "security.protocol": "SASL_SSL",
            "ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.username": "user",
            "sasl.password": "<пароль_пользователя_user>",
            "schema.registry.url": "https://<FQDN_или_IP-адрес_сервера_Managed_Schema_Registry>:443",
            "schema.registry.basic.auth.credentials.source": "SASL_INHERIT",
            "schema.registry.ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt",
            "auto.offset.reset": "earliest"
        }
    )
    
    c.subscribe(["messages"])
    
    while True:
        try:
            msg = c.poll(10)
    
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            break
    
        if msg is None:
            continue
    
        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            continue
    
        print(msg.value())
    
    c.close()
    
  5. Создайте Python-скрипт производителя.

    Алгоритм работы скрипта:

    1. Подключиться к реестру схем и передать ему схемы форматов данных для ключа и значения.
    2. Сформировать на основе переданных схем ключ и значение.
    3. Отправить в топик messages сообщение, состоящее из пары ключ:значение. Номера версий схем будут добавлены в сообщение автоматически.

    producer.py

    #!/usr/bin/python3
    
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    
    
    value_schema_str = """
    {
        "namespace": "my.test",
        "name": "value",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    key_schema_str = """
    {
        "namespace": "my.test",
        "name": "key",
        "type": "record",
        "fields": [
            {
                "name": "name",
                "type": "string"
            }
        ]
    }
    """
    
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    value = {"name": "Value"}
    key = {"name": "Key"}
    
    
    def delivery_report(err, msg):
        """Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush()."""
        if err is not None:
            print("Message delivery failed: {}".format(err))
        else:
            print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))
    
    
    avroProducer = AvroProducer(
        {
            "bootstrap.servers": ','.join([
                "<FQDN_хоста-брокера_1>:9091",
                ...
                "<FQDN_хоста-брокера_N>:9091",
            ]),
            "security.protocol": 'SASL_SSL',
            "ssl.ca.location": '/usr/share/ca-certificates/YandexInternalRootCA.crt',
            "sasl.mechanism": 'SCRAM-SHA-512',
            "sasl.username": 'user',
            "sasl.password": '<пароль_пользователя_user>',
            "on_delivery": delivery_report,
            "schema.registry.basic.auth.credentials.source": 'SASL_INHERIT',
            "schema.registry.url": 'https://<FQDN_или_IP-адрес_сервера_Managed_Schema_Registry>:443',
            "schema.registry.ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt"
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema
    )
    
    avroProducer.produce(topic="messages", key=key, value=value)
    avroProducer.flush()
    

Проверьте правильность работы Managed Schema RegistryПроверьте правильность работы Managed Schema Registry

  1. Запустите потребитель:

    python3 ./consumer.py
    
  2. В отдельном терминале запустите производитель:

    python3 ./producer.py
    
  3. Убедитесь, что данные, отправленные производителем, получены и правильно интерпретированы потребителем:

    {'name': 'Value'}
    

Удалите созданные ресурсыУдалите созданные ресурсы

Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:

  • Удалите кластер Managed Service for Apache Kafka®.
  • Удалите виртуальную машину.
  • Если вы зарезервировали публичные статические IP-адреса, освободите и удалите их

Была ли статья полезна?

Предыдущая
Управление схемами данных в Managed Service for Apache Kafka®
Следующая
Использование Managed Schema Registry с Managed Service for Apache Kafka® с помощью REST API
Проект Яндекса
© 2025 ООО «Яндекс.Облако»