Managing Apache Kafka® topics
A Managed Service for Apache Kafka® cluster provides two ways for you to manage topics and partitions (which can be used separately or combined):
-
Using native Yandex Cloud interfaces, such as the CLI, API, or management console. Choose this method if you want to manage topics using Managed Service for Apache Kafka® features.
You can perform the following actions on Managed Service for Apache Kafka® topics:
-
Using the Apache Kafka® Admin API. Select this method if you prefer to use your existing solution to manage topics and partitions.
Managing topics via Yandex Cloud interfaces
Creating a topic
Prior to creating a topic, calculate the minimum storage size.
- In the management console
, go to the relevant folder. - From the list of services, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
- Click Create topic.
- Under Basic parameters, set the basic parameters of the topic:
-
Topic name (must be unique in the Apache Kafka® cluster).
Note
Use the Apache Kafka® Admin API if you need to create a topic that starts with
_
. You cannot create such a topic using the Yandex Cloud interfaces. -
Number of topic partitions.
-
Replication factor. This parameter value should not exceed the number of brokers in the cluster. Minimum value:
1
. Maximum value:3
. Default value:- For a cluster with one or two brokers:
1
. - For a cluster with three or more brokers:
3
.
- For a cluster with one or two brokers:
-
- Under Topic settings, specify the topic settings.
- Click Create.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified when creating the CLI profile is used by default. To change the default folder, use the yc config set folder-id <folder_ID>
command. You can specify a different folder using the --folder-name
or --folder-id
parameter.
To create a topic:
-
View a description of the CLI create topic command:
yc managed-kafka topic create --help
-
Create a topic:
yc managed-kafka topic create <topic_name> \ --cluster-name <cluster_name> \ --partitions <number_of_partitions> \ --replication-factor <replication_factor>
If necessary, specify the topic settings here.
Note
Use the Apache Kafka® Admin API if you need to create a topic that starts with
_
. You cannot create such a topic using the Yandex Cloud interfaces.
-
Open the current Terraform configuration file that defines your infrastructure.
For more information about creating this file, see Creating clusters.
-
Add the
yandex_mdb_kafka_topic
resource and configure the topic undertopic_config
if required:resource "yandex_mdb_kafka_topic" "<topic_name>" { cluster_id = "<cluster_ID>" name = "<topic_name>" partitions = <number_of_partitions> replication_factor = <replication_factor> topic_config { compression_type = "<compression_type>" flush_messages = <maximum_number_of_messages_in_memory> ... } }
Note
Use the Apache Kafka® Admin API if you need to create a topic that starts with
_
. You cannot create such a topic using the Yandex Cloud interfaces. -
Make sure the settings are correct.
-
In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
-
Run this command:
terraform validate
Terraform will show any errors found in your configuration files.
-
-
Confirm updating the resources.
-
Run this command to view the planned changes:
terraform plan
If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
-
If everything looks correct, apply the changes:
-
Run this command:
terraform apply
-
Confirm updating the resources.
-
Wait for the operation to complete.
-
-
For more information, see the Terraform
-
Get an IAM token for API authentication and put it into the environment variable:
export IAM_TOKEN="<IAM_token>"
-
Use the Topic.create method and send the following request, e.g., via cURL
:curl \ --request POST \ --header "Authorization: Bearer $IAM_TOKEN" \ --header "Content-Type: application/json" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics' \ --data '{ "topicSpec": { "name": "<topic_name>", "partitions": "<number_of_partitions>", "replicationFactor": "<replication_factor>" }'
Where:
-
topicSpec
stands for topic settings:-
name
: Topic name.Note
Use the Apache Kafka® Admin API if you need to create a topic that starts with
_
. You cannot create such a topic using the Yandex Cloud interfaces. -
partitions
: Number of partitions. -
replicationFactor
: Replication factor.
-
You can request the cluster ID with the list of clusters in the folder.
-
-
View the server response to make sure the request was successful.
-
Get an IAM token for API authentication and put it into the environment variable:
export IAM_TOKEN="<IAM_token>"
-
Clone the cloudapi
repository:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Below, we assume the repository contents are stored in the
~/cloudapi/
directory. -
Use the TopicService/Create call and send the following request, e.g., via gRPCurl
:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<cluster_ID>", "topic_spec": { "name": "<topic_name>", "partitions": { "value": "<number_of_partitions>" }, "replication_factor": { "value": "<replication_factor>" } } }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.Create
Where:
-
topic_spec
stands for topic settings:-
name
: Topic name.Note
Use the Apache Kafka® Admin API if you need to create a topic that starts with
_
. You cannot create such a topic using the Yandex Cloud interfaces. -
partitions
: Number of partitions, provided as an object with a field namedvalue
. -
replication_factor
: Replication factor. Provided as an object with thevalue
field.
-
You can request the cluster ID with the list of clusters in the folder.
-
-
View the server response to make sure the request was successful.
Note
While running, Managed Service for Apache Kafka® is able to create service topics. You cannot write user data to such topics.
Updating topic settings
You cannot reduce the number of partitions in Managed Service for Apache Kafka® topics. You cannot create new partitions if there is not enough storage space.
For more information, see Minimum storage size.
- In the management console
, go to the relevant folder. - From the list of services, select Managed Service for Kafka.
- Click the name of the cluster you need and select the Topics tab.
- Click
for the topic you need and select Edit. - Change the basic parameters of the topic:
- Number of topic partitions.
- Replication factor. This parameter value should not exceed the number of brokers in the cluster. Minimum value:
1
. Maximum value:3
. Default:- For a cluster with one or two brokers:
1
. - For a cluster with three or more brokers:
3
.
- For a cluster with one or two brokers:
- Change additional topic settings.
- Click Save.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified when creating the CLI profile is used by default. To change the default folder, use the yc config set folder-id <folder_ID>
command. You can specify a different folder using the --folder-name
or --folder-id
parameter.
To update topic settings:
-
View a description of the CLI update topic command:
yc managed-kafka topic update --help
-
Change topic settings:
yc managed-kafka topic update <topic_name> \ --cluster-name <cluster_name> \ --partitions <number_of_partitions> \ --replication-factor <replication_factor>
-
Open the current Terraform configuration file that defines your infrastructure.
For more information about creating this file, see Creating clusters.
-
Edit the parameter values in the
yandex_mdb_kafka_topic
resource description:resource "yandex_mdb_kafka_topic" "<topic_name>" { cluster_id = "<cluster_ID>" name = "<topic_name>" partitions = <number_of_partitions> replication_factor = <replication_factor> topic_config { compression_type = "<compression_type>" flush_messages = <maximum_number_of_messages_in_memory> ... } }
-
Make sure the settings are correct.
-
In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
-
Run this command:
terraform validate
Terraform will show any errors found in your configuration files.
-
-
Confirm updating the resources.
-
Run this command to view the planned changes:
terraform plan
If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
-
If everything looks correct, apply the changes:
-
Run this command:
terraform apply
-
Confirm updating the resources.
-
Wait for the operation to complete.
-
-
For more information, see the Terraform provider documentation
-
Get an IAM token for API authentication and put it into the environment variable:
export IAM_TOKEN="<IAM_token>"
-
Use the Topic.update method and send the following request, e.g., via cURL
:Warning
The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the
updateMask
parameter as a single comma-separated string.curl \ --request PATCH \ --header "Authorization: Bearer $IAM_TOKEN" \ --header "Content-Type: application/json" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>' \ --data '{ "clusterId": "<cluster_ID>", "updateMask": "topicSpec.partitions,topicSpec.replicationFactor,topicSpec.topicConfig_2_8.<setting_1>,...,topicSpec.topicConfig_2_8.<setting_N>,topicSpec.topicConfig_3.<setting_1>,...,topicSpec.topicConfig_3.<setting_N>", "topicSpec": { "partitions": "<number_of_partitions>", "replicationFactor": "<replication_factor>", "topicConfig_3": { "<setting_1_for_Apache Kafka®_3.x_topic>": "<value_1>", "<setting_2_for_Apache Kafka®_3.x_topic>": "<value_2>", ... "<setting_N_for_Apache Kafka®_3.x_topic>": "<value_N>" } } }'
Where:
-
updateMask
: List of parameters to update as a single string, separated by commas.In this case, list all the topic settings to update.
-
topicSpec
stands for new topic settings:-
partitions
: Number of partitions. -
replicationFactor
: Replication factor. -
topicConfig_3
: Topic settings for Apache Kafka®3.x
. Use a separate line for each setting; separate them by commas.See Settings for individual topics for a description and possible values for each setting.
-
You can request the cluster ID with the list of clusters in the folder.
-
-
View the server response to make sure the request was successful.
-
Get an IAM token for API authentication and put it into the environment variable:
export IAM_TOKEN="<IAM_token>"
-
Clone the cloudapi
repository:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Below, we assume the repository contents are stored in the
~/cloudapi/
directory. -
Use the TopicService/Update call and send the following request, e.g., via gRPCurl
:Warning
The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the
update_mask
parameter as an array ofpaths[]
strings.Format for listing settings
"update_mask": { "paths": [ "<setting_1>", "<setting_2>", ... "<setting_N>" ] }
grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<cluster_ID>", "topic_name": "<topic_name>", "update_mask": { "paths": [ "topic_spec.partitions", "topic_spec.replication_factor", "topic_spec.topic_config_2_8.<setting_1>", ..., "topic_spec.topic_config_2_8.<setting_N>", "topic_spec.topic_config_3.<setting_1>", ..., "topic_spec.topic_config_3.<setting_N>" ] }, "topic_spec": { "partitions": { "value": "<number_of_partitions>" }, "replication_factor": { "value": "<replication_factor>" }, "topic_config_3": { "<setting_1_for_Apache Kafka®_3.x_topic>": "<value_1>", "<setting_2_for_Apache Kafka®_3.x_topic>": "<value_2>", ... "<setting_N_for_Apache Kafka®_3.x_topic>": "<value_N>" } } }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.Update
Where:
-
update_mask
: List of parameters to update as an array ofpaths[]
strings.In this case, list all the topic settings to update.
-
topic_spec
stands for new topic settings:-
partitions
: Number of partitions, provided as an object with a field namedvalue
. -
replication_factor
: Replication factor. Provided as an object with a field namedvalue
. -
topic_config_3
: Topic settings for Apache Kafka®3.x
. Use a separate line for each setting; separate them by commas.See Settings for individual topics for a description and possible values for each setting.
-
You can request the cluster ID with the list of clusters in the folder.
-
-
View the server response to make sure the request was successful.
Getting a list of topics in a cluster
- In the management console
, go to the relevant folder. - From the list of services, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified when creating the CLI profile is used by default. To change the default folder, use the yc config set folder-id <folder_ID>
command. You can specify a different folder using the --folder-name
or --folder-id
parameter.
To get a list of topics, run the following command:
yc managed-kafka topic list --cluster-name <cluster_name>
-
Get an IAM token for API authentication and put it into the environment variable:
export IAM_TOKEN="<IAM_token>"
-
Use the Topic.list method and send the following request, e.g., via cURL
:curl \ --request GET \ --header "Authorization: Bearer $IAM_TOKEN" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics'
You can request the cluster ID with the list of clusters in the folder.
-
View the server response to make sure the request was successful.
-
Get an IAM token for API authentication and put it into the environment variable:
export IAM_TOKEN="<IAM_token>"
-
Clone the cloudapi
repository:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Below, we assume the repository contents are stored in the
~/cloudapi/
directory. -
Use the TopicService/List call and send the following request, e.g., via gRPCurl
:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<cluster_ID>" }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.List
You can request the cluster ID with the list of clusters in the folder.
-
View the server response to make sure the request was successful.
Getting detailed information about a topic
- In the management console
, go to the relevant folder. - From the list of services, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
- Click the topic name.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified when creating the CLI profile is used by default. To change the default folder, use the yc config set folder-id <folder_ID>
command. You can specify a different folder using the --folder-name
or --folder-id
parameter.
To get detailed information about a topic, run the following command:
yc managed-kafka topic get <topic_name> --cluster-name <cluster_name>
-
Get an IAM token for API authentication and put it into the environment variable:
export IAM_TOKEN="<IAM_token>"
-
Use the Topic.list method and send the following request, e.g., via cURL
:curl \ --request GET \ --header "Authorization: Bearer $IAM_TOKEN" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>'
You can request the cluster ID with the list of clusters in the folder and the topic name with the list of topics in the cluster.
-
View the server response to make sure the request was successful.
-
Get an IAM token for API authentication and put it into the environment variable:
export IAM_TOKEN="<IAM_token>"
-
Clone the cloudapi
repository:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Below, we assume the repository contents are stored in the
~/cloudapi/
directory. -
Use the TopicService/Get call and send the following request, e.g., via gRPCurl
:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<cluster_ID>", "topic_name": "<topic_name>" }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.Get
You can request the cluster ID with the list of clusters in the folder and the topic name with the list of topics in the cluster.
-
View the server response to make sure the request was successful.
Importing topics to Terraform
Using import, you can bring the existing cluster topics under Terraform management.
-
In the Terraform configuration file, specify the topic you want to import:
resource "yandex_mdb_kafka_topic" "<topic_name>" {}
-
Run the following command to import the topic:
terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name>
To learn more about importing topics, see the Terraform provider documentation
.
Transferring information about created topics to the Terraform state file
When switching to a new Terraform provider version, there may be discrepancies between the state file and configuration file in terms of the created topics: the obsolete topic
attributes and new yandex_mdb_kafka_topic
resources. To remove the discrepancies, delete the topic
attributes and transfer information about the created yandex_mdb_kafka_topic
resources to the .tfstate
state file. There are two possible ways to do this.
First method
-
Delete the cluster information from the
.tfstate
file using this command:terraform state rm yandex_mdb_kafka_cluster.<cluster_name>
-
Edit the Terraform configuration file:
- Delete the
topic
attributes from theyandex_mdb_kafka_cluster
resource. - Add
newyandex_mdb_kafka_topic
resources.
Example of the updated configuration file
resource "yandex_mdb_kafka_cluster" "this" { name = "terraform-test" environment = "PRODUCTION" network_id = data.yandex_vpc_network.this.id config { version = "3.4" brokers_count = 1 zones = ["ru-central1-a"] kafka { resources { resource_preset_id = "s2.small" disk_size = 30 disk_type_id = "network-ssd" } kafka_config { log_segment_bytes = 104857600 } } } } resource "yandex_mdb_kafka_topic" "topic1" { cluster_id = yandex_mdb_kafka_cluster.this.id name = "topic1" partitions = 3 replication_factor = 1 } resource "yandex_mdb_kafka_topic" "topic2" { cluster_id = yandex_mdb_kafka_cluster.this.id name = "topic2" partitions = 3 replication_factor = 1 }
- Delete the
-
Import the cluster and topics:
terraform import yandex_mdb_kafka_cluster.<cluster_name> <cluster_ID> terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name> terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name>
-
Check the result using this command:
terraform plan
If the state file matches the configuration, the terminal displays this message:
No changes. Infrastructure is up-to-date.
Terraform will inform you if the state file does not match the configuration.
Second method
-
Download the
.tfstate
file using this command:terraform state pull
-
Open the downloaded file in any text editor and delete the
topic
attributes from theyandex_mdb_kafka_cluster
resource. -
Push the updated state file using this command:
terraform state push
-
Edit the Terraform configuration file:
- Delete the
topic
attributes from theyandex_mdb_kafka_cluster
resource. - Add
newyandex_mdb_kafka_topic
resources.
Example of the updated configuration file
resource "yandex_mdb_kafka_cluster" "this" { name = "terraform-test" environment = "PRODUCTION" network_id = data.yandex_vpc_network.this.id config { version = "3.4" brokers_count = 1 zones = ["ru-central1-a"] kafka { resources { resource_preset_id = "s2.small" disk_size = 30 disk_type_id = "network-ssd" } kafka_config { log_segment_bytes = 104857600 } } } } resource "yandex_mdb_kafka_topic" "topic1" { cluster_id = yandex_mdb_kafka_cluster.this.id name = "topic1" partitions = 3 replication_factor = 1 } resource "yandex_mdb_kafka_topic" "topic2" { cluster_id = yandex_mdb_kafka_cluster.this.id name = "topic2" partitions = 3 replication_factor = 1 }
- Delete the
-
Import the topics:
terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name> terraform import yandex_mdb_kafka_topic.<topic_name> <cluster_ID>:<topic_name>
-
Check the result using this command:
terraform plan
If the state file matches the configuration, the terminal displays this message:
No changes. Infrastructure is up-to-date.
Terraform will inform you if the state file does not match the configuration.
Deleting a topic
Note
Permissions granted to the user for a topic remain even after the topic is deleted. If, after deleting a topic, you don't revoke the permissions, then, when you create a topic with the same name, the user will have access to it even if you don't explicitly assign them new permissions.
- In the management console
, go to the relevant folder. - From the list of services, select Managed Service for Kafka.
- Click the cluster name and go to the Topics tab.
- Click
for the topic and select Delete topic. - In the window that opens, click Delete.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified when creating the CLI profile is used by default. To change the default folder, use the yc config set folder-id <folder_ID>
command. You can specify a different folder using the --folder-name
or --folder-id
parameter.
To delete a topic:
-
View a description of the CLI update topic command:
yc managed-kafka topic delete --help
-
Delete a topic:
yc managed-kafka topic delete <topic_name> --cluster-name <cluster_name>
-
Open the current Terraform configuration file that defines your infrastructure.
For more information about creating this file, see Creating clusters.
-
Delete the
yandex_mdb_kafka_topic
resource with the topic description. -
Make sure the settings are correct.
-
In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
-
Run this command:
terraform validate
Terraform will show any errors found in your configuration files.
-
-
Confirm updating the resources.
-
Run this command to view the planned changes:
terraform plan
If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
-
If everything looks correct, apply the changes:
-
Run this command:
terraform apply
-
Confirm updating the resources.
-
Wait for the operation to complete.
-
-
For more information, see the Terraform
-
Get an IAM token for API authentication and put it into the environment variable:
export IAM_TOKEN="<IAM_token>"
-
Use the Topic.delete method and send the following request, e.g., via cURL
:curl \ --request DELETE \ --header "Authorization: Bearer $IAM_TOKEN" \ --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/topics/<topic_name>'
You can request the cluster ID with the list of clusters in the folder and the topic name with the list of topics in the cluster.
-
View the server response to make sure the request was successful.
-
Get an IAM token for API authentication and put it into the environment variable:
export IAM_TOKEN="<IAM_token>"
-
Clone the cloudapi
repository:cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Below, we assume the repository contents are stored in the
~/cloudapi/
directory. -
Use the TopicService/Delete call and send the following request, e.g., via gRPCurl
:grpcurl \ -format json \ -import-path ~/cloudapi/ \ -import-path ~/cloudapi/third_party/googleapis/ \ -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/topic_service.proto \ -rpc-header "Authorization: Bearer $IAM_TOKEN" \ -d '{ "cluster_id": "<cluster_ID>", "topic_name": "<topic_name>" }' \ mdb.api.cloud.yandex.net:443 \ yandex.cloud.mdb.kafka.v1.TopicService.Delete
You can request the cluster ID with the list of clusters in the folder and the topic name with the list of topics in the cluster.
-
View the server response to make sure the request was successful.
Managing topics via the Apache Kafka® Admin API
To manage topics via the Apache Kafka® Admin API:
- Create an admin user with the
ACCESS_ROLE_ADMIN
role in the cluster. - Manage topics on behalf of this user by making requests to the Apache Kafka® Admin API. Review your favorite programming language manual for information on working with the Admin API.
For more information about working with the Admin API and the existing limitations, see Managing topics and partitions and the Apache Kafka® documentation