Managing Apache Kafka® topics
A Managed Service for Apache Kafka® cluster provides two ways to manage topics and partitions, which you can use separately or together:
- Using native Yandex Cloud interfaces, such as the CLI, API, or management console. Choose this method if you want to manage topics using Managed Service for Apache Kafka® features.
  You can perform the following actions on Managed Service for Apache Kafka® topics: create a topic, update topic settings, get a list of topics in a cluster, get detailed information about a topic, import topics to Terraform, and delete a topic.
- Using the Apache Kafka® Admin API. Choose this method if you prefer to use your existing solution to manage topics and partitions.
Managing topics via Yandex Cloud interfaces
Creating a topic
Prior to creating a topic, calculate the minimum storage size.
- In the management console, go to the relevant folder.
- In the list of services, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
- Click Create topic.
- Under Basic parameters, set the basic parameters of the topic:
  - Topic name (must be unique in the Apache Kafka® cluster).
    Note
    Use the Apache Kafka® Admin API if you need to create a topic that starts with _. You cannot create such a topic using the Yandex Cloud interfaces.
  - Number of topic partitions.
  - Replication factor. This parameter value should not exceed the number of brokers in the cluster. Minimum value: 1. Maximum value: 3. Default value:
    - For a cluster with one or two brokers: 1.
    - For a cluster with three or more brokers: 3.
- Under Topic settings, specify the topic settings.
- Click Create.
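The replication factor limits and defaults listed above can be sketched in a few lines of Python. Both function names are hypothetical and only restate the rules from this section; they are not part of any Yandex Cloud SDK.

```python
def default_replication_factor(broker_count: int) -> int:
    """Return the default replication factor described above.

    Hypothetical helper: 1 for clusters with one or two brokers,
    3 for clusters with three or more brokers.
    """
    if broker_count < 1:
        raise ValueError("a cluster has at least one broker")
    return 1 if broker_count <= 2 else 3


def validate_replication_factor(replication_factor: int, broker_count: int) -> None:
    """Raise ValueError if the value breaks the limits listed above."""
    if not 1 <= replication_factor <= 3:
        raise ValueError("replication factor must be between 1 and 3")
    if replication_factor > broker_count:
        raise ValueError("replication factor must not exceed the number of brokers")
```

Running such a check before you submit the form or a CLI command may catch an invalid value early, although the service remains the authority on what it accepts.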
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To create a topic:
- View the description of the CLI command to create topics:
    yc managed-kafka topic create --help
- Create a topic:
    yc managed-kafka topic create <topic_name> \
      --cluster-name <cluster_name> \
      --partitions <number_of_partitions> \
      --replication-factor <replication_factor>
  If necessary, specify the topic settings here.
  Note
  Use the Apache Kafka® Admin API if you need to create a topic that starts with _. You cannot create such a topic using the Yandex Cloud interfaces.
- Open the current Terraform configuration file that defines your infrastructure.
  For more information about creating this file, see Creating clusters.
- Add the yandex_mdb_kafka_topic resource and configure the topic under topic_config if required:
    resource "yandex_mdb_kafka_topic" "<topic_name>" {
      cluster_id         = "<cluster_ID>"
      name               = "<topic_name>"
      partitions         = <number_of_partitions>
      replication_factor = <replication_factor>
      topic_config {
        compression_type = "<compression_type>"
        flush_messages   = <maximum_number_of_messages_in_memory>
        ...
      }
    }
  Note
  Use the Apache Kafka® Admin API if you need to create a topic that starts with _. You cannot create such a topic using the Yandex Cloud interfaces.
- Make sure the settings are correct.
  - In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
  - Run this command:
      terraform validate
    Terraform will show any errors found in your configuration files.
- Confirm updating the resources.
  - Run this command to view the planned changes:
      terraform plan
    If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
  - If everything looks correct, apply the changes:
    - Run this command:
        terraform apply
    - Confirm updating the resources.
    - Wait for the operation to complete.
For more information, see the Terraform provider documentation.
- Get an IAM token for API authentication and put it into the environment variable:
    export IAM_TOKEN="<IAM_token>"
- Use the Topic.create method and send the following request, e.g., via cURL:
    curl \
      --request POST \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --header "Content-Type: application/json" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics' \
      --data '{
                "topicSpec": {
                  "name": "<topic_name>",
                  "partitions": "<number_of_partitions>",
                  "replicationFactor": "<replication_factor>"
                }
              }'
  Where:
  - topicSpec stands for topic settings:
    - name: Topic name.
      Note
      Use the Apache Kafka® Admin API if you need to create a topic that starts with _. You cannot create such a topic using the Yandex Cloud interfaces.
    - partitions: Number of partitions.
    - replicationFactor: Replication factor.
  You can request the cluster ID with the list of clusters in the folder.
- View the server response to make sure the request was successful.
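Hand-written JSON bodies are easy to leave unbalanced, so it may help to generate the Topic.create body programmatically. The sketch below is a minimal illustration; build_create_topic_body is a hypothetical name, and numeric values are sent as strings only because the cURL example above does so.

```python
import json


def build_create_topic_body(name: str, partitions: int, replication_factor: int) -> str:
    """Serialize the JSON body for a Topic.create request.

    Hypothetical helper that mirrors the topicSpec fields shown above.
    """
    if name.startswith("_"):
        # Topics starting with "_" cannot be created via Yandex Cloud interfaces.
        raise ValueError("topic names starting with '_' require the Admin API")
    body = {
        "topicSpec": {
            "name": name,
            "partitions": str(partitions),
            "replicationFactor": str(replication_factor),
        }
    }
    return json.dumps(body)


# "events" is a placeholder topic name used for illustration only.
print(build_create_topic_body("events", 3, 1))
```

The resulting string can be passed to cURL through the --data option in place of the inline JSON.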
- Get an IAM token for API authentication and put it into the environment variable:
    export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.
- Use the TopicService/Create call and send the following request, e.g., via gRPCurl:
    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<cluster_ID>",
            "topic_spec": {
              "name": "<topic_name>",
              "partitions": {
                "value": "<number_of_partitions>"
              },
              "replication_factor": {
                "value": "<replication_factor>"
              }
            }
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.TopicService.Create
  Where:
  - topic_spec stands for topic settings:
    - name: Topic name.
      Note
      Use the Apache Kafka® Admin API if you need to create a topic that starts with _. You cannot create such a topic using the Yandex Cloud interfaces.
    - partitions: Number of partitions, provided as an object with a field named value.
    - replication_factor: Replication factor, provided as an object with a field named value.
  You can request the cluster ID with the list of clusters in the folder.
- View the server response to make sure the request was successful.
Note
While running, Managed Service for Apache Kafka® may create service topics. You cannot write user data to such topics.
Updating topic settings
You cannot reduce the number of partitions in Managed Service for Apache Kafka® topics. You cannot create new partitions if there is not enough storage space.
For more information, see Minimum storage size.
- In the management console, go to the relevant folder.
- In the list of services, select Managed Service for Kafka.
- Click the name of the cluster you need and select the Topics tab.
- Click the menu icon for the topic you need and select Edit.
- Change the basic parameters of the topic:
  - Number of topic partitions.
  - Replication factor. This parameter value should not exceed the number of brokers in the cluster. Minimum value: 1. Maximum value: 3. Default:
    - For a cluster with one or two brokers: 1.
    - For a cluster with three or more brokers: 3.
- Change additional topic settings.
- Click Save.
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To update topic settings:
- View the description of the CLI command to update topics:
    yc managed-kafka topic update --help
- Change topic settings:
    yc managed-kafka topic update <topic_name> \
      --cluster-name <cluster_name> \
      --partitions <number_of_partitions> \
      --replication-factor <replication_factor>
- Open the current Terraform configuration file that defines your infrastructure.
  For more information about creating this file, see Creating clusters.
- Edit the parameter values in the yandex_mdb_kafka_topic resource description:
    resource "yandex_mdb_kafka_topic" "<topic_name>" {
      cluster_id         = "<cluster_ID>"
      name               = "<topic_name>"
      partitions         = <number_of_partitions>
      replication_factor = <replication_factor>
      topic_config {
        compression_type = "<compression_type>"
        flush_messages   = <maximum_number_of_messages_in_memory>
        ...
      }
    }
- Make sure the settings are correct.
  - In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
  - Run this command:
      terraform validate
    Terraform will show any errors found in your configuration files.
- Confirm updating the resources.
  - Run this command to view the planned changes:
      terraform plan
    If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
  - If everything looks correct, apply the changes:
    - Run this command:
        terraform apply
    - Confirm updating the resources.
    - Wait for the operation to complete.
For more information, see the Terraform provider documentation.
- Get an IAM token for API authentication and put it into the environment variable:
    export IAM_TOKEN="<IAM_token>"
- Use the Topic.update method and send the following request, e.g., via cURL:
  Warning
  The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the updateMask parameter as a single comma-separated string.
    curl \
      --request PATCH \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --header "Content-Type: application/json" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>' \
      --data '{
                "clusterId": "<cluster_ID>",
                "updateMask": "topicSpec.partitions,topicSpec.replicationFactor,topicSpec.topicConfig_2_8.<setting_1>,...,topicSpec.topicConfig_2_8.<setting_N>,topicSpec.topicConfig_3.<setting_1>,...,topicSpec.topicConfig_3.<setting_N>",
                "topicSpec": {
                  "partitions": "<number_of_partitions>",
                  "replicationFactor": "<replication_factor>",
                  "topicConfig_3": {
                    "<setting_1_for_Apache Kafka®_3.x_topic>": "<value_1>",
                    "<setting_2_for_Apache Kafka®_3.x_topic>": "<value_2>",
                    ...
                    "<setting_N_for_Apache Kafka®_3.x_topic>": "<value_N>"
                  }
                }
              }'
  Where:
  - updateMask: List of parameters to update as a single string, separated by commas.
    In this case, list all the topic settings to update.
  - topicSpec stands for new topic settings:
    - partitions: Number of partitions.
    - replicationFactor: Replication factor.
    - topicConfig_3: Topic settings for Apache Kafka® 3.x. Use a separate line for each setting; separate them by commas.
      See Settings for individual topics for a description and possible values for each setting.
  You can request the cluster ID with the list of clusters in the folder.
- View the server response to make sure the request was successful.
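Because settings missing from updateMask may be reset to defaults, it can be safer to derive the mask from the same data as the request body so the two never drift apart. The following is a minimal sketch under that assumption; build_update_topic_body is a hypothetical helper, and exampleSetting stands in for a real setting name from Settings for individual topics.

```python
import json


def build_update_topic_body(cluster_id, partitions=None,
                            replication_factor=None, topic_config_3=None):
    """Build the JSON body for Topic.update with a matching updateMask.

    Hypothetical helper: only the settings passed in are listed in the mask.
    """
    mask = []
    spec = {}
    if partitions is not None:
        mask.append("topicSpec.partitions")
        spec["partitions"] = str(partitions)
    if replication_factor is not None:
        mask.append("topicSpec.replicationFactor")
        spec["replicationFactor"] = str(replication_factor)
    if topic_config_3:
        spec["topicConfig_3"] = dict(topic_config_3)
        mask.extend(f"topicSpec.topicConfig_3.{key}" for key in topic_config_3)
    if not mask:
        raise ValueError("nothing to update")
    # REST expects the mask as one comma-separated string.
    return json.dumps(
        {"clusterId": cluster_id, "updateMask": ",".join(mask), "topicSpec": spec}
    )


# Placeholder cluster ID; only the partition count is listed in the mask.
print(build_update_topic_body("<cluster_ID>", partitions=6))
```

Building the mask and the body from one set of arguments means a setting cannot appear in one without the other.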
- Get an IAM token for API authentication and put it into the environment variable:
    export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.
- Use the TopicService/Update call and send the following request, e.g., via gRPCurl:
  Warning
  The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the update_mask parameter as an array of paths[] strings.
  Format for listing settings
    "update_mask": {
      "paths": [
        "<setting_1>",
        "<setting_2>",
        ...
        "<setting_N>"
      ]
    }

    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<cluster_ID>",
            "topic_name": "<topic_name>",
            "update_mask": {
              "paths": [
                "topic_spec.partitions",
                "topic_spec.replication_factor",
                "topic_spec.topic_config_2_8.<setting_1>",
                ...,
                "topic_spec.topic_config_2_8.<setting_N>",
                "topic_spec.topic_config_3.<setting_1>",
                ...,
                "topic_spec.topic_config_3.<setting_N>"
              ]
            },
            "topic_spec": {
              "partitions": {
                "value": "<number_of_partitions>"
              },
              "replication_factor": {
                "value": "<replication_factor>"
              },
              "topic_config_3": {
                "<setting_1_for_Apache Kafka®_3.x_topic>": "<value_1>",
                "<setting_2_for_Apache Kafka®_3.x_topic>": "<value_2>",
                ...
                "<setting_N_for_Apache Kafka®_3.x_topic>": "<value_N>"
              }
            }
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.TopicService.Update
  Where:
  - update_mask: List of parameters to update as an array of paths[] strings.
    In this case, list all the topic settings to update.
  - topic_spec stands for new topic settings:
    - partitions: Number of partitions, provided as an object with a field named value.
    - replication_factor: Replication factor, provided as an object with a field named value.
    - topic_config_3: Topic settings for Apache Kafka® 3.x. Use a separate line for each setting; separate them by commas.
      See Settings for individual topics for a description and possible values for each setting.
  You can request the cluster ID with the list of clusters in the folder.
- View the server response to make sure the request was successful.
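The gRPC payload differs from the REST one in two ways described above: the mask is an array of paths, and numeric fields are wrapped in an object with a value field. A minimal sketch of generating it follows; build_grpc_update_payload is a hypothetical helper, not part of any SDK.

```python
import json


def build_grpc_update_payload(cluster_id, topic_name,
                              partitions=None, replication_factor=None):
    """Build the JSON document passed to grpcurl via -d for TopicService.Update.

    Hypothetical helper: the mask lists only the fields that are set.
    """
    paths = []
    spec = {}
    if partitions is not None:
        paths.append("topic_spec.partitions")
        # Wrapped in an object with a "value" field, as described above.
        spec["partitions"] = {"value": str(partitions)}
    if replication_factor is not None:
        paths.append("topic_spec.replication_factor")
        spec["replication_factor"] = {"value": str(replication_factor)}
    if not paths:
        raise ValueError("nothing to update")
    payload = {
        "cluster_id": cluster_id,
        "topic_name": topic_name,
        "update_mask": {"paths": paths},
        "topic_spec": spec,
    }
    return json.dumps(payload, indent=2)


# Placeholder identifiers; replace them with real values before use.
print(build_grpc_update_payload("<cluster_ID>", "<topic_name>", partitions=6))
```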
Getting a list of topics in a cluster
- In the management console, go to the relevant folder.
- In the list of services, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To get a list of topics, run the following command:
yc managed-kafka topic list --cluster-name <cluster_name>
- Get an IAM token for API authentication and put it into the environment variable:
    export IAM_TOKEN="<IAM_token>"
- Use the Topic.list method and send the following request, e.g., via cURL:
    curl \
      --request GET \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics'
  You can request the cluster ID with the list of clusters in the folder.
- View the server response to make sure the request was successful.
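If you would rather script the REST call than use cURL, the request can be prepared with the Python standard library. This is a minimal sketch: topic_request and API_ROOT are hypothetical names, the function only builds the request without sending it, and the response format is not assumed here.

```python
import urllib.parse
import urllib.request

# Base URL taken from the cURL examples in this document.
API_ROOT = "https://mdb.api.cloud.yandex.net/managed-kafka/v1"


def topic_request(cluster_id, iam_token, topic_name=None, method="GET"):
    """Prepare (but do not send) a request to the topics endpoint.

    Without topic_name the URL addresses the topic list; with it,
    the URL addresses a single topic.
    """
    url = f"{API_ROOT}/clusters/{urllib.parse.quote(cluster_id, safe='')}/topics"
    if topic_name is not None:
        url += "/" + urllib.parse.quote(topic_name, safe="")
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {iam_token}"},
        method=method,
    )


# "example-cluster-id" and "example-token" are placeholders.
request = topic_request("example-cluster-id", "example-token")
print(request.get_method(), request.full_url)
```

To actually send the request, pass the object to urllib.request.urlopen and read the response body; that step needs a valid IAM token and network access.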
- Get an IAM token for API authentication and put it into the environment variable:
    export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.
- Use the TopicService/List call and send the following request, e.g., via gRPCurl:
    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<cluster_ID>"
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.TopicService.List
  You can request the cluster ID with the list of clusters in the folder.
- View the server response to make sure the request was successful.
Getting detailed information about a topic
- In the management console, go to the relevant folder.
- In the list of services, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
- Click the topic name.
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To get detailed information about a topic, run the following command:
yc managed-kafka topic get <topic_name> --cluster-name <cluster_name>
- Get an IAM token for API authentication and put it into the environment variable:
    export IAM_TOKEN="<IAM_token>"
- Use the Topic.get method and send the following request, e.g., via cURL:
    curl \
      --request GET \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>'
  You can request the cluster ID with the list of clusters in the folder and the topic name with the list of topics in the cluster.
- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:
    export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.
- Use the TopicService/Get call and send the following request, e.g., via gRPCurl:
    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<cluster_ID>",
            "topic_name": "<topic_name>"
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.TopicService.Get
  You can request the cluster ID with the list of clusters in the folder and the topic name with the list of topics in the cluster.
- View the server response to make sure the request was successful.
Importing topics to Terraform
Using import, you can bring the existing cluster topics under Terraform management.
- In the Terraform configuration file, specify the topic you want to import:
    resource "yandex_mdb_kafka_topic" "<topic_name>" {}
- Run the following command to import the topic:
    terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name>
  To learn more about importing topics, see the Terraform provider documentation.
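When a cluster has many topics, the import commands can be generated instead of typed by hand. The sketch below is only an illustration: terraform_import_commands is a hypothetical helper, and replacing characters that are not valid in a Terraform resource name with underscores is an assumption about how you want to label the resources, not a provider requirement.

```python
import re


def terraform_import_commands(cluster_id, topic_names):
    """Return one `terraform import` command line per topic.

    Hypothetical helper. The resource label is derived from the topic name;
    characters other than letters, digits, "_" and "-" become "_".
    """
    commands = []
    for topic in topic_names:
        if not topic:
            raise ValueError("topic name must not be empty")
        label = re.sub(r"[^A-Za-z0-9_-]", "_", topic)
        if label[0].isdigit():
            # Terraform identifiers must not start with a digit.
            label = "_" + label
        commands.append(
            f"terraform import yandex_mdb_kafka_topic.{label} {cluster_id}:{topic}"
        )
    return commands


# Placeholder cluster ID and topic names used for illustration.
for line in terraform_import_commands("<cluster_ID>", ["topic1", "topic2"]):
    print(line)
```

Each generated label must match a yandex_mdb_kafka_topic resource declared in your configuration file before you run the command.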
Transferring information about created topics to the Terraform state file
When switching to a new Terraform provider version, there may be discrepancies between the state file and configuration file in terms of the created topics: the obsolete topic attributes and new yandex_mdb_kafka_topic resources. To remove the discrepancies, delete the topic attributes and transfer information about the created yandex_mdb_kafka_topic resources to the .tfstate state file. There are two possible ways to do this.
First method
- Delete the cluster information from the .tfstate file using this command:
    terraform state rm yandex_mdb_kafka_cluster.<cluster_name>
- Edit the Terraform configuration file:
  - Delete the topic attributes from the yandex_mdb_kafka_cluster resource.
  - Add new yandex_mdb_kafka_topic resources.
  Example of the updated configuration file
    resource "yandex_mdb_kafka_cluster" "this" {
      name        = "terraform-test"
      environment = "PRODUCTION"
      network_id  = data.yandex_vpc_network.this.id
      config {
        version       = "3.4"
        brokers_count = 1
        zones         = ["ru-central1-a"]
        kafka {
          resources {
            resource_preset_id = "s2.small"
            disk_size          = 30
            disk_type_id       = "network-ssd"
          }
          kafka_config {
            log_segment_bytes = 104857600
          }
        }
      }
    }

    resource "yandex_mdb_kafka_topic" "topic1" {
      cluster_id         = yandex_mdb_kafka_cluster.this.id
      name               = "topic1"
      partitions         = 3
      replication_factor = 1
    }

    resource "yandex_mdb_kafka_topic" "topic2" {
      cluster_id         = yandex_mdb_kafka_cluster.this.id
      name               = "topic2"
      partitions         = 3
      replication_factor = 1
    }
- Import the cluster and topics:
    terraform import yandex_mdb_kafka_cluster.<cluster_name> <cluster_ID>
    terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name>
    terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name>
- Check the result using this command:
    terraform plan
  If the state file matches the configuration, the terminal displays this message:
    No changes. Infrastructure is up-to-date.
  Terraform will inform you if the state file does not match the configuration.
Second method
- Download the .tfstate file using this command:
    terraform state pull
- Open the downloaded file in any text editor and delete the topic attributes from the yandex_mdb_kafka_cluster resource.
- Push the updated state file using this command:
    terraform state push
- Edit the Terraform configuration file:
  - Delete the topic attributes from the yandex_mdb_kafka_cluster resource.
  - Add new yandex_mdb_kafka_topic resources.
  Example of the updated configuration file
    resource "yandex_mdb_kafka_cluster" "this" {
      name        = "terraform-test"
      environment = "PRODUCTION"
      network_id  = data.yandex_vpc_network.this.id
      config {
        version       = "3.4"
        brokers_count = 1
        zones         = ["ru-central1-a"]
        kafka {
          resources {
            resource_preset_id = "s2.small"
            disk_size          = 30
            disk_type_id       = "network-ssd"
          }
          kafka_config {
            log_segment_bytes = 104857600
          }
        }
      }
    }

    resource "yandex_mdb_kafka_topic" "topic1" {
      cluster_id         = yandex_mdb_kafka_cluster.this.id
      name               = "topic1"
      partitions         = 3
      replication_factor = 1
    }

    resource "yandex_mdb_kafka_topic" "topic2" {
      cluster_id         = yandex_mdb_kafka_cluster.this.id
      name               = "topic2"
      partitions         = 3
      replication_factor = 1
    }
- Import the topics:
    terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name>
    terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name>
- Check the result using this command:
    terraform plan
  If the state file matches the configuration, the terminal displays this message:
    No changes. Infrastructure is up-to-date.
  Terraform will inform you if the state file does not match the configuration.
Deleting a topic
Note
Permissions granted to a user for a topic persist after the topic is deleted. If you do not revoke them after deleting the topic, the user will have access to any new topic you create with the same name, even if you do not explicitly assign new permissions.
- In the management console, go to the relevant folder.
- In the list of services, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
- Click the menu icon for the topic and select Delete topic.
- In the window that opens, click Delete.
If you do not have the Yandex Cloud CLI installed yet, install and initialize it.
By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
To delete a topic:
- View the description of the CLI command to delete topics:
    yc managed-kafka topic delete --help
- Delete a topic:
    yc managed-kafka topic delete <topic_name> --cluster-name <cluster_name>
- Open the current Terraform configuration file that defines your infrastructure.
  For more information about creating this file, see Creating clusters.
- Delete the yandex_mdb_kafka_topic resource with the topic description.
- Make sure the settings are correct.
  - In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
  - Run this command:
      terraform validate
    Terraform will show any errors found in your configuration files.
- Confirm updating the resources.
  - Run this command to view the planned changes:
      terraform plan
    If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
  - If everything looks correct, apply the changes:
    - Run this command:
        terraform apply
    - Confirm updating the resources.
    - Wait for the operation to complete.
For more information, see the Terraform provider documentation.
- Get an IAM token for API authentication and put it into the environment variable:
    export IAM_TOKEN="<IAM_token>"
- Use the Topic.delete method and send the following request, e.g., via cURL:
    curl \
      --request DELETE \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>'
  You can request the cluster ID with the list of clusters in the folder and the topic name with the list of topics in the cluster.
- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:
    export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.
- Use the TopicService/Delete call and send the following request, e.g., via gRPCurl:
    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<cluster_ID>",
            "topic_name": "<topic_name>"
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.TopicService.Delete
  You can request the cluster ID with the list of clusters in the folder and the topic name with the list of topics in the cluster.
- View the server response to make sure the request was successful.
Managing topics via the Apache Kafka® Admin API
To manage topics via the Apache Kafka® Admin API:
- Create an admin user in the cluster with either the ACCESS_ROLE_ADMIN or ACCESS_ROLE_TOPIC_ADMIN role. Learn more about the permissions you get with each role.
- Manage topics on behalf of this user by making requests to the Apache Kafka® Admin API. For details on working with the Admin API, refer to the documentation for the Apache Kafka® client library of your programming language.
For more information about working with the Admin API and the existing limitations, see Managing topics and partitions and the Apache Kafka® documentation.
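As one possible illustration of the Admin API route, the sketch below uses the confluent-kafka Python package to create a topic. Several details are assumptions that this document does not state: the broker port 9091, the SASL_SSL protocol with the SCRAM-SHA-512 mechanism, and the helper names admin_client_config and create_topic. Check the connection settings for your cluster before relying on them.

```python
def admin_client_config(broker_fqdns, username, password, ca_path):
    """Build client settings for an Admin API connection.

    Assumptions (not stated in this document): brokers listen on port 9091
    and require SASL_SSL with SCRAM-SHA-512.
    """
    return {
        "bootstrap.servers": ",".join(f"{host}:9091" for host in broker_fqdns),
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": username,
        "sasl.password": password,
        "ssl.ca.location": ca_path,
    }


def create_topic(config, name, partitions, replication_factor):
    """Create a topic through the Admin API and wait for the result."""
    # Imported here so the configuration helper works without the package.
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient(config)
    futures = admin.create_topics(
        [NewTopic(name, num_partitions=partitions,
                  replication_factor=replication_factor)]
    )
    # result() returns None on success and raises an exception on failure.
    futures[name].result()
```

Calling create_topic requires a reachable cluster and the credentials of the admin user created in the first step. This route is also the one to use for topic names that start with _, which the Yandex Cloud interfaces do not accept.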