Managing Apache Kafka® topics
A Managed Service for Apache Kafka® cluster provides two ways to manage topics and partitions. You can use them separately or together:
- Using native Yandex Cloud interfaces, such as the CLI, API, or management console. Choose this method if you want to manage topics using Managed Service for Apache Kafka® features.
  You can perform the following actions on Managed Service for Apache Kafka® topics:
  - Creating a topic
  - Updating topic settings
  - Getting a list of topics in a cluster
  - Getting detailed information about a topic
  - Importing topics to Terraform
  - Deleting a topic
- Using the Apache Kafka® Admin API. Choose this method if you prefer to use your existing solution to manage topics and partitions.
Managing topics via Yandex Cloud interfaces
Creating a topic
Prior to creating a topic, calculate the minimum storage size.
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
- Click Create topic.
- Under Basic parameters, set the basic parameters of the topic:
  - Topic name (must be unique in the Apache Kafka® cluster).
  - Number of topic partitions.
  - Replication factor. This value must not exceed the number of brokers in the cluster. Minimum value: 1. Maximum value: 3. Default value:
    - For a cluster with one or two brokers: 1.
    - For a cluster with three or more brokers: 3.
- Under Topic settings, specify the topic settings.
- Click Create.
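The replication factor rules above can be sketched as two small shell helpers. This is only an illustration: the function names are hypothetical and are not part of the yc CLI or the service API.

```shell
# Hypothetical helpers; the names are illustrative only.

# Print the default replication factor for a cluster with $1 brokers:
# 1 for one or two brokers, 3 for three or more.
default_replication_factor() {
  if [ "$1" -ge 3 ]; then
    echo 3
  else
    echo 1
  fi
}

# Succeed only if replication factor $1 is between 1 and 3
# and does not exceed the number of brokers $2.
is_valid_replication_factor() {
  [ "$1" -ge 1 ] && [ "$1" -le 3 ] && [ "$1" -le "$2" ]
}
```

For example, `is_valid_replication_factor 3 2` fails because a two-broker cluster cannot hold three replicas.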
If you do not have the Yandex Cloud command line interface yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder using the --folder-name or --folder-id parameter.
To create a topic:
- View a description of the CLI create topic command:
  yc managed-kafka topic create --help
- Create a topic:
  yc managed-kafka topic create <topic_name> \
    --cluster-name <cluster_name> \
    --partitions <number_of_partitions> \
    --replication-factor <replication_factor>
  If necessary, specify the topic settings here.
- Open the current Terraform configuration file with an infrastructure plan.
  For more information about creating this file, see Creating clusters.
- Add the yandex_mdb_kafka_topic resource and configure the topic under topic_config if required:
  resource "yandex_mdb_kafka_topic" "<topic_name>" {
    cluster_id         = "<cluster_ID>"
    name               = "<topic_name>"
    partitions         = <number_of_partitions>
    replication_factor = <replication_factor>
    topic_config {
      compression_type = "<compression_type>"
      flush_messages   = <maximum_number_of_messages_in_memory>
      ...
    }
  }
- Make sure the settings are correct.
  - Using the command line, navigate to the folder that contains the up-to-date Terraform configuration files with an infrastructure plan.
  - Run the command:
    terraform validate
    If there are errors in the configuration files, Terraform will point to them.
- Confirm updating the resources.
  - Run the command to view planned changes:
    terraform plan
    If the resource configuration descriptions are correct, the terminal will display a list of the resources to modify and their parameters. This is a test step. No resources are updated.
  - If you are happy with the planned changes, apply them:
    - Run the command:
      terraform apply
    - Confirm the update of resources.
    - Wait for the operation to complete.
For more information, see the Terraform provider documentation.
- Get an IAM token for API authentication and put it into the environment variable:
  export IAM_TOKEN="<IAM_token>"
- Use the Topic.create method and send the following request, e.g., via cURL:
  curl \
    --request POST \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --header "Content-Type: application/json" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics' \
    --data '{
              "topicSpec": {
                "name": "<topic_name>",
                "partitions": "<number_of_partitions>",
                "replicationFactor": "<replication_factor>"
              }
            }'
  Where:
  - topicSpec stands for topic settings:
    - name: Topic name.
    - partitions: Number of partitions.
    - replicationFactor: Replication factor.
  You can get the cluster ID with a list of clusters in the folder.
- View the server response to make sure the request was successful.
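Because a missing brace in the request body is easy to overlook, it may help to generate the body instead of typing it by hand. Below is a minimal sketch; `topic_create_body` is a hypothetical helper name, and the sketch assumes the topic name contains no characters that need JSON escaping (such as quotes or backslashes).

```shell
# Hypothetical helper: print the JSON body for a Topic.create request.
# Arguments: topic name, number of partitions, replication factor.
# Assumption: the topic name needs no JSON escaping.
topic_create_body() {
  printf '{"topicSpec":{"name":"%s","partitions":"%s","replicationFactor":"%s"}}\n' \
    "$1" "$2" "$3"
}
```

The output can then be passed to cURL, e.g., `--data "$(topic_create_body <topic_name> 6 3)"`.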
- Get an IAM token for API authentication and put it into the environment variable:
  export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.
- Use the TopicService/Create call and send the following request, e.g., via gRPCurl:
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "topic_spec": {
            "name": "<topic_name>",
            "partitions": {
              "value": "<number_of_partitions>"
            },
            "replication_factor": {
              "value": "<replication_factor>"
            }
          }
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.TopicService.Create
  Where:
  - topic_spec stands for topic settings:
    - name: Topic name.
    - partitions: Number of partitions. Provided as an object with the value field.
    - replication_factor: Replication factor. Provided as an object with the value field.
  You can get the cluster ID with a list of clusters in the folder.
- View the server response to make sure the request was successful.
Note
While running, Managed Service for Apache Kafka® may create service topics. You cannot write user data to these topics.
Updating topic settings
You cannot reduce the number of partitions in Managed Service for Apache Kafka® topics. You cannot create new partitions if there is not enough storage space.
For more information, see Minimum storage size.
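The partition rule above can be checked before sending an update. The sketch below covers only the "no reduction" rule; it does not check available storage space, and `can_set_partitions` is a hypothetical name.

```shell
# Hypothetical pre-check: succeed only if the new partition count ($2)
# is not lower than the current one ($1).
# Storage space is NOT checked here.
can_set_partitions() {
  [ "$2" -ge "$1" ]
}
```

For example, `can_set_partitions 6 3` fails because it would reduce the number of partitions from 6 to 3.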
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Click the name of the cluster you need and select the Topics tab.
- Click the menu icon next to the topic you need and select Edit.
- Change the basic parameters of the topic:
  - Number of topic partitions.
  - Replication factor. This value must not exceed the number of brokers in the cluster. Minimum value: 1. Maximum value: 3. Default value:
    - For a cluster with one or two brokers: 1.
    - For a cluster with three or more brokers: 3.
- Change additional topic settings.
- Click Save.
If you do not have the Yandex Cloud command line interface yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder using the --folder-name or --folder-id parameter.
To update topic settings:
- View a description of the CLI update topic command:
  yc managed-kafka topic update --help
- Change topic settings:
  yc managed-kafka topic update <topic_name> \
    --cluster-name <cluster_name> \
    --partitions <number_of_partitions> \
    --replication-factor <replication_factor>
- Open the current Terraform configuration file with an infrastructure plan.
  For more information about creating this file, see Creating clusters.
- Edit the parameter values in the yandex_mdb_kafka_topic resource description:
  resource "yandex_mdb_kafka_topic" "<topic_name>" {
    cluster_id         = "<cluster_ID>"
    name               = "<topic_name>"
    partitions         = <number_of_partitions>
    replication_factor = <replication_factor>
    topic_config {
      compression_type = "<compression_type>"
      flush_messages   = <maximum_number_of_messages_in_memory>
      ...
    }
  }
- Make sure the settings are correct.
  - Using the command line, navigate to the folder that contains the up-to-date Terraform configuration files with an infrastructure plan.
  - Run the command:
    terraform validate
    If there are errors in the configuration files, Terraform will point to them.
- Confirm updating the resources.
  - Run the command to view planned changes:
    terraform plan
    If the resource configuration descriptions are correct, the terminal will display a list of the resources to modify and their parameters. This is a test step. No resources are updated.
  - If you are happy with the planned changes, apply them:
    - Run the command:
      terraform apply
    - Confirm the update of resources.
    - Wait for the operation to complete.
For more information, see the Terraform provider documentation.
- Get an IAM token for API authentication and put it into the environment variable:
  export IAM_TOKEN="<IAM_token>"
- Use the Topic.update method and send the following request, e.g., via cURL:
  Warning
  The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the updateMask parameter as a single comma-separated string.
  curl \
    --request PATCH \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --header "Content-Type: application/json" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>' \
    --data '{
              "clusterId": "<cluster_ID>",
              "updateMask": "topicSpec.partitions,topicSpec.replicationFactor,topicSpec.topicConfig_2_8.<setting_1>,...,topicSpec.topicConfig_2_8.<setting_N>,topicSpec.topicConfig_3.<setting_1>,...,topicSpec.topicConfig_3.<setting_N>",
              "topicSpec": {
                "partitions": "<number_of_partitions>",
                "replicationFactor": "<replication_factor>",
                "topicConfig_2_8": {
                  "<setting_1_for_Apache Kafka®_2.8_topic>": "<value_1>",
                  "<setting_2_for_Apache Kafka®_2.8_topic>": "<value_2>",
                  ...
                  "<setting_N_for_Apache Kafka®_2.8_topic>": "<value_N>"
                },
                "topicConfig_3": {
                  "<setting_1_for_Apache Kafka®_3.x_topic>": "<value_1>",
                  "<setting_2_for_Apache Kafka®_3.x_topic>": "<value_2>",
                  ...
                  "<setting_N_for_Apache Kafka®_3.x_topic>": "<value_N>"
                }
              }
            }'
  Where:
  - updateMask: List of parameters to update as a single string, separated by commas.
    In this case, list all the topic settings to update.
  - topicSpec stands for new topic settings:
    - partitions: Number of partitions.
    - replicationFactor: Replication factor.
    - topicConfig_2_8: Preset topic settings for Apache Kafka® 2.8. Use a separate line for each setting; separate them by commas.
    - topicConfig_3: Preset topic settings for Apache Kafka® 3.x. Use a separate line for each setting; separate them by commas.
    See Settings for individual topics for a description and possible values for each setting.
  You can get the cluster ID with a list of clusters in the folder.
- View the server response to make sure the request was successful.
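Since the REST method expects updateMask as a single comma-separated string, one way to avoid typos is to assemble it from a list of field paths. This is a minimal sketch; `join_update_mask` is a hypothetical helper, not part of any Yandex Cloud tool.

```shell
# Hypothetical helper: join field paths into one comma-separated string
# suitable for the updateMask parameter.
join_update_mask() {
  mask=""
  for path in "$@"; do
    if [ -z "$mask" ]; then
      mask="$path"
    else
      mask="$mask,$path"
    fi
  done
  printf '%s\n' "$mask"
}
```

For example, `join_update_mask topicSpec.partitions topicSpec.replicationFactor` prints `topicSpec.partitions,topicSpec.replicationFactor`.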
- Get an IAM token for API authentication and put it into the environment variable:
  export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.
- Use the TopicService/Update call and send the following request, e.g., via gRPCurl:
  Warning
  The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the update_mask parameter as an array of paths[] strings.
  Format for listing settings
  "update_mask": {
    "paths": [
      "<setting_1>",
      "<setting_2>",
      ...
      "<setting_N>"
    ]
  }
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "topic_name": "<topic_name>",
          "update_mask": {
            "paths": [
              "topic_spec.partitions",
              "topic_spec.replication_factor",
              "topic_spec.topic_config_2_8.<setting_1>",
              ...,
              "topic_spec.topic_config_2_8.<setting_N>",
              "topic_spec.topic_config_3.<setting_1>",
              ...,
              "topic_spec.topic_config_3.<setting_N>"
            ]
          },
          "topic_spec": {
            "partitions": {
              "value": "<number_of_partitions>"
            },
            "replication_factor": {
              "value": "<replication_factor>"
            },
            "topic_config_2_8": {
              "<setting_1_for_Apache Kafka®_2.8_topic>": "<value_1>",
              "<setting_2_for_Apache Kafka®_2.8_topic>": "<value_2>",
              ...
              "<setting_N_for_Apache Kafka®_2.8_topic>": "<value_N>"
            },
            "topic_config_3": {
              "<setting_1_for_Apache Kafka®_3.x_topic>": "<value_1>",
              "<setting_2_for_Apache Kafka®_3.x_topic>": "<value_2>",
              ...
              "<setting_N_for_Apache Kafka®_3.x_topic>": "<value_N>"
            }
          }
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.TopicService.Update
  Where:
  - update_mask: List of parameters to update as an array of paths[] strings.
    In this case, list all the topic settings to update.
  - topic_spec stands for new topic settings:
    - partitions: Number of partitions. Provided as an object with the value field.
    - replication_factor: Replication factor. Provided as an object with the value field.
    - topic_config_2_8: Topic settings for Apache Kafka® 2.8. Use a separate line for each setting; separate them by commas.
    - topic_config_3: Topic settings for Apache Kafka® 3.x. Use a separate line for each setting; separate them by commas.
    See Settings for individual topics for a description and possible values for each setting.
  You can get the cluster ID with a list of clusters in the folder.
- View the server response to make sure the request was successful.
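For the gRPC call, the mask is an object with a paths[] array rather than a single string. The sketch below builds that object from a list of field paths; `update_mask_json` is a hypothetical helper and assumes the paths contain no characters that need JSON escaping.

```shell
# Hypothetical helper: print an update_mask object whose paths[] array
# contains the field paths given as arguments.
# Assumption: the paths need no JSON escaping.
update_mask_json() {
  json=""
  for path in "$@"; do
    if [ -z "$json" ]; then
      json="\"$path\""
    else
      json="$json,\"$path\""
    fi
  done
  printf '{"paths":[%s]}\n' "$json"
}
```

The result can be placed as the value of "update_mask" in the request passed to gRPCurl with -d.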
Getting a list of topics in a cluster
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
If you do not have the Yandex Cloud command line interface yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder using the --folder-name or --folder-id parameter.
To get a list of topics, run the following command:
yc managed-kafka topic list --cluster-name <cluster_name>
- Get an IAM token for API authentication and put it into the environment variable:
  export IAM_TOKEN="<IAM_token>"
- Use the Topic.list method and send the following request, e.g., via cURL:
  curl \
    --request GET \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics'
  You can get the cluster ID with a list of clusters in the folder.
- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:
  export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.
- Use the TopicService/List call and send the following request, e.g., via gRPCurl:
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>"
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.TopicService.List
  You can get the cluster ID with a list of clusters in the folder.
- View the server response to make sure the request was successful.
Getting detailed information about a topic
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
- Click the topic name.
If you do not have the Yandex Cloud command line interface yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder using the --folder-name or --folder-id parameter.
To get detailed information about a topic, run the following command:
yc managed-kafka topic get <topic_name> --cluster-name <cluster_name>
- Get an IAM token for API authentication and put it into the environment variable:
  export IAM_TOKEN="<IAM_token>"
- Use the Topic.get method and send the following request, e.g., via cURL:
  curl \
    --request GET \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>'
  You can request the cluster ID with a list of clusters in the folder and the topic name with a list of topics in the cluster.
- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:
  export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.
- Use the TopicService/Get call and send the following request, e.g., via gRPCurl:
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "topic_name": "<topic_name>"
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.TopicService.Get
  You can request the cluster ID with a list of clusters in the folder and the topic name with a list of topics in the cluster.
- View the server response to make sure the request was successful.
Importing topics to Terraform
Using import, you can bring the existing cluster topics under Terraform management.
- In the Terraform configuration file, specify the topic you want to import:
  resource "yandex_mdb_kafka_topic" "<topic_name>" {}
- Run the following command to import the topic:
  terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name>
To learn more about importing topics, see the Terraform provider documentation.
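Terraform resource labels may contain only letters, digits, underscores, and hyphens, and must not start with a digit, while Kafka topic names may also contain dots. The sketch below shows one possible way to derive a valid label from a topic name and print the matching import command. Both function names are hypothetical, and the label you use must match the one in your resource block.

```shell
# Hypothetical helper: turn a topic name into a valid Terraform label by
# replacing unsupported characters with underscores and prefixing an
# underscore if the name does not start with a letter or underscore.
tf_label() {
  label=$(printf '%s\n' "$1" | sed 's/[^A-Za-z0-9_-]/_/g')
  case "$label" in
    [!A-Za-z_]*) label="_$label" ;;
  esac
  printf '%s\n' "$label"
}

# Hypothetical helper: print (not run) the import command.
# Arguments: cluster ID, topic name.
import_command() {
  printf 'terraform import yandex_mdb_kafka_topic.%s %s:%s\n' \
    "$(tf_label "$2")" "$1" "$2"
}
```

Printing the command first lets you review it before running it against your state.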
Deleting a topic
Note
Permissions granted to the user for a topic remain even after the topic is deleted. If, after deleting a topic, you don't revoke the permissions, then, when you create a topic with the same name, the user will have access to it even if you don't explicitly assign them new permissions.
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
- Click the menu icon next to the topic and select Delete topic.
- In the window that opens, click Delete.
If you do not have the Yandex Cloud command line interface yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder using the --folder-name or --folder-id parameter.
To delete a topic:
- View a description of the CLI delete topic command:
  yc managed-kafka topic delete --help
- Delete a topic:
  yc managed-kafka topic delete <topic_name> --cluster-name <cluster_name>
- Open the current Terraform configuration file with an infrastructure plan.
  For more information about creating this file, see Creating clusters.
- Delete the yandex_mdb_kafka_topic resource with the topic description.
- Make sure the settings are correct.
  - Using the command line, navigate to the folder that contains the up-to-date Terraform configuration files with an infrastructure plan.
  - Run the command:
    terraform validate
    If there are errors in the configuration files, Terraform will point to them.
- Confirm updating the resources.
  - Run the command to view planned changes:
    terraform plan
    If the resource configuration descriptions are correct, the terminal will display a list of the resources to modify and their parameters. This is a test step. No resources are updated.
  - If you are happy with the planned changes, apply them:
    - Run the command:
      terraform apply
    - Confirm the update of resources.
    - Wait for the operation to complete.
For more information, see the Terraform provider documentation.
- Get an IAM token for API authentication and put it into the environment variable:
  export IAM_TOKEN="<IAM_token>"
- Use the Topic.delete method and send the following request, e.g., via cURL:
  curl \
    --request DELETE \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>'
  You can request the cluster ID with a list of clusters in the folder and the topic name with a list of topics in the cluster.
- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:
  export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.
- Use the TopicService/Delete call and send the following request, e.g., via gRPCurl:
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "topic_name": "<topic_name>"
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.TopicService.Delete
  You can request the cluster ID with a list of clusters in the folder and the topic name with a list of topics in the cluster.
- View the server response to make sure the request was successful.
Managing topics via the Apache Kafka® Admin API
To manage topics via the Apache Kafka® Admin API:
- Create an admin user with the ACCESS_ROLE_ADMIN role in the cluster.
- Manage topics on behalf of this user by making requests to the Apache Kafka® Admin API. For details on working with the Admin API, refer to the documentation for the Kafka client library of your programming language.
For more information about working with the Admin API and the existing limitations, see Managing topics and partitions and the Apache Kafka® documentation.
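As one possible illustration, the kafka-topics.sh tool shipped with the Apache Kafka® distribution works through the Admin API and can create a topic on behalf of the admin user. The sketch below is an assumption-laden example, not the service's prescribed method: the function name and the KAFKA_TOPICS_BIN variable are hypothetical, and the client properties file is assumed to already contain the admin user's connection and authentication settings for your cluster.

```shell
# Hypothetical wrapper around kafka-topics.sh from the Apache Kafka distribution.
# Arguments: bootstrap server (host:port), client properties file,
#            topic name, number of partitions, replication factor.
# KAFKA_TOPICS_BIN is a hypothetical override for the tool's path.
create_topic_via_admin_api() {
  "${KAFKA_TOPICS_BIN:-kafka-topics.sh}" \
    --bootstrap-server "$1" \
    --command-config "$2" \
    --create \
    --topic "$3" \
    --partitions "$4" \
    --replication-factor "$5"
}
```

Setting `KAFKA_TOPICS_BIN=echo` prints the arguments instead of contacting the cluster, which is a simple way to review the command first.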