Управление топиками Apache Kafka®
Кластер Managed Service for Apache Kafka® позволяет управлять топиками и разделами двумя способами (как отдельно, так и совместно):
-
С помощью стандартных интерфейсов Yandex Cloud (CLI, API, консоль управления). Способ подходит, если вы хотите управлять топиками, используя возможности сервиса Managed Service for Apache Kafka®.
Вы можете выполнить следующие действия над топиками Managed Service for Apache Kafka®:
-
С помощью Admin API Apache Kafka®. Способ подходит, если вы хотите использовать уже существующее у вас решение для управления топиками и разделами.
Управление топиками через интерфейсы Yandex Cloud
Создать топик
Перед созданием топика рассчитайте минимальный размер хранилища.
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Нажмите на имя нужного кластера и перейдите на вкладку Топики.
- Нажмите кнопку Создать топик.
- В блоке Базовые параметры задайте базовые параметры топика:
- Имя топика (должно быть уникально в пределах кластера Apache Kafka®).
- Количество разделов в топике.
- Фактор репликации. Значение этого параметра не должно превышать количество брокеров в кластере. Минимальное значение:
1
. Максимальное значение:3
. Значение по умолчанию:- для кластера из одного или двух брокеров:
1
; - для кластера с тремя и более брокерами:
3
.
- для кластера из одного или двух брокеров:
- В блоке Настройки топика задайте настройки топика.
- Нажмите кнопку Создать.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы создать топик:
-
Посмотрите описание команды CLI для создания топиков:
yc managed-kafka topic create --help
-
Создайте топик:
yc managed-kafka topic create <имя_топика> \ --cluster-name <имя_кластера> \ --partitions <количество_разделов> \ --replication-factor <фактор_репликации>
При необходимости здесь же задайте настройки топика.
-
Откройте актуальный конфигурационный файл Terraform с планом инфраструктуры.
О том, как создать такой файл, см. в разделе Создание кластера.
-
Добавьте ресурс
yandex_mdb_kafka_topic
и, при необходимости, задайте настройки топика в блокеtopic_config
:resource "yandex_mdb_kafka_topic" "<имя_топика>" { cluster_id = "<идентификатор_кластера>" name = "<имя_топика>" partitions = <количество_разделов> replication_factor = <фактор_репликации> topic_config { compression_type = "<тип_сжатия>" flush_messages = <максимальное_число_сообщений_в_памяти> ... } }
-
Проверьте корректность настроек.
-
В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.
-
Выполните команду:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
Подробнее см. в документации провайдера Terraform
-
Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:
export IAM_TOKEN="<IAM-токен>"
-
Воспользуйтесь методом Topic.create и выполните запрос, например, с помощью cURL
:curl \ --request POST \ --header "Authorization: Bearer $IAM_TOKEN" \ --header "Content-Type: application/json" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/topics' \ --data '{ "topicSpec": { "name": "<имя_топика>", "partitions": "<количество_партиций>", "replicationFactor": "<фактор_репликации>" }'
Где:
-
topicSpec
— настройки топика:name
— имя топика.partitions
– количество разделов.replicationFactor
– фактор репликации.
Идентификатор кластера можно запросить со списком кластеров в каталоге.
-
-
Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.
-
Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:
export IAM_TOKEN="<IAM-токен>"
-
Клонируйте репозиторий cloudapi
:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Далее предполагается, что содержимое репозитория находится в директории
~/cloudapi/
. -
Воспользуйтесь вызовом TopicService/Create и выполните запрос, например, с помощью gRPCurl
:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<идентификатор_кластера>", "topic_spec": { "name": "<имя_топика>", "partitions": { "value": "<количество_партиций>" }, "replication_factor": { "value": "<фактор_репликации>" } } }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.Create
Где:
-
topic_spec
— настройки топика:name
— имя топика.partitions
– количество разделов. Передается в виде объекта с полемvalue
.replication_factor
– фактор репликации. Передается в виде объекта с полемvalue
.
Идентификатор кластера можно запросить со списком кластеров в каталоге.
-
-
Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.
Примечание
В процессе своей работы Managed Service for Apache Kafka® может создавать служебные топики. Записывать пользовательские данные в такие топики нельзя.
Изменить настройки топика
Количество разделов в топиках Managed Service for Apache Kafka® нельзя уменьшить. Если в хранилище не хватает места, создать новые разделы нельзя.
Подробнее см. в разделе Минимальный размер хранилища.
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Нажмите на имя нужного кластера, затем выберите вкладку Топики.
- Нажмите значок
для нужного топика и выберите пункт Редактировать. - Измените базовые параметры топика:
- Количество разделов в топике.
- Фактор репликации. Значение этого параметра не должно превышать количество брокеров в кластере. Минимальное значение:
1
. Максимальное значение:3
. Значение по умолчанию:- Для кластера из одного или двух брокеров:
1
. - Для кластера с тремя и более брокерами:
3
.
- Для кластера из одного или двух брокеров:
- Измените дополнительные настройки топика.
- Нажмите кнопку Сохранить.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы изменить настройки топика:
-
Посмотрите описание команды CLI для изменения топиков:
yc managed-kafka topic update --help
-
Измените настройки топика:
yc managed-kafka topic update <имя_топика> \ --cluster-name <имя_кластера> \ --partitions <количество_разделов> \ --replication-factor <фактор_репликации>
-
Откройте актуальный конфигурационный файл Terraform с планом инфраструктуры.
О том, как создать такой файл, см. в разделе Создание кластера.
-
Измените значение параметров в описании ресурса
yandex_mdb_kafka_topic
:resource "yandex_mdb_kafka_topic" "<имя_топика>" { cluster_id = "<идентификатор_кластера>" name = "<имя_топика>" partitions = <количество_разделов> replication_factor = <фактор_репликации> topic_config { compression_type = "<тип_сжатия>" flush_messages = <максимальное_число_сообщений_в_памяти> ... } }
-
Проверьте корректность настроек.
-
В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.
-
Выполните команду:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
Подробнее см. в документации провайдера Terraform
-
Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:
export IAM_TOKEN="<IAM-токен>"
-
Воспользуйтесь методом Topic.update и выполните запрос, например, с помощью cURL
:Важно
Метод API переопределит все параметры изменяемого объекта, которые не были явно переданы в запросе, на значения по умолчанию. Чтобы избежать этого, перечислите настройки, которые вы хотите изменить, в параметре
updateMask
(одной строкой через запятую).curl \ --request PATCH \ --header "Authorization: Bearer $IAM_TOKEN" \ --header "Content-Type: application/json" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/topics/<имя_топика>' \ --data '{ "clusterId": "<идентификатор_кластера>", "updateMask": "topicSpec.partitions,topicSpec.replicationFactor,topicSpec.topicConfig_2_8.<настройка_1>,...,topicSpec.topicConfig_2_8.<настройка_N>,topicSpec.topicConfig_3.<настройка_1>,...,topicSpec.topicConfig_3.<настройка_N>", "topicSpec": { "partitions": "<количество_партиций>", "replicationFactor": "<фактор_репликации>", "topicConfig_2_8": { "<настройка_1_топика_Apache Kafka®_версии_2.8>": "<значение_1>", "<настройка_2_топика_Apache Kafka®_версии_2.8>": "<значение_2>", ... "<настройка_N_топика_Apache Kafka®_версии_2.8>": "<значение_N>" }, "topicConfig_3": { "<настройка_1_топика_Apache Kafka®_версии_3.x>": "<значение_1>", "<настройка_2_топика_Apache Kafka®_версии_3.x>": "<значение_2>", ... "<настройка_N_топика_Apache Kafka®_версии_3.x>": "<значение_N>" } } }'
Где:
-
updateMask
— перечень изменяемых параметров в одну строку через запятую.В данном случае перечислите все изменяемые настройки топика.
-
topicSpec
— новые настройки топика:-
partitions
– количество разделов. -
replicationFactor
– фактор репликации. -
topicConfig_2_8
— набор настроек топика, если используете Apache Kafka® версии2.8
. Укажите каждую настройку на отдельной строке через запятую. -
topicConfig_3
— набор настроек топика, если используете Apache Kafka® версий3.x
. Укажите каждую настройку на отдельной строке через запятую.Описание и возможные значения настроек см. в разделе Настройки отдельных топиков.
-
Идентификатор кластера можно запросить со списком кластеров в каталоге.
-
-
Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.
-
Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:
export IAM_TOKEN="<IAM-токен>"
-
Клонируйте репозиторий cloudapi
:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Далее предполагается, что содержимое репозитория находится в директории
~/cloudapi/
. -
Воспользуйтесь вызовом TopicService/Update и выполните запрос, например, с помощью gRPCurl
:Важно
Метод API переопределит все параметры изменяемого объекта, которые не были явно переданы в запросе, на значения по умолчанию. Чтобы избежать этого, перечислите настройки, которые вы хотите изменить, в параметре
update_mask
(в виде массива строкpaths[]
).Формат перечисления настроек
"update_mask": { "paths": [ "<настройка_1>", "<настройка_2>", ... "<настройка_N>" ] }
grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<идентификатор_кластера>", "topic_name": "<имя_топика>", "update_mask": { "paths": [ "topic_spec.partitions", "topic_spec.replication_factor", "topic_spec.topic_config_2_8.<настройка_1>", ..., "topic_spec.topic_config_2_8.<настройка_N>", "topic_spec.topic_config_3.<настройка_1>", ..., "topic_spec.topic_config_3.<настройка_N>" ] }, "topic_spec": { "partitions": { "value": "<количество_партиций>" }, "replication_factor": { "value": "<фактор_репликации>" }, "topic_сonfig_2_8": { "<настройка_1_топика_Apache Kafka®_версии_2.8>": "<значение_1>", "<настройка_2_топика_Apache Kafka®_версии_2.8>": "<значение_2>", ... "<настройка_N_топика_Apache Kafka®_версии_2.8>": "<значение_N>" }, "topic_сonfig_3": { "<настройка_1_топика_Apache Kafka®_версии_3.x>": "<значение_1>", "<настройка_2_топика_Apache Kafka®_версии_3.x>": "<значение_2>", ... "<настройка_N_топика_Apache Kafka®_версии_3.x>": "<значение_N>" } } }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.Update
Где:
-
update_mask
— перечень изменяемых параметров в виде массива строкpaths[]
.В данном случае перечислите все изменяемые настройки топика.
-
topic_spec
— новые настройки топика:-
partitions
– количество разделов. Передается в виде объекта с полемvalue
. -
replication_factor
– фактор репликации. Передается в виде объекта с полемvalue
. -
topic_config_2_8
— набор настроек топика, если используете Apache Kafka® версии2.8
. Укажите каждую настройку на отдельной строке через запятую. -
topic_config_3
— набор настроек топика, если используете Apache Kafka® версий3.x
. Укажите каждую настройку на отдельной строке через запятую.Описание и возможные значения настроек см. в разделе Настройки отдельных топиков.
-
Идентификатор кластера можно запросить со списком кластеров в каталоге.
-
-
Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.
Получить список топиков в кластере
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Нажмите на имя нужного кластера и перейдите на вкладку Топики.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы получить список топиков, выполните следующую команду:
yc managed-kafka topic list --cluster-name <имя_кластера>
-
Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:
export IAM_TOKEN="<IAM-токен>"
-
Воспользуйтесь методом Topic.list и выполните запрос, например, с помощью cURL
:curl \ --request GET \ --header "Authorization: Bearer $IAM_TOKEN" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/topics'
Идентификатор кластера можно запросить со списком кластеров в каталоге.
-
Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.
-
Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:
export IAM_TOKEN="<IAM-токен>"
-
Клонируйте репозиторий cloudapi
:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Далее предполагается, что содержимое репозитория находится в директории
~/cloudapi/
. -
Воспользуйтесь вызовом TopicService/List и выполните запрос, например, с помощью gRPCurl
:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<идентификатор_кластера>" }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.List
Идентификатор кластера можно запросить со списком кластеров в каталоге.
-
Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.
Получить детальную информацию о топике
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Нажмите на имя нужного кластера и перейдите на вкладку Топики.
- Нажмите на имя нужного топика.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы получить детальную информацию о топике, выполните следующую команду:
yc managed-kafka topic get <имя_топика> --cluster-name <имя_кластера>
-
Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:
export IAM_TOKEN="<IAM-токен>"
-
Воспользуйтесь методом Topic.list и выполните запрос, например, с помощью cURL
:curl \ --request GET \ --header "Authorization: Bearer $IAM_TOKEN" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/topics/<имя_топика>'
Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя топика — со списком топиков в кластере.
-
Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.
-
Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:
export IAM_TOKEN="<IAM-токен>"
-
Клонируйте репозиторий cloudapi
:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Далее предполагается, что содержимое репозитория находится в директории
~/cloudapi/
. -
Воспользуйтесь вызовом TopicService/Get и выполните запрос, например, с помощью gRPCurl
:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<идентификатор_кластера>", "topic_name": "<имя_топика>" }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.Get
Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя топика — со списком топиков в кластере.
-
Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.
Импортировать топик в Terraform
С помощью импорта вы можете передать существующие в кластере топики под управление Terraform.
-
Укажите в конфигурационном файле Terraform топик, который необходимо импортировать:
resource "yandex_mdb_kafka_topic" "<имя_топика>" {}
-
Выполните команду для импорта топика:
terraform import yandex_mdb_kafka_topic.<имя_топика> <идентификатор_кластера>:<имя_топика>
Подробнее об импорте топиков см. в документации провайдера Terraform
.
Удалить топик
Примечание
Выданные пользователю права на доступ к топику сохраняются после его удаления . Если после удаления вы не отзывали права, то при создании топика с тем же именем пользователь сможет работать с ним без переназначения прав.
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Нажмите на имя нужного кластера и перейдите на вкладку Топики.
- Нажмите значок
для нужного топика и выберите пункт Удалить топик. - В открывшемся окне нажмите кнопку Удалить.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы удалить топик:
-
Посмотрите описание команды CLI для изменения топиков:
yc managed-kafka topic delete --help
-
Удалите топик:
yc managed-kafka topic delete <имя_топика> --cluster-name <имя_кластера>
-
Откройте актуальный конфигурационный файл Terraform с планом инфраструктуры.
О том, как создать такой файл, см. в разделе Создание кластера.
-
Удалите ресурс
yandex_mdb_kafka_topic
с описанием нужного топика. -
Проверьте корректность настроек.
-
В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.
-
Выполните команду:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
Подробнее см. в документации провайдера Terraform
-
Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:
export IAM_TOKEN="<IAM-токен>"
-
Воспользуйтесь методом Topic.delete и выполните запрос, например, с помощью cURL
:curl \ --request DELETE \ --header "Authorization: Bearer $IAM_TOKEN" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/topics/<имя_топика>'
Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя топика — со списком топиков в кластере.
-
Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.
-
Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:
export IAM_TOKEN="<IAM-токен>"
-
Клонируйте репозиторий cloudapi
:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Далее предполагается, что содержимое репозитория находится в директории
~/cloudapi/
. -
Воспользуйтесь вызовом TopicService/Delete и выполните запрос, например, с помощью gRPCurl
:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<идентификатор_кластера>", "topic_name": "<имя_топика>" }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.Delete
Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя топика — со списком топиков в кластере.
-
Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.
Управление топиками через Admin API Apache Kafka®
Чтобы управлять топиками через Admin API Apache Kafka®:
- Создайте в кластере пользователя-администратора с ролью
ACCESS_ROLE_ADMIN
. - Управляйте топиками от имени этого пользователя с помощью запросов к Admin API Apache Kafka®. О работе с Admin API читайте в документации выбранного языка программирования.
Подробнее о работе с Admin API и о действующих ограничениях читайте в разделе Управление топиками и разделами и в документации Apache Kafka®