Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Kafka®
  • Начало работы
    • Все руководства
    • Развертывание веб-интерфейса Apache Kafka®
      • Управление схемами данных в Managed Service for Apache Kafka®
      • Работа с управляемым реестром схем формата данных
      • Работа с управляемым реестром схем формата данных с помощью REST API
      • Использование Confluent Schema Registry с Managed Service for Apache Kafka®
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Yandex Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • История изменений
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте инфраструктуру
  • Установите утилиты
  • Создайте схемы формата данных
  • Отправьте сообщения в топик
  • Получите сообщения из топика
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Использование схем формата данных с Managed Service for Apache Kafka®
  3. Работа с управляемым реестром схем формата данных с помощью REST API

Работа с управляемым реестром схем формата данных с помощью REST API

Статья создана
Yandex Cloud
Обновлена 8 апреля 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
    • Подготовьте инфраструктуру
    • Установите утилиты
  • Создайте схемы формата данных
  • Отправьте сообщения в топик
  • Получите сообщения из топика
  • Удалите созданные ресурсы

В кластерах Managed Service for Apache Kafka® можно работать с Managed Schema Registry либо с помощью клиентов Apache Kafka® для различных языков программирования, либо с помощью REST API.

Также Managed Service for Apache Kafka® предоставляет REST API для Apache Kafka®. В том числе, с помощью этого API можно отправлять и получать сообщения без использования сторонних производителей и потребителей. Эти возможности также будут продемонстрированы в этом практическом руководстве.

Чтобы познакомиться с возможностями REST API для Managed Schema Registry и Apache Kafka®:

  1. Создайте схемы формата данных.
  2. Отправьте сообщения в топик.
  3. Получите сообщения из топика.
  4. Удалите созданные ресурсы.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за использование публичных IP-адресов (см. тарифы Virtual Private Cloud).

Перед началом работыПеред началом работы

Подготовьте инфраструктуруПодготовьте инфраструктуру

Вручную
  1. Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации.

    При создании кластера включите опции:

    • Реестр схем данных.

      В кластере будет развернут реестр схем формата данных Managed Schema Registry и станет доступен REST API для Managed Schema Registry.

    • Kafka Rest API.

      В кластере станет доступен REST API для Apache Kafka®.

    • Публичный доступ.

      Хосты-брокеры станут доступны из интернета.

  2. Создайте топик с именем messages для обмена сообщениями между производителем и потребителем.

  3. Создайте пользователя с именем user1 и выдайте ему права на топик messages:

    • ACCESS_ROLE_CONSUMER,
    • ACCESS_ROLE_PRODUCER.

    Этот пользователь сможет отправлять и получать сообщения в рамках топика, а также выполнять любые операции над субъектами в Managed Schema Registry, которые связаны с топиком.

  4. Выполните все шаги по предварительной настройке для подключения к кластеру.

Установите утилитыУстановите утилиты

  1. Установите утилиту cURL:

    sudo apt install curl -y
    

    С ее помощью будут выполняться запросы к API.

    Для удобства в этом практическом руководстве при выполнении запросов к API будет использоваться опция cURL --user. При указанной опции cURL сам добавит в запрос HTTP-заголовок Authorization с необходимым значением для авторизации.

    Совет

    Вы можете самостоятельно сконструировать заголовок Authorization, например, если вы не используете cURL.

  2. Установите утилиту jq:

    sudo apt install jq -y
    

    С ее помощью описания схем будут приводиться к нужному формату.

    При использовании REST API для Managed Schema Registry описания схем необходимо передавать в виде строки с экранированными символами, например:

    "schema": "{\"type\": \"record\", \"name\": \"Obj\", \"fields\":[...]}"
    

    Для удобства в этом практическом руководстве схемы представлены в виде JSON-документов c отступами и переносами строк, а при выполнении запросов к API схемы приводятся к нужному формату с помощью jq.

    Совет

    После выполнения запроса к REST API с помощью cURL, ответ сервера представляется в виде одной JSON-строки.

    Вы можете дополнительно обработать вывод команд из этого практического руководства с помощью jq, чтобы сделать ответ сервера удобочитаемым.

Создайте схемы формата данныхСоздайте схемы формата данных

Примечание

В этом практическом руководстве используются схемы типа Avro.

Вы можете использовать другие типы схем, которые поддерживаются в Managed Schema Registry.

Пусть сообщение Apache Kafka® в топике messages должно состоять из ключа и значения в следующем формате:

