Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Практические руководства
    • Все руководства
    • Развертывание веб-интерфейса Apache Kafka®
    • Миграция БД из стороннего кластера Apache Kafka® в Managed Service for Apache Kafka®
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for YDB в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for Greenplum® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MongoDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MySQL® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for OpenSearch с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for PostgreSQL с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Data Streams с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for Apache Kafka® с помощью Data Transfer
    • Захват изменений YDB и поставка в YDS
    • Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
      • Управление схемами данных в Managed Service for Apache Kafka®
      • Использование Managed Schema Registry с Managed Service for Apache Kafka®
      • Использование Managed Schema Registry с Managed Service for Apache Kafka® с помощью REST API
      • Использование Confluent Schema Registry с Managed Service for Apache Kafka®
    • Автоматизация задач Query с помощью Managed Service for Apache Airflow™
    • Отправка запросов к API Yandex Cloud через Yandex Cloud Python SDK
    • Настройка SMTP-сервера для отправки уведомлений по электронной почте
    • Добавление данных в БД ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® средствами ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® при помощи Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for ClickHouse® с помощью Data Transfer
    • Асинхронная репликация данных из PostgreSQL в ClickHouse®
    • Обмен данными между Managed Service for ClickHouse® и Yandex Data Processing
    • Настройка Managed Service for ClickHouse® для Graphite
    • Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®
    • Получение данных из Managed Service for Apache Kafka® в ksqlDB
    • Получение данных из RabbitMQ в Managed Service for ClickHouse®
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse®
    • Асинхронная репликация данных из Яндекс Метрика в ClickHouse® с помощью Data Transfer
    • Использование гибридного хранилища в Managed Service for ClickHouse®
    • Шардирование таблиц Managed Service for ClickHouse®
    • Перешардирование данных в кластере Managed Service for ClickHouse®
    • Загрузка данных из Яндекс Директ в витрину Managed Service for ClickHouse® с использованием Cloud Functions, Object Storage и Data Transfer
    • Загрузка данных из Object Storage в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция данных со сменой хранилища из Managed Service for OpenSearch в Managed Service for ClickHouse® с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция базы данных из Google BigQuery в Managed Service for ClickHouse®
    • Настройка Cloud DNS для доступа к кластеру Managed Service for ClickHouse® из других облачных сетей
    • Миграция кластера Yandex Data Processing с HDFS в другую зону доступности
    • Импорт данных из Managed Service for MySQL® в Yandex Data Processing с помощью Sqoop
    • Импорт данных из Managed Service for PostgreSQL в Yandex Data Processing с помощью Sqoop
    • Монтирование бакетов Object Storage к файловой системе хостов Yandex Data Processing
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
    • Автоматизация работы с Yandex Data Processing с помощью Managed Service for Apache Airflow™
    • Совместная работа с таблицами Yandex Data Processing с использованием Metastore
    • Перенос метаданных между кластерами Yandex Data Processing с помощью Metastore
    • Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
    • Миграция в Managed Service for Elasticsearch с помощью снапшотов
    • Миграция коллекций из стороннего кластера MongoDB в Managed Service for MongoDB
    • Миграция данных в Managed Service for MongoDB
    • Миграция кластера Managed Service for MongoDB с версии 4.4 на 6.0
    • Шардирование коллекций MongoDB
    • Анализ производительности и оптимизация MongoDB
    • Миграция БД из стороннего кластера MySQL® в кластер Managed Service for MySQL®
    • Анализ производительности и оптимизация Managed Service for MySQL®
    • Синхронизация данных из стороннего кластера MySQL® в Managed Service for MySQL® с помощью Data Transfer
    • Миграция БД из Managed Service for MySQL® в сторонний кластер MySQL®
    • Миграция БД из Managed Service for MySQL® в Object Storage с помощью Data Transfer
    • Перенос данных из Object Storage в Managed Service for MySQL® с использованием Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Миграция БД из Managed Service for MySQL® в Managed Service for YDB с помощью Data Transfer
    • Захват изменений MySQL® и поставка в YDS
    • Миграция данных из Managed Service for MySQL® в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из AWS RDS for PostgreSQL в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из Managed Service for MySQL® в Managed Service for Greenplum® с помощью Data Transfer
    • Настройка политики индексов в Managed Service for OpenSearch
    • Миграция данных из Elasticsearch в Managed Service for OpenSearch
    • Миграция данных в Managed Service for OpenSearch из стороннего кластера OpenSearch с помощью Data Transfer
    • Загрузка данных из Managed Service for OpenSearch в Object Storage с помощью Data Transfer
    • Миграция данных из Managed Service for OpenSearch в Managed Service for YDB с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Аутентификация в OpenSearch Dashboards кластера Managed Service for OpenSearch с помощью Keycloak
    • Использование плагина yandex-lemmer в Managed Service for OpenSearch
    • Создание кластера PostgreSQL для «1С:Предприятия»
    • Поиск проблем с производительностью кластера Managed Service for PostgreSQL
    • Анализ производительности и оптимизация Managed Service for PostgreSQL
    • Логическая репликация PostgreSQL
    • Миграция БД из стороннего кластера PostgreSQL в Managed Service for PostgreSQL
    • Миграция БД из Managed Service for PostgreSQL
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for YDB с помощью Data Transfer
    • Миграция БД из Managed Service for PostgreSQL в Object Storage
    • Перенос данных из Object Storage в Managed Service for PostgreSQL с использованием Data Transfer
    • Захват изменений PostgreSQL и поставка в YDS
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for MySQL® с помощью Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Решение проблем с сортировкой строк в PostgreSQL после обновления glibc
    • Миграция БД из Greenplum® в ClickHouse®
    • Миграция БД из Greenplum® в PostgreSQL
    • Выгрузка данных Greenplum® в холодное хранилище Object Storage
    • Загрузка данных из Object Storage в Managed Service for Greenplum® с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Создание внешней таблицы на базе таблицы из бакета Object Storage с помощью конфигурационного файла
    • Миграция БД из стороннего кластера Valkey™ в Yandex Managed Service for Valkey™
    • Использование кластера Yandex Managed Service for Valkey™ в качестве хранилища сессий PHP
    • Загрузка данных из Object Storage в Managed Service for YDB с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Object Storage с помощью Data Transfer
    • Обработка аудитных логов Audit Trails
    • Обработка логов Cloud Logging
    • Обработка потока изменений Debezium
    • Анализ данных с помощью Jupyter
    • Обработка файлов детализации в сервисе Yandex Cloud Billing
    • Ввод данных в системы хранения
    • Умная обработка логов
    • Передача данных в микросервисных архитектурах
    • Миграция данных в Object Storage с помощью Data Transfer
    • Миграция данных из стороннего кластера Greenplum® или PostgreSQL в Managed Service for Greenplum® с помощью Data Transfer
    • Миграция кластера Managed Service for MongoDB
    • Миграция кластера MySQL®
    • Миграция на сторонний кластер MySQL®
    • Миграция кластера PostgreSQL
    • Создание реестра схем для поставки данных в формате Debezium CDC из Apache Kafka®

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте инфраструктуру
  • Установите утилиты
  • Создайте схемы формата данных
  • Отправьте сообщения в топик
  • Получите сообщения из топика
  • Удалите созданные ресурсы
  1. Построение Data Platform
  2. Использование схем формата данных с Managed Service for Apache Kafka®
  3. Использование Managed Schema Registry с Managed Service for Apache Kafka® с помощью REST API

Использование Managed Schema Registry с Yandex Managed Service for Apache Kafka® с помощью REST API

Статья создана
Yandex Cloud
Обновлена 8 апреля 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
    • Подготовьте инфраструктуру
    • Установите утилиты
  • Создайте схемы формата данных
  • Отправьте сообщения в топик
  • Получите сообщения из топика
  • Удалите созданные ресурсы

В кластерах Managed Service for Apache Kafka® можно работать с Managed Schema Registry либо с помощью клиентов Apache Kafka® для различных языков программирования, либо с помощью REST API.

Также Managed Service for Apache Kafka® предоставляет REST API для Apache Kafka®. В том числе, с помощью этого API можно отправлять и получать сообщения без использования сторонних производителей и потребителей. Эти возможности также будут продемонстрированы в этом практическом руководстве.

Чтобы познакомиться с возможностями REST API для Managed Schema Registry и Apache Kafka®:

  1. Создайте схемы формата данных.
  2. Отправьте сообщения в топик.
  3. Получите сообщения из топика.
  4. Удалите созданные ресурсы.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за использование публичных IP-адресов (см. тарифы Virtual Private Cloud).

Перед началом работыПеред началом работы

Подготовьте инфраструктуруПодготовьте инфраструктуру

Вручную
  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации.

    При создании кластера включите опции:

    • Реестр схем данных.

      В кластере будет развернут реестр схем формата данных Managed Schema Registry и станет доступен REST API для Managed Schema Registry.

    • Kafka Rest API.

      В кластере станет доступен REST API для Apache Kafka®.

    • Публичный доступ.

      Хосты-брокеры станут доступны из интернета.

  2. Создайте топик с именем messages для обмена сообщениями между производителем и потребителем.

  3. Создайте пользователя с именем user1 и выдайте ему права на топик messages:

    • ACCESS_ROLE_CONSUMER,
    • ACCESS_ROLE_PRODUCER.

    Этот пользователь сможет отправлять и получать сообщения в рамках топика, а также выполнять любые операции над субъектами в Managed Schema Registry, которые связаны с топиком.

  4. Выполните все шаги по предварительной настройке для подключения к кластеру.

Установите утилитыУстановите утилиты

  1. Установите утилиту cURL:

    sudo apt install curl -y
    

    С ее помощью будут выполняться запросы к API.

    Для удобства в этом практическом руководстве при выполнении запросов к API будет использоваться опция cURL --user. При указанной опции cURL сам добавит в запрос HTTP-заголовок Authorization с необходимым значением для авторизации.

    Совет

    Вы можете самостоятельно сконструировать заголовок Authorization, например, если вы не используете cURL.

  2. Установите утилиту jq:

    sudo apt install jq -y
    

    С ее помощью описания схем будут приводиться к нужному формату.

    При использовании REST API для Managed Schema Registry описания схем необходимо передавать в виде строки с экранированными символами, например:

    "schema": "{\"type\": \"record\", \"name\": \"Obj\", \"fields\":[...]}"
    

    Для удобства в этом практическом руководстве схемы представлены в виде JSON-документов c отступами и переносами строк, а при выполнении запросов к API схемы приводятся к нужному формату с помощью jq.

    Совет

    После выполнения запроса к REST API с помощью cURL, ответ сервера представляется в виде одной JSON-строки.

    Вы можете дополнительно обработать вывод команд из этого практического руководства с помощью jq, чтобы сделать ответ сервера удобочитаемым.

Создайте схемы формата данныхСоздайте схемы формата данных

Примечание

В этом практическом руководстве используются схемы типа Avro.

Вы можете использовать другие типы схем, которые поддерживаются в Managed Schema Registry.

Пусть сообщение Apache Kafka® в топике messages должно состоять из ключа и значения в следующем формате:

Ключ

Значение

{
  "id": <int>,
  "sid": "<string>"
}
{
  "name": "<string>",
  "city": "<string>",
  "age": <int>
}

Создайте соответствующие схемы формата данных:

  1. Создайте файл schema-key.json, который содержит схему формата данных для ключа сообщения Apache Kafka®.

    schema-key.json
    {
      "type": "record",
      "name": "my_key",
      "fields": [
        {
          "name": "id",
          "type": "int"
        },
        {
          "name": "sid",
          "type": "string"
        }
      ]
    }
    
  2. Создайте схему формата данных для ключа сообщения Apache Kafka®.

    Имя субъекта для схемы должно состоять из имени топика, в котором будет использоваться эта схема (messages) и суффикса -key.

    Воспользуйтесь методом POST /subjects/(subject)/versions REST API для Managed Schema Registry и выполните запрос:

    jq \
        -n --slurpfile data schema-key.json \
        '{
           "schemaType": "AVRO",
           "schema": "\($data)"
        }' \
    | curl \
          --request POST \
          --url 'https://<FQDN_хоста-брокера>:443/subjects/messages-key/versions' \
          --user user1:<пароль_пользователя> \
          --header 'Content-Type: application/vnd.schemaregistry.v1+json' \
          --data "@-"
    

    В ответе на запрос будет возвращен идентификатор созданной схемы, например, {"id":1}.

  3. Создайте файл schema-value.json, который содержит схему формата данных для значения сообщения Apache Kafka®.

    schema-value.json
    {
      "type": "record",
      "name": "my_value",
      "fields": [
        {
          "name": "name",
          "type": "string"
        },
        {
          "name": "city",
          "type": "string"
        },
        {
          "name": "age",
          "type": "int"
        }
      ]
    }
    
  4. Создайте схему формата данных для значения сообщения Apache Kafka®.

    Имя субъекта для схемы должно состоять из имени топика, в котором будет использоваться эта схема (messages) и суффикса -value.

    Воспользуйтесь методом POST /subjects/(subject)/versions REST API для Managed Schema Registry и выполните запрос:

    jq \
        -n --slurpfile data schema-value.json \
        '{
           "schemaType": "AVRO",
           "schema": "\($data)"
        }' \
    | curl \
          --request POST \
          --url 'https://<FQDN_хоста-брокера>:443/subjects/messages-value/versions' \
          --user user1:<пароль_пользователя> \
          --header 'Content-Type: application/vnd.schemaregistry.v1+json' \
          --data "@-"
    

    В ответе на запрос будет возвращен идентификатор созданной схемы, например, {"id":2}.

Отправьте сообщения в топикОтправьте сообщения в топик

  1. Получите идентификаторы схем формата данных для ключа и значения.

    Воспользуйтесь методом GET /schemas REST API для Managed Schema Registry и выполните запрос:

    curl \
        --request GET \
        --url 'https://<FQDN_хоста-брокера>:443/schemas' \
        --user user1:<пароль_пользователя> \
        --header 'Accept: application/vnd.schemaregistry.v1+json'
    

    В ответе на запрос содержатся идентификаторы схем формата данных (id). Эти идентификаторы будут использоваться позднее.

    Пример ответа на запрос

    Для краткости схема формата данных schema в виде JSON-строк не приводится.

    [
      {
        "id": 1,
        "schema": "<схема_формата_данных>",
        "schemaType": "AVRO",
        "subject": "messages-key",
        "version": 1
      },
      {
        "id": 2,
        "schema": "<схема_формата_данных>",
        "schemaType": "AVRO",
        "subject": "messages-value",
        "version": 1
      }
    ]
    
  2. Создайте файл message-list.json, который содержит два сообщения. Для каждого сообщения указаны ключ и значение в соответствии с созданными ранее схемами формата данных.

    message-list.json
    [
      {
        "key": {
          "id": 1111,
          "sid": "AAAAA-BBBBB-CCCCC"
        },
        "value": {
          "name": "Anna",
          "city": "Moscow",
          "age": 44
        }
      },
      {
        "key": {
          "id": 2222,
          "sid": "DDDDD-EEEEE-FFFFF"
        },
        "value": {
          "name": "Alex",
          "city": "London",
          "age": 32
        }
      }
    ]
    
  3. Отправьте сообщения в топик messages.

    Воспользуйтесь методом POST /topics/(topic) REST API для Apache Kafka® и выполните запрос:

    jq \
        -n --slurpfile data message-list.json \
        '{
          "key_schema_id": <идентификатор_схемы_messages-key>,
          "value_schema_id": <идентификатор_схемы_messages-value>,
          "records": $data.[]
        }' \
    | curl \
          --request POST \
          --url 'https://<FQDN_хоста-брокера>:443/topics/messages' \
          --user user1:<пароль_пользователя> \
          --header 'Content-Type: application/vnd.kafka.avro.v2+json' \
          --header 'Accept: application/vnd.kafka.v2+json' \
          --data "@-"
    

    Значения идентификаторов схем были получены ранее с помощью запроса к эндпоинту GET /schemas.

    Пример ответа на запрос
    {
      "key_schema_id": 1,
      "offsets": [
        {
          "offset": 0,
          "partition": 0
        },
        {
          "offset": 0,
          "partition": 1
        }
      ],
      "value_schema_id": 2
    }
    

Получите сообщения из топикаПолучите сообщения из топика

  1. Создайте потребителя my-consumer в группе потребителей my-group.

    Воспользуйтесь методом POST /consumers/(group) REST API для Apache Kafka® и выполните запрос:

    curl \
        --request POST \
        --url 'https://<FQDN_хоста-брокера>:443/consumers/my-group' \
        --user user1:<пароль_пользователя> \
        --header 'Content-Type: application/vnd.kafka.v2+json' \
        --header 'Accept: application/vnd.kafka.v2+json' \
        --data '{
                  "name": "my-consumer",
                  "format": "avro",
                  "auto.offset.reset": "earliest"
                }'
    
    Пример ответа на запрос
    {
      "base_uri": "https://<FQDN_хоста-брокера>:443/consumers/my-group/instances/my-consumer",
      "instance_id": "my-consumer"
    }
    
  2. Подпишитесь на топик messages для потребителя my-consumer из группы потребителей my-group.

    Воспользуйтесь методом POST /consumers/(group)/instances/(instance)/subscription REST API для Apache Kafka® и выполните запрос:

    curl \
        --request POST \
        --url 'https://<FQDN_хоста-брокера>:443/consumers/my-group/instances/my-consumer/subscription' \
        --user user1:<пароль_пользователя> \
        --header 'Content-Type: application/vnd.kafka.v2+json' \
        --header 'Accept: application/vnd.kafka.v2+json' \
        --data '{"topics": ["messages"]}'
    

    Сервер API не возвращает ответа на этот запрос, только HTTP-статус.

  3. Получите все сообщения из топика messages для потребителя my-consumer из группы потребителей my-group.

    Воспользуйтесь методом GET /consumers/(group)/instances/(instance)/records REST API для Apache Kafka® и выполните запрос:

    curl \
        --request GET \
        --url 'https://<FQDN_хоста-брокера>:443/consumers/my-group/instances/my-consumer/records' \
        --user user1:<пароль_пользователя> \
        --header 'Accept: application/vnd.kafka.avro.v2+json'
    

    Если в ответе на запрос содержатся отправленные ранее сообщения, то это значит, что производитель и потребитель успешно интерпретируют сообщения в соответствии с заданными схемами формата данных.

    Пример ответа на запрос
    [
      {
        "key": {
          "id": 2222,
          "sid": "DDDDD-EEEEE-FFFFF"
        },
        "offset": 0,
        "partition": 1,
        "timestamp": 1726031054186,
        "topic": "messages",
        "value": {
          "age": 32,
          "city": "London",
          "name": "Alex"
        }
      },
      {
        "key": {
          "id": 1111,
          "sid": "AAAAA-BBBBB-CCCCC"
        },
        "offset": 0,
        "partition": 0,
        "timestamp": 1726031054186,
        "topic": "messages",
        "value": {
          "age": 44,
          "city": "Moscow",
          "name": "Anna"
        }
      }
    ]
    

Удалите созданные ресурсыУдалите созданные ресурсы

Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:

  • Удалите кластер Managed Service for Apache Kafka®.
  • Если вы зарезервировали публичные статические IP-адреса, освободите и удалите их.

Была ли статья полезна?

Предыдущая
Использование Managed Schema Registry с Managed Service for Apache Kafka®
Следующая
Использование Confluent Schema Registry с Managed Service for Apache Kafka®
Проект Яндекса
© 2025 ООО «Яндекс.Облако»