Managing Apache Kafka® topics
A Managed Service for Apache Kafka® cluster allows you to manage topics and partitions in two ways, either separately or together:
-
Using the native Yandex Cloud interfaces, such as the CLI, API, or management console. Choose this option if you want to manage topics using Managed Service for Apache Kafka® features.
You can do the following with Managed Service for Apache Kafka® topics:
-
Using the Apache Kafka® Admin API. Select this option to use your own solutions to manage topics and partitions.
Managing topics via Yandex Cloud interfaces
Creating a topic
Prior to creating a topic, calculate the minimum storage size.
- In the management console
, navigate to the relevant folder. - Navigate to the Managed Service for Kafka service.
- Click the cluster name and go to the Topics tab.
- Click Create topic.
- Under Basic parameters, set the basic topic properties:
-
Topic name, which must be unique within the Apache Kafka® cluster.
Note
Use the Apache Kafka® Admin API if you need to create a topic that starts with
_. You cannot create such a topic using the Yandex Cloud interfaces. -
Number of topic partitions.
-
Replication factor. This value should not exceed the number of brokers in the cluster. The minimum value is
1. The maximum value is3. The default value is as follows:- For a cluster with one or two brokers:
1. - For a cluster with three or more brokers:
3.
- For a cluster with one or two brokers:
-
- Under Topic settings, specify the topic settings.
- Click Create.
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To create a topic:
-
See the description of the CLI command for creating topics:
yc managed-kafka topic create --help -
Create a topic:
yc managed-kafka topic create <topic_name> \ --cluster-name <cluster_name> \ --partitions <number_of_partitions> \ --replication-factor <replication_factor>Specify the topic settings here, if required.
Note
Use the Apache Kafka® Admin API if you need to create a topic that starts with
_. You cannot create such a topic using the Yandex Cloud interfaces.
-
Open the current Terraform configuration file describing your infrastructure.
Learn how to create this file in Creating a cluster.
-
Add the
yandex_mdb_kafka_topicresource and specify the topic settings in thetopic_config, if required:resource "yandex_mdb_kafka_topic" "<topic_name>" { cluster_id = "<cluster_ID>" name = "<topic_name>" partitions = <number_of_partitions> replication_factor = <replication_factor> topic_config { compression_type = "<compression_type>" flush_messages = <maximum_number_of_messages_in_memory> ... } }Note
Use the Apache Kafka® Admin API if you need to create a topic that starts with
_. You cannot create such a topic using the Yandex Cloud interfaces. -
Make sure the settings are correct.
-
In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
-
Run this command:
terraform validateTerraform will show any errors found in your configuration files.
-
-
Confirm updating the resources.
-
Run this command to view the planned changes:
terraform planIf you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
-
If everything looks correct, apply the changes:
-
Run this command:
terraform apply -
Confirm updating the resources.
-
Wait for the operation to complete.
-
-
For more information, see this Terraform provider guide.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>" -
Call the Topic.create method, e.g., via the following cURL
request:curl \ --request POST \ --header "Authorization: Bearer $IAM_TOKEN" \ --header "Content-Type: application/json" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics' \ --data '{ "topicSpec": { "name": "<topic_name>", "partitions": "<number_of_partitions>", "replicationFactor": "<replication_factor>" }'Where:
-
topicSpecstands for the topic settings:-
name: Topic name.Note
Use the Apache Kafka® Admin API if you need to create a topic that starts with
_. You cannot create such a topic using the Yandex Cloud interfaces. -
partitions: Number of partitions. -
replicationFactor: Replication factor.
-
You can get the cluster ID with the list of clusters in the folder.
-
-
Check the server response to make sure your request was successful.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>" -
Clone the cloudapi
repository:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapiBelow, we assume that the repository contents reside in the
~/cloudapi/directory. -
Call the TopicService/Create method, e.g., via the following gRPCurl
request:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<cluster_ID>", "topic_spec": { "name": "<topic_name>", "partitions": { "value": "<number_of_partitions>" }, "replication_factor": { "value": "<replication_factor>" } } }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.CreateWhere:
-
topic_specstands for the topic settings:-
name: Topic name.Note
Use the Apache Kafka® Admin API if you need to create a topic that starts with
_. You cannot create such a topic using the Yandex Cloud interfaces. -
partitions: Number of partitions, provided as an object with thevaluefield. -
replication_factor: Replication factor, provided as an object with thevaluefield.
-
You can get the cluster ID with the list of clusters in the folder.
-
-
Check the server response to make sure your request was successful.
Note
While running, Managed Service for Apache Kafka® can create service topics. You cannot write user data to such topics.
Updating topic settings
You cannot reduce the number of partitions in Managed Service for Apache Kafka® topics. You cannot create new partitions if there is not enough storage space.
For more information, see Minimum storage size.
- In the management console
, navigate to the relevant folder. - Navigate to the Managed Service for Kafka service.
- Click the name of your cluster and select the Topics tab.
- Click
for the topic in question and select Edit. - Change the basic topic settings:
- Number of topic partitions.
- Replication factor. This value should not exceed the number of brokers in the cluster. The minimum value is
1. The maximum value is3. The default value is as follows:- For a cluster with one or two brokers:
1. - For a cluster with three or more brokers:
3.
- For a cluster with one or two brokers:
- Change additional topic settings.
- Click Save.
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To update topic settings:
-
See the description of the CLI command for updating topics:
yc managed-kafka topic update --help -
Change topic settings:
yc managed-kafka topic update <topic_name> \ --cluster-name <cluster_name> \ --partitions <number_of_partitions> \ --replication-factor <replication_factor>
-
Open the current Terraform configuration file describing your infrastructure.
Learn how to create this file in Creating a cluster.
-
Edit the parameter values in the
yandex_mdb_kafka_topicresource description:resource "yandex_mdb_kafka_topic" "<topic_name>" { cluster_id = "<cluster_ID>" name = "<topic_name>" partitions = <number_of_partitions> replication_factor = <replication_factor> topic_config { compression_type = "<compression_type>" flush_messages = <maximum_number_of_messages_in_memory> ... } } -
Make sure the settings are correct.
-
In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
-
Run this command:
terraform validateTerraform will show any errors found in your configuration files.
-
-
Confirm updating the resources.
-
Run this command to view the planned changes:
terraform planIf you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
-
If everything looks correct, apply the changes:
-
Run this command:
terraform apply -
Confirm updating the resources.
-
Wait for the operation to complete.
-
-
For more information, see this Terraform provider guide.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>" -
Call the Topic.update method, e.g., via the following cURL
request:Warning
The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the
updateMaskparameter as a single comma-separated string.curl \ --request PATCH \ --header "Authorization: Bearer $IAM_TOKEN" \ --header "Content-Type: application/json" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>' \ --data '{ "clusterId": "<cluster_ID>", "updateMask": "topicSpec.partitions,topicSpec.replicationFactor,topicSpec.topicConfig_2_8.<setting_1>,...,topicSpec.topicConfig_2_8.<setting_N>,topicSpec.topicConfig_3.<setting_1>,...,topicSpec.topicConfig_3.<setting_N>", "topicSpec": { "partitions": "<number_of_partitions>", "replicationFactor": "<replication_factor>", "topicConfig_3": { "<setting_1_for_Apache Kafka®_3.x_topic>": "<value_1>", "<setting_2_for_Apache Kafka®_3.x_topic>": "<value_2>", ... "<setting_N_for_Apache Kafka®_3.x_topic>": "<value_N>" } } }'Where:
-
updateMask: Comma-separated string of settings you want to update.In this case, list all topic settings to update.
-
topicSpecstands for the new topic settings:-
partitions: Number of partitions. -
replicationFactor: Replication factor. -
topicConfig_3: Topic settings for Apache Kafka®3.x. Specify each setting on a new line, separated by commas.See Settings for individual topics for the descriptions and possible values of the settings.
-
You can get the cluster ID with the list of clusters in the folder.
-
-
Check the server response to make sure your request was successful.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>" -
Clone the cloudapi
repository:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapiBelow, we assume that the repository contents reside in the
~/cloudapi/directory. -
Call the TopicService/Update method, e.g., via the following gRPCurl
request:Warning
The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the
update_maskparameter as an array ofpaths[]strings.Format for listing settings
"update_mask": { "paths": [ "<setting_1>", "<setting_2>", ... "<setting_N>" ] }grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<cluster_ID>", "topic_name": "<topic_name>", "update_mask": { "paths": [ "topic_spec.partitions", "topic_spec.replication_factor", "topic_spec.topic_config_2_8.<setting_1>", ..., "topic_spec.topic_config_2_8.<setting_N>", "topic_spec.topic_config_3.<setting_1>", ..., "topic_spec.topic_config_3.<setting_N>" ] }, "topic_spec": { "partitions": { "value": "<number_of_partitions>" }, "replication_factor": { "value": "<replication_factor>" }, "topic_config_3": { "<setting_1_for_Apache Kafka®_3.x_topic>": "<value_1>", "<setting_2_for_Apache Kafka®_3.x_topic>": "<value_2>", ... "<setting_N_for_Apache Kafka®_3.x_topic>": "<value_N>" } } }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.UpdateWhere:
-
update_mask: List of settings you want to update as an array of strings (paths[]).In this case, list all topic settings to update.
-
topic_specstands for the new topic settings:-
partitions: Number of partitions, provided as an object with thevaluefield. -
replication_factor: Replication factor, provided as an object with thevaluefield. -
topic_config_3: Topic settings for Apache Kafka®3.x. Specify each setting on a new line, separated by commas.See Settings for individual topics for the descriptions and possible values of the settings.
-
You can get the cluster ID with the list of clusters in the folder.
-
-
Check the server response to make sure your request was successful.
Getting a list of topics in a cluster
- In the management console
, navigate to the relevant folder. - Navigate to the Managed Service for Kafka service.
- Click the cluster name and go to the Topics tab.
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To get a list of topics, run the following command:
yc managed-kafka topic list --cluster-name <cluster_name>
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>" -
Call the Topic.list method, e.g., via the following cURL
request:curl \ --request GET \ --header "Authorization: Bearer $IAM_TOKEN" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics'You can get the cluster ID with the list of clusters in the folder.
-
Check the server response to make sure your request was successful.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>" -
Clone the cloudapi
repository:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapiBelow, we assume that the repository contents reside in the
~/cloudapi/directory. -
Call the TopicService/List method, e.g., via the following gRPCurl
request:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<cluster_ID>" }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.ListYou can get the cluster ID with the list of clusters in the folder.
-
Check the server response to make sure your request was successful.
Getting detailed information about a topic
- In the management console
, navigate to the relevant folder. - Navigate to the Managed Service for Kafka service.
- Click the cluster name and go to the Topics tab.
- Click the topic name.
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To get detailed information about a topic, run the following command:
yc managed-kafka topic get <topic_name> --cluster-name <cluster_name>
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>" -
Call the Topic.list method, e.g., via the following cURL
request:curl \ --request GET \ --header "Authorization: Bearer $IAM_TOKEN" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>'You can get the cluster ID with the list of clusters in the folder, and the topic name, with the list of topics in the cluster.
-
Check the server response to make sure your request was successful.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>" -
Clone the cloudapi
repository:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapiBelow, we assume that the repository contents reside in the
~/cloudapi/directory. -
Call the TopicService/Get method, e.g., via the following gRPCurl
request:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<cluster_ID>", "topic_name": "<topic_name>" }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.GetYou can get the cluster ID with the list of clusters in the folder, and the topic name, with the list of topics in the cluster.
-
Check the server response to make sure your request was successful.
Importing a topic to Terraform
You can import the existing cluster topics to manage them with Terraform.
-
In the Terraform configuration file, specify the topic you want to import:
resource "yandex_mdb_kafka_topic" "<topic_name>" {} -
Run the following command to import your topic:
terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name>To learn more about importing topics, see this Terraform provider guide.
Transferring information about created topics to the Terraform state file
When upgrading to a new Terraform provider version, the state file and configuration file may differ regarding created topics, with obsolete topic attributes and new yandex_mdb_kafka_topic resources. To eliminate the differences, delete the topic attributes and transfer information about the created yandex_mdb_kafka_topic resources to the .tfstate state file. You can do this in one of the two methods described below.
First method
-
Delete the cluster information from the
.tfstatefile using this command:terraform state rm yandex_mdb_kafka_cluster.<cluster_name> -
Edit the Terraform configuration file:
- Delete the
topicattributes from theyandex_mdb_kafka_clusterresource. - Add new
yandex_mdb_kafka_topicresources.
Example of the updated configuration file
resource "yandex_mdb_kafka_cluster" "this" { name = "terraform-test" environment = "PRODUCTION" network_id = data.yandex_vpc_network.this.id config { version = "3.4" brokers_count = 1 zones = ["ru-central1-a"] kafka { resources { resource_preset_id = "s2.small" disk_size = 30 disk_type_id = "network-ssd" } kafka_config { log_segment_bytes = 104857600 } } } } resource "yandex_mdb_kafka_topic" "topic1" { cluster_id = yandex_mdb_kafka_cluster.this.id name = "topic1" partitions = 3 replication_factor = 1 } resource "yandex_mdb_kafka_topic" "topic2" { cluster_id = yandex_mdb_kafka_cluster.this.id name = "topic2" partitions = 3 replication_factor = 1 } - Delete the
-
Import the cluster and topics:
terraform import yandex_mdb_kafka_cluster.<cluster_name> <cluster_ID> terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name> terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name> -
Check the result using this command:
terraform planIf the state file matches the configuration, the terminal displays this message:
No changes. Infrastructure is up-to-date.Terraform will inform you if the state file does not match the configuration.
Second method
-
Download the
.tfstatefile using this command:terraform state pull -
Open the downloaded file in any text editor and delete the
topicattributes from theyandex_mdb_kafka_clusterresource. -
Push the updated state file using this command:
terraform state push -
Edit the Terraform configuration file:
- Delete the
topicattributes from theyandex_mdb_kafka_clusterresource. - Add new
yandex_mdb_kafka_topicresources.
Example of the updated configuration file
resource "yandex_mdb_kafka_cluster" "this" { name = "terraform-test" environment = "PRODUCTION" network_id = data.yandex_vpc_network.this.id config { version = "3.4" brokers_count = 1 zones = ["ru-central1-a"] kafka { resources { resource_preset_id = "s2.small" disk_size = 30 disk_type_id = "network-ssd" } kafka_config { log_segment_bytes = 104857600 } } } } resource "yandex_mdb_kafka_topic" "topic1" { cluster_id = yandex_mdb_kafka_cluster.this.id name = "topic1" partitions = 3 replication_factor = 1 } resource "yandex_mdb_kafka_topic" "topic2" { cluster_id = yandex_mdb_kafka_cluster.this.id name = "topic2" partitions = 3 replication_factor = 1 } - Delete the
-
Import the topics:
terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name> terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name> -
Check the result using this command:
terraform planIf the state file matches the configuration, the terminal displays this message:
No changes. Infrastructure is up-to-date.Terraform will inform you if the state file does not match the configuration.
Deleting a topic
Note
Permissions granted to a user for a topic persist even after the topic is deleted. If you do not revoke the permissions after topic deletion, the user will be able to access a newly created topic with the same name without reassigning permissions.
- In the management console
, navigate to the relevant folder. - Navigate to the Managed Service for Kafka service.
- Click the cluster name and go to the Topics tab.
- Click
for the topic in question and select Delete topic. - In the window that opens, click Delete.
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To delete a topic:
-
See the description of the CLI command for deleting topics:
yc managed-kafka topic delete --help -
Delete a topic:
yc managed-kafka topic delete <topic_name> --cluster-name <cluster_name>
-
Open the current Terraform configuration file describing your infrastructure.
Learn how to create this file in Creating a cluster.
-
Delete the
yandex_mdb_kafka_topicresource with the topic description. -
Make sure the settings are correct.
-
In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
-
Run this command:
terraform validateTerraform will show any errors found in your configuration files.
-
-
Confirm updating the resources.
-
Run this command to view the planned changes:
terraform planIf you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
-
If everything looks correct, apply the changes:
-
Run this command:
terraform apply -
Confirm updating the resources.
-
Wait for the operation to complete.
-
-
For more information, see this Terraform provider guide.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>" -
Use the Topic.delete method and send the following request, e.g., via cURL
:curl \ --request DELETE \ --header "Authorization: Bearer $IAM_TOKEN" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>'You can get the cluster ID with the list of clusters in the folder, and the topic name, with the list of topics in the cluster.
-
Check the server response to make sure your request was successful.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>" -
Clone the cloudapi
repository:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapiBelow, we assume that the repository contents reside in the
~/cloudapi/directory. -
Call the TopicService/Delete method, e.g., via the following gRPCurl
request:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<cluster_ID>", "topic_name": "<topic_name>" }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.DeleteYou can get the cluster ID with the list of clusters in the folder, and the topic name, with the list of topics in the cluster.
-
Check the server response to make sure your request was successful.
Managing topics via the Apache Kafka® Admin API
To manage topics via the Apache Kafka® Admin API:
- Create an admin user with either the
ACCESS_ROLE_ADMINorACCESS_ROLE_TOPIC_ADMINrole in the cluster. Learn more about the permissions you get with each role here. - Manage topics on behalf of this user by making requests to the Apache Kafka® Admin API. Learn how to use the Admin API in your programming language guide.
For more information about working with the Admin API and the existing limitations, see Managing topics and partitions and this Apache Kafka® guide