Ключ

Значение

{
  "id": <int>,
  "sid": "<string>"
}
{
  "name": "<string>",
  "city": "<string>",
  "age": <int>
}

Создайте соответствующие схемы формата данных:

  1. Создайте файл schema-key.json, который содержит схему формата данных для ключа сообщения Apache Kafka®.

    schema-key.json
    {
      "type": "record",
      "name": "my_key",
      "fields": [
        {
          "name": "id",
          "type": "int"
        },
        {
          "name": "sid",
          "type": "string"
        }
      ]
    }
    
  2. Создайте схему формата данных для ключа сообщения Apache Kafka®.

    Имя субъекта для схемы должно состоять из имени топика, в котором будет использоваться эта схема (messages) и суффикса -key.

    Воспользуйтесь методом POST /subjects/(subject)/versions REST API для Managed Schema Registry и выполните запрос:

    jq \
        -n --slurpfile data schema-key.json \
        '{
           "schemaType": "AVRO",
           "schema": "\($data)"
        }' \
    | curl \
          --request POST \
          --url 'https://<FQDN_хоста-брокера>:443/subjects/messages-key/versions' \
          --user user1:<пароль_пользователя> \
          --header 'Content-Type: application/vnd.schemaregistry.v1+json' \
          --data "@-"
    

    В ответе на запрос будет возвращен идентификатор созданной схемы, например, {"id":1}.

  3. Создайте файл schema-value.json, который содержит схему формата данных для значения сообщения Apache Kafka®.

    schema-value.json
    {
      "type": "record",
      "name": "my_value",
      "fields": [
        {
          "name": "name",
          "type": "string"
        },
        {
          "name": "city",
          "type": "string"
        },
        {
          "name": "age",
          "type": "int"
        }
      ]
    }
    
  4. Создайте схему формата данных для значения сообщения Apache Kafka®.

    Имя субъекта для схемы должно состоять из имени топика, в котором будет использоваться эта схема (messages) и суффикса -value.

    Воспользуйтесь методом POST /subjects/(subject)/versions REST API для Managed Schema Registry и выполните запрос:

    jq \
        -n --slurpfile data schema-value.json \
        '{
           "schemaType": "AVRO",
           "schema": "\($data)"
        }' \
    | curl \
          --request POST \
          --url 'https://<FQDN_хоста-брокера>:443/subjects/messages-value/versions' \
          --user user1:<пароль_пользователя> \
          --header 'Content-Type: application/vnd.schemaregistry.v1+json' \
          --data "@-"
    

    В ответе на запрос будет возвращен идентификатор созданной схемы, например, {"id":2}.

Отправьте сообщения в топикОтправьте сообщения в топик

  1. Получите идентификаторы схем формата данных для ключа и значения.

    Воспользуйтесь методом GET /schemas REST API для Managed Schema Registry и выполните запрос:

    curl \
        --request GET \
        --url 'https://<FQDN_хоста-брокера>:443/schemas' \
        --user user1:<пароль_пользователя> \
        --header 'Accept: application/vnd.schemaregistry.v1+json'
    

    В ответе на запрос содержатся идентификаторы схем формата данных (id). Эти идентификаторы будут использоваться позднее.

    Пример ответа на запрос

    Для краткости схема формата данных schema в виде JSON-строк не приводится.

    [
      {
        "id": 1,
        "schema": "<схема_формата_данных>",
        "schemaType": "AVRO",
        "subject": "messages-key",
        "version": 1
      },
      {
        "id": 2,
        "schema": "<схема_формата_данных>",
        "schemaType": "AVRO",
        "subject": "messages-value",
        "version": 1
      }
    ]
    
  2. Создайте файл message-list.json, который содержит два сообщения. Для каждого сообщения указаны ключ и значение в соответствии с созданными ранее схемами формата данных.

    message-list.json
    [
      {
        "key": {
          "id": 1111,
          "sid": "AAAAA-BBBBB-CCCCC"
        },
        "value": {
          "name": "Anna",
          "city": "Moscow",
          "age": 44
        }
      },
      {
        "key": {
          "id": 2222,
          "sid": "DDDDD-EEEEE-FFFFF"
        },
        "value": {
          "name": "Alex",
          "city": "London",
          "age": 32
        }
      }
    ]
    
  3. Отправьте сообщения в топик messages.

    Воспользуйтесь методом POST /topics/(topic) REST API для Apache Kafka® и выполните запрос:

    jq \
        -n --slurpfile data message-list.json \
        '{
          "key_schema_id": <идентификатор_схемы_messages-key>,
          "value_schema_id": <идентификатор_схемы_messages-value>,
          "records": $data.[]
        }' \
    | curl \
          --request POST \
          --url 'https://<FQDN_хоста-брокера>:443/topics/messages' \
          --user user1:<пароль_пользователя> \
          --header 'Content-Type: application/vnd.kafka.avro.v2+json' \
          --header 'Accept: application/vnd.kafka.v2+json' \
          --data "@-"
    

    Значения идентификаторов схем были получены ранее с помощью запроса к эндпоинту GET /schemas.

    Пример ответа на запрос
    {
      "key_schema_id": 1,
      "offsets": [
        {
          "offset": 0,
          "partition": 0
        },
        {
          "offset": 0,
          "partition": 1
        }
      ],
      "value_schema_id": 2
    }
    

Получите сообщения из топикаПолучите сообщения из топика

  1. Создайте потребителя my-consumer в группе потребителей my-group.

    Воспользуйтесь методом POST /consumers/(group) REST API для Apache Kafka® и выполните запрос:

    curl \
        --request POST \
        --url 'https://<FQDN_хоста-брокера>:443/consumers/my-group' \
        --user user1:<пароль_пользователя> \
        --header 'Content-Type: application/vnd.kafka.v2+json' \
        --header 'Accept: application/vnd.kafka.v2+json' \
        --data '{
                  "name": "my-consumer",
                  "format": "avro",
                  "auto.offset.reset": "earliest"
                }'
    
    Пример ответа на запрос
    {
      "base_uri": "https://<FQDN_хоста-брокера>:443/consumers/my-group/instances/my-consumer",
      "instance_id": "my-consumer"
    }
    
  2. Подпишитесь на топик messages для потребителя my-consumer из группы потребителей my-group.

    Воспользуйтесь методом POST /consumers/(group)/instances/(instance)/subscription REST API для Apache Kafka® и выполните запрос:

    curl \
        --request POST \
        --url 'https://<FQDN_хоста-брокера>:443/consumers/my-group/instances/my-consumer/subscription' \
        --user user1:<пароль_пользователя> \
        --header 'Content-Type: application/vnd.kafka.v2+json' \
        --header 'Accept: application/vnd.kafka.v2+json' \
        --data '{"topics": ["messages"]}'
    

    Сервер API не возвращает ответа на этот запрос, только HTTP-статус.

  3. Получите все сообщения из топика messages для потребителя my-consumer из группы потребителей my-group.

    Воспользуйтесь методом GET /consumers/(group)/instances/(instance)/records REST API для Apache Kafka® и выполните запрос:

    curl \
        --request GET \
        --url 'https://<FQDN_хоста-брокера>:443/consumers/my-group/instances/my-consumer/records' \
        --user user1:<пароль_пользователя> \
        --header 'Accept: application/vnd.kafka.avro.v2+json'
    

    Если в ответе на запрос содержатся отправленные ранее сообщения, то это значит, что производитель и потребитель успешно интерпретируют сообщения в соответствии с заданными схемами формата данных.

    Пример ответа на запрос
    [
      {
        "key": {
          "id": 2222,
          "sid": "DDDDD-EEEEE-FFFFF"
        },
        "offset": 0,
        "partition": 1,
        "timestamp": 1726031054186,
        "topic": "messages",
        "value": {
          "age": 32,
          "city": "London",
          "name": "Alex"
        }
      },
      {
        "key": {
          "id": 1111,
          "sid": "AAAAA-BBBBB-CCCCC"
        },
        "offset": 0,
        "partition": 0,
        "timestamp": 1726031054186,
        "topic": "messages",
        "value": {
          "age": 44,
          "city": "Moscow",
          "name": "Anna"
        }
      }
    ]
    

Удалите созданные ресурсыУдалите созданные ресурсы

Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:

  • Удалите кластер Managed Service for Apache Kafka®.
  • Если вы зарезервировали публичные статические IP-адреса, освободите и удалите их.

Была ли статья полезна?

Предыдущая
Работа с управляемым реестром схем формата данных
Следующая
Использование Confluent Schema Registry с Managed Service for Apache Kafka®
Проект Яндекса
© 2025 ООО «Яндекс.Облако»