Managing connectors
Connectors manage the transfer of Apache Kafka® topics to a different cluster or data storage system.
You can:
- Get a list of connectors.
- Get detailed information about a connector.
- Create a connector of the appropriate type: MirrorMaker, S3 Sink, or Iceberg Sink.
- Edit a connector.
- Pause a connector.
- Resume a connector.
- Import a connector to Terraform.
- Delete a connector.
Getting a list of connectors
- In the management console, navigate to the relevant folder.
- Navigate to Managed Service for Kafka.
- Select the cluster and open the Connectors tab.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder used by default is the one specified when creating the CLI profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also specify a different folder for any command using --folder-name or --folder-id. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.
To get the list of cluster connectors, run this command:
yc managed-kafka connector list --cluster-name=<cluster_name>
Result:
+--------------+-----------+
| NAME | TASKS MAX |
+--------------+-----------+
| connector559 | 1 |
| ... | |
+--------------+-----------+
You can get the cluster name with the list of clusters in the folder.
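To reuse the connector names in a script, you can extract the NAME column from this table. The sketch below assumes the table layout shown above, which is not a documented output contract of the CLI. The function name connector_names is hypothetical.

```shell
# Hypothetical helper: print connector names from the table printed by
# `yc managed-kafka connector list`, assuming the layout shown above.
connector_names() {
  awk -F'|' '
    # Data rows start with "|"; border lines start with "+".
    /^[|]/ {
      name = $2
      gsub(/^[ \t]+|[ \t]+$/, "", name)   # trim surrounding spaces
      # Skip the header row and placeholder rows.
      if (name != "NAME" && name != "..." && name != "") print name
    }
  '
}

# Usage with the sample output from above:
printf '%s\n' \
  '+--------------+-----------+' \
  '| NAME | TASKS MAX |' \
  '+--------------+-----------+' \
  '| connector559 | 1 |' \
  '+--------------+-----------+' | connector_names
```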
- Get an IAM token for API authentication and put it into an environment variable:
  export IAM_TOKEN="<IAM_token>"
- Call the Connector.list method, e.g., via the following cURL request:
  curl \
    --request GET \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors'
  You can get the cluster ID with the list of clusters in the folder.
- Check the server response to make sure your request was successful.
- Get an IAM token for API authentication and put it into an environment variable:
  export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume that the repository contents reside in the ~/cloudapi/ directory.
- Call the ConnectorService/List method, e.g., via the following gRPCurl request:
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{ "cluster_id": "<cluster_ID>" }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.List
  You can get the cluster ID with the list of clusters in the folder.
- Check the server response to make sure your request was successful.
Getting detailed information about a connector
- In the management console, navigate to the relevant folder.
- Navigate to Managed Service for Kafka.
- Select the cluster and open the Connectors tab.
- Click the connector name.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder used by default is the one specified when creating the CLI profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also specify a different folder for any command using --folder-name or --folder-id. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.
To get detailed information about a connector, run this command:
yc managed-kafka connector get <connector_name> \
  --cluster-name=<cluster_name>
Result:
name: connector785
tasks_max: "1"
cluster_id: c9qbkmoiimsl********
...
You can get the connector name with the list of cluster connectors, and the cluster name, with the list of clusters in the folder.
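To read a single field from this output in a script, you can match its top-level key: value line. A minimal sketch, assuming one-line top-level fields as in the sample above; nested and multi-line values are not handled. The function name connector_field is hypothetical.

```shell
# Hypothetical helper: print the value of a top-level field from
# `yc managed-kafka connector get` output read on stdin.
# Assumes simple one-line "key: value" entries, as in the sample above.
connector_field() {
  awk -v key="$1" '
    index($0, key ": ") == 1 {
      value = substr($0, length(key) + 3)   # text after "key: "
      gsub(/^"|"$/, "", value)              # drop surrounding double quotes
      print value
      exit
    }
  '
}

sample='name: connector785
tasks_max: "1"
cluster_id: c9qbkmoiimsl********'

printf '%s\n' "$sample" | connector_field tasks_max
```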
- Get an IAM token for API authentication and put it into an environment variable:
  export IAM_TOKEN="<IAM_token>"
- Call the Connector.get method, e.g., via the following cURL request:
  curl \
    --request GET \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/<connector_name>'
  You can get the cluster ID with the list of clusters in the folder, and the connector name, with the list of connectors in the cluster.
- Check the server response to make sure your request was successful.
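The Connector.list and Connector.get requests share the same resource path; Connector.get only appends the connector name. A small helper that builds either URL, assuming the base address used in the cURL examples; the function name connectors_url is hypothetical.

```shell
# Hypothetical helper: build the REST URL for Connector.list (cluster ID only)
# or Connector.get (cluster ID plus connector name).
connectors_url() {
  if [ -n "${2:-}" ]; then
    printf 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/%s/connectors/%s\n' "$1" "$2"
  else
    printf 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/%s/connectors\n' "$1"
  fi
}

# Usage with cURL (placeholders as in the examples above):
# curl --request GET \
#   --header "Authorization: Bearer $IAM_TOKEN" \
#   --url "$(connectors_url <cluster_ID> <connector_name>)"
connectors_url '<cluster_ID>' '<connector_name>'
```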
- Get an IAM token for API authentication and put it into an environment variable:
  export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume that the repository contents reside in the ~/cloudapi/ directory.
- Call the ConnectorService/Get method, e.g., via the following gRPCurl request:
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "connector_name": "<connector_name>"
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.Get
  You can get the cluster ID with the list of clusters in the folder, and the connector name, with the list of connectors in the cluster.
- Check the server response to make sure your request was successful.
Creating a connector
- In the management console, navigate to the relevant folder.
- Navigate to Managed Service for Kafka.
- Select the cluster and open the Connectors tab.
- Click Create connector.
- Under Basic parameters, specify:
  - Connector name.
  - Task limit: Number of concurrent tasks. To distribute replication load evenly, we recommend a value of at least 2.
- Under Additional properties, specify the connector properties in the following format:
  <key>:<value>
  The key can either be a simple string or include a prefix indicating whether it belongs to the source or target (a cluster alias in the connector configuration):
  <cluster_alias>.<key_body>:<value>
- Select the connector type, MirrorMaker, S3 Sink, or Iceberg Sink, and set up its configuration.
  For more information about the supported connector types, see Connectors.
- Click Create.
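Because key bodies can themselves contain dots (e.g., key.converter), a leading segment can only be read as a cluster alias if it matches an alias configured in the connector. A minimal sketch of that interpretation, assuming the aliases are known in advance; the function name property_scope, the aliases source and target, and the sample keys are hypothetical.

```shell
# Hypothetical helper: split an additional property written as <key>:<value>
# or <cluster_alias>.<key_body>:<value>.
# $1: space-separated list of cluster aliases configured in the connector.
# $2: the property string.
# Prints "<alias> <key_body> <value>", or "- <key> <value>" if no alias matches.
property_scope() {
  aliases=$1
  prop=$2
  key=${prop%%:*}    # everything before the first ":"
  value=${prop#*:}   # everything after the first ":"
  for cluster_alias in $aliases; do
    case $key in
      "$cluster_alias".*)
        printf '%s %s %s\n' "$cluster_alias" "${key#"$cluster_alias".}" "$value"
        return 0
        ;;
    esac
  done
  printf '%s %s %s\n' '-' "$key" "$value"
}

property_scope 'source target' 'source.consumer.max.poll.records:500'
property_scope 'source target' 'key.converter:org.apache.kafka.connect.json.JsonConverter'
```

Matching against the configured aliases, rather than always splitting at the first dot, keeps unprefixed keys such as key.converter intact.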
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder used by default is the one specified when creating the CLI profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also specify a different folder for any command using --folder-name or --folder-id. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.
To create a MirrorMaker connector:
- See the description of the CLI command for creating a connector:
  yc managed-kafka connector-mirrormaker create --help
- Create a connector:
  yc managed-kafka connector-mirrormaker create <connector_name> \
    --cluster-name=<cluster_name> \
    --direction=<connector_direction> \
    --tasks-max=<task_limit> \
    --properties=<advanced_properties> \
    --replication-factor=<replication_factor> \
    --topics=<topic_pattern> \
    --this-cluster-alias=<this_cluster_prefix> \
    --external-cluster alias=<external_cluster_prefix>,`
                      `bootstrap-servers=<list_of_broker_host_FQDNs>,`
                      `security-protocol=<security_protocol>,`
                      `sasl-mechanism=<authentication_mechanism>,`
                      `sasl-username=<username>,`
                      `sasl-password=<user_password>,`
                      `ssl-truststore-certificates=<certificates_in_PEM_format>
  To learn how to get a broker host FQDN, see this guide.
  You can get the cluster name with the list of clusters in the folder.
  --direction takes these values:
  - egress: If the current cluster is a source cluster.
  - ingress: If the current cluster is a target cluster.
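Since --direction describes the role of the current cluster, not the external one, it can help to derive the value from that role explicitly. A minimal sketch; the function name mirrormaker_direction and the role names source and target are hypothetical.

```shell
# Hypothetical helper: derive the --direction value from the role
# the current cluster plays in replication.
mirrormaker_direction() {
  case $1 in
    source) echo egress ;;    # data leaves the current cluster
    target) echo ingress ;;   # data arrives in the current cluster
    *) echo "unknown role: $1" >&2; return 1 ;;
  esac
}

# Usage: the current cluster receives data from an external cluster.
# yc managed-kafka connector-mirrormaker create <connector_name> \
#   --direction="$(mirrormaker_direction target)" ...
mirrormaker_direction target
```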
To create an S3 Sink connector:
- View the description of the CLI command to create a connector:
  yc managed-kafka connector-s3-sink create --help
- Create a connector:
  yc managed-kafka connector-s3-sink create <connector_name> \
    --cluster-name=<cluster_name> \
    --tasks-max=<task_limit> \
    --properties=<advanced_properties> \
    --topics=<topic_pattern> \
    --file-compression-type=<compression_codec> \
    --file-max-records=<file_max_records> \
    --bucket-name=<bucket_name> \
    --access-key-id=<AWS_compatible_static_key_ID> \
    --secret-access-key=<AWS_compatible_static_key_contents> \
    --storage-endpoint=<S3_compatible_storage_endpoint> \
    --region=<S3_compatible_storage_region>
  You can get the cluster name with the list of clusters in the folder.
To create an Iceberg Sink connector:
- See the description of the CLI command for creating a connector:
  yc managed-kafka connector-iceberg-sink create --help
- Create a connector:
  yc managed-kafka connector-iceberg-sink create <connector_name> \
    --cluster-name=<cluster_name> \
    --tasks-max=<task_limit> \
    --properties=<advanced_properties> \
    --topics=<topic_pattern> \
    --file-compression-type=<compression_codec> \
    --file-max-records=<file_max_records> \
    --bucket-name=<bucket_name> \
    --access-key-id=<AWS_compatible_static_key_ID> \
    --secret-access-key=<AWS_compatible_static_key_contents> \
    --storage-endpoint=<S3_compatible_storage_endpoint> \
    --region=<S3_compatible_storage_region>
  You can get the cluster name with the list of clusters in the folder.
- Check the list of MirrorMaker and S3 Sink connector settings.
- Open the current Terraform configuration file describing your infrastructure.
  For information about creating this file, see Creating an Apache Kafka® cluster.
- To create a MirrorMaker connector, add the yandex_mdb_kafka_connector resource with the connector_config_mirrormaker configuration section:
  resource "yandex_mdb_kafka_connector" "<connector_name>" {
    cluster_id = "<cluster_ID>"
    name       = "<connector_name>"
    tasks_max  = <task_limit>
    properties = {
      <advanced_properties>
    }
    connector_config_mirrormaker {
      topics             = "<topic_pattern>"
      replication_factor = <replication_factor>
      source_cluster {
        alias = "<cluster_prefix>"
        external_cluster {
          bootstrap_servers           = "<list_of_broker_host_FQDNs>"
          sasl_username               = "<username>"
          sasl_password               = "<user_password>"
          sasl_mechanism              = "<authentication_mechanism>"
          security_protocol           = "<security_protocol>"
          ssl_truststore_certificates = "<PEM_certificate_contents>"
        }
      }
      target_cluster {
        alias = "<cluster_prefix>"
        this_cluster {}
      }
    }
  }
  To learn how to get a broker host FQDN, see this guide.
- To create an S3 Sink connector, add the yandex_mdb_kafka_connector resource with the connector_config_s3_sink configuration section:
  resource "yandex_mdb_kafka_connector" "<connector_name>" {
    cluster_id = "<cluster_ID>"
    name       = "<connector_name>"
    tasks_max  = <task_limit>
    properties = {
      <advanced_properties>
    }
    connector_config_s3_sink {
      topics                = "<topic_pattern>"
      file_compression_type = "<compression_codec>"
      file_max_records      = <file_max_records>
      s3_connection {
        bucket_name = "<bucket_name>"
        external_s3 {
          endpoint          = "<S3_compatible_storage_endpoint>"
          access_key_id     = "<AWS_compatible_static_key_ID>"
          secret_access_key = "<AWS_compatible_static_key_contents>"
        }
      }
    }
  }
- To create an Iceberg Sink connector, add the yandex_mdb_kafka_connector resource with the connector_config_iceberg_sink configuration section:
  resource "yandex_mdb_kafka_connector" "<connector_name>" {
    cluster_id = "<cluster_ID>"
    name       = "<connector_name>"
    tasks_max  = <task_limit>
    properties = {
      <advanced_properties>
    }
    connector_config_iceberg_sink {
      topics        = "<comma_separated_list_of_topics>"
      control_topic = "<management_topic_name>"
      metastore_connection {
        catalog_uri = "<URI_for_connecting_to_Metastore_cluster>"
        warehouse   = "<root_directory_for_storing_managed_table_data_in_S3>"
      }
      s3_connection {
        external_s3 {
          endpoint          = "<S3_compatible_storage_endpoint>"
          access_key_id     = "<AWS_compatible_static_key_ID>"
          secret_access_key = "<AWS_compatible_static_key_contents>"
          region            = "<region_name>"
        }
      }
      static_tables {
        tables = "<comma_separated_table_names>"
      }
      tables_config {
        default_commit_branch   = "<default_branch_name>"
        default_id_columns      = "<comma_separated_default_column_list>"
        default_partition_by    = "<list_of_columns_or_transformation_expressions>"
        evolve_schema_enabled   = <automatically_update_Iceberg_table_schema>
        schema_force_optional   = <make_Iceberg_table_schema_fields_optional>
        schema_case_insensitive = <ignore_case_when_matching_fields>
      }
      control_config {
        group_id_prefix      = "<prefix_for_Consumer_Group_ID>"
        commit_interval_ms   = <Iceberg_table_data_commit_interval>
        commit_timeout_ms    = <how_long_the_coordinator_waits_for_confirmation>
        commit_threads       = <number_of_threads_for_committing_data_to_Iceberg_table>
        transactional_prefix = "<prefix_for_Transactional_ID>"
      }
    }
  }
- Make sure the settings are correct.
  - In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
  - Run this command:
    terraform validate
    Terraform will show any errors found in your configuration files.
- Confirm resource changes.
  - Run this command to view the planned changes:
    terraform plan
    If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
  - If everything looks correct, apply the changes:
    - Run this command:
      terraform apply
    - Confirm updating the resources.
    - Wait for the operation to complete.
For more information, see this Terraform provider guide.
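In the configurations above, properties is a map, and keys such as key.converter contain dots, so writing them as quoted strings is the safest option in HCL. A sketch that renders such a block from key=value arguments; the function name hcl_properties is hypothetical, and it assumes keys and values contain no double quotes or backslashes, since no escaping is done.

```shell
# Hypothetical helper: render a Terraform `properties = { ... }` block
# from key=value arguments. Assumes keys and values contain no `"` or `\`.
hcl_properties() {
  echo 'properties = {'
  for pair in "$@"; do
    # Split on the first "=": the key never contains "=", the value may.
    printf '  "%s" = "%s"\n' "${pair%%=*}" "${pair#*=}"
  done
  echo '}'
}

hcl_properties \
  key.converter=org.apache.kafka.connect.json.JsonConverter \
  value.converter=org.apache.kafka.connect.json.JsonConverter
```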
- Get an IAM token for API authentication and put it into an environment variable:
  export IAM_TOKEN="<IAM_token>"
- To create a MirrorMaker connector, call the Connector.create method, e.g., via the following cURL request:
  curl \
    --request POST \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors' \
    --data '{
              "connectorSpec": {
                "name": "<connector_name>",
                "tasksMax": "<task_limit>",
                "properties": {
                  <advanced_connector_properties>
                },
                "connectorConfigMirrormaker": {
                  <MirrorMaker_connector_settings>
                }
              }
            }'
  You can get the cluster ID with the list of clusters in the folder.
- To create an S3 Sink connector, call the Connector.create method, e.g., via the following cURL request:
  curl \
    --request POST \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors' \
    --data '{
              "connectorSpec": {
                "name": "<connector_name>",
                "tasksMax": "<task_limit>",
                "properties": {
                  <advanced_connector_properties>
                },
                "connectorConfigS3Sink": {
                  <S3_Sink_connector_settings>
                }
              }
            }'
  You can get the cluster ID with the list of clusters in the folder.
- Check the server response to make sure your request was successful.
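Both request bodies share one shape, and the JSON must stay valid, with commas between fields. A sketch that assembles the body, assuming properties is passed as a JSON object with string values and that you supply the connector settings as a ready-made JSON object. The function name rest_create_body and the sample values are hypothetical, and no JSON escaping is done.

```shell
# Hypothetical helper: print a Connector.create request body.
# $1: connector name, $2: task limit,
# $3: config field name (e.g., connectorConfigMirrormaker or connectorConfigS3Sink),
# $4: JSON object with the connector settings,
# remaining arguments: key=value pairs for properties.
# Assumes no argument contains `"` or `\`.
rest_create_body() {
  name=$1 tasks=$2 field=$3 config=$4
  shift 4
  props=''
  for pair in "$@"; do
    # Append ", " only when props already has an entry.
    props="$props${props:+, }\"${pair%%=*}\": \"${pair#*=}\""
  done
  printf '{"connectorSpec": {"name": "%s", "tasksMax": "%s", "properties": {%s}, "%s": %s}}\n' \
    "$name" "$tasks" "$props" "$field" "$config"
}

# Usage with cURL:
# curl --request POST ... --data "$(rest_create_body <arguments>)"
rest_create_body my-s3-sink 2 connectorConfigS3Sink '{"topics": "analysis.*"}' \
  key.converter=org.apache.kafka.connect.json.JsonConverter
```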
- Get an IAM token for API authentication and put it into an environment variable:
  export IAM_TOKEN="<IAM_token>"
- Clone the cloudapi repository:
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  Below, we assume that the repository contents reside in the ~/cloudapi/ directory.
- To create a MirrorMaker connector, call the ConnectorService/Create method, e.g., via the following gRPCurl request:
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "connector_spec": {
            "name": "<connector_name>",
            "tasks_max": {
              "value": "<task_limit>"
            },
            "properties": {
              <advanced_connector_properties>
            },
            "connector_config_mirrormaker": {
              <MirrorMaker_connector_settings>
            }
          }
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.Create
  You can get the cluster ID with the list of clusters in the folder.
- To create an S3 Sink connector, call the ConnectorService/Create method, e.g., via the following gRPCurl request:
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "connector_spec": {
            "name": "<connector_name>",
            "tasks_max": {
              "value": "<task_limit>"
            },
            "properties": {
              <advanced_connector_properties>
            },
            "connector_config_s3_sink": {
              <S3_Sink_connector_settings>
            }
          }
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.Create
  You can get the cluster ID with the list of clusters in the folder.
- Check the server response to make sure your request was successful.
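Compared with the REST body, the gRPC request carries cluster_id inside the body, uses snake_case field names, and wraps tasks_max in an object with a value field. A minimal sketch of that shape; the function name grpc_create_body is hypothetical, properties are omitted for brevity, and no JSON escaping is done.

```shell
# Hypothetical helper: print a ConnectorService/Create request body for gRPCurl.
# $1: cluster ID, $2: connector name, $3: task limit,
# $4: config field name (e.g., connector_config_mirrormaker), $5: settings as JSON.
# Assumes no argument contains `"` or `\`.
grpc_create_body() {
  printf '{"cluster_id": "%s", "connector_spec": {"name": "%s", "tasks_max": {"value": "%s"}, "%s": %s}}\n' \
    "$1" "$2" "$3" "$4" "$5"
}

# Usage with gRPCurl:
# grpcurl ... -d "$(grpcurl_body_arguments_here)" ...
grpc_create_body '<cluster_ID>' my-mirror 2 connector_config_mirrormaker '{"topics": ".*"}'
```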
MirrorMaker
Specify the MirrorMaker connector parameters as follows:
- Topics: Pattern for selecting topics to replicate. List topic names separated by commas or |. You can also use a regular expression (.*), e.g., analysis.*. To replicate all topics, specify .*.
- Replication factor: Number of replicas the cluster stores for each topic.
- Under Source cluster, specify the parameters for connecting to the source cluster:
  - Alias: Source cluster prefix in the connector settings.
    Note
    Topics in the target cluster will be created with the specified prefix.
  - Use this cluster: Select this option to use the current cluster as the source.
  - Bootstrap servers: Comma-separated list of the FQDNs of the source cluster broker hosts with the port numbers for connection, e.g., broker1.example.com:9091,broker2.example.com:9091.
    To learn how to get a broker host FQDN, see this guide.
  - SASL username: Username for the connector to access the source cluster.
  - SASL password: User password for the connector to access the source cluster.
  - SASL mechanism: Authentication mechanism for username and password validation.
  - Security protocol: Select the connection protocol for the connector:
    - PLAINTEXT, SASL_PLAINTEXT: To connect without SSL.
    - SSL, SASL_SSL: To connect with SSL.
  - Certificate in PEM format: Upload a PEM certificate to access the external cluster.
- Under Target cluster, specify the parameters for connecting to the target cluster:
  - Alias: Target cluster prefix in the connector settings.
  - Use this cluster: Select this option to use the current cluster as the target.
  - Bootstrap servers: Comma-separated list of the FQDNs of the target cluster broker hosts with the port numbers for connection.
    To learn how to get a broker host FQDN, see this guide.
  - SASL username: Username for the connector to access the target cluster.
  - SASL password: User password for the connector to access the target cluster.
  - SASL mechanism: Authentication mechanism for username and password validation.
  - Security protocol: Select the connection protocol for the connector:
    - PLAINTEXT, SASL_PLAINTEXT: To connect without SSL.
    - SSL, SASL_SSL: To connect with SSL.
  - Certificate in PEM format: Upload a PEM certificate to access the external cluster.
To specify additional settings not listed above, create the relevant keys and set their values under Additional properties when creating or updating the connector. Here are some examples of keys:
- key.converter
- value.converter
For the list of general connector settings, see this Apache Kafka® guide.
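To check locally which topic names a pattern would select, you can approximate the matching in the shell. This sketch treats commas and | as separators between alternatives and matches each alternative as a POSIX extended regular expression against the whole topic name. The connector's own regular expression engine may behave differently in edge cases, and a comma inside a regex (e.g., in {1,3}) would be misread. Note that . matches any character, so analysis.* also selects a topic named analysis. The function name topic_selected is hypothetical.

```shell
# Hypothetical helper: succeed if topic $2 is selected by pattern $1
# (an approximation of the connector's matching, see above).
topic_selected() {
  # Turn "a,b|c" into the ERE alternation "a|b|c".
  pattern=$(printf '%s' "$1" | tr ',' '|')
  # -E: extended regex, -x: match the whole topic name, -q: exit status only.
  printf '%s\n' "$2" | grep -Eqx -e "$pattern"
}

for topic in analysis.events analysis orders billing.invoices; do
  if topic_selected 'analysis.*,orders' "$topic"; then
    echo "$topic: selected"
  else
    echo "$topic: skipped"
  fi
done
```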
- --cluster-name: Cluster name.
- --direction: Connector direction:
  - ingress: For a target cluster.
  - egress: For a source cluster.
- --tasks-max: Maximum number of concurrently running connector tasks.
- --properties: Comma-separated list of additional connector settings in <key>:<value> format. Here are some examples of keys:
  - key.converter
  - value.converter
  For the list of general connector settings, see this Apache Kafka® guide.
- --replication-factor: Number of replicas the cluster stores for each topic.
- --topics: Pattern for selecting topics to replicate. List topic names separated by commas or |. You can also use a regular expression (.*), e.g., analysis.*. To migrate all topics, specify .*.
- --this-cluster-alias: This cluster prefix in the connector settings.
- --external-cluster: External cluster parameters:
  - alias: External cluster prefix in the connector settings.
  - bootstrap-servers: Comma-separated list of the FQDNs of the external cluster broker hosts with the port numbers for connection.
    To learn how to get a broker host FQDN, see this guide.
  - security-protocol: Connection protocol for the connector:
    - plaintext, sasl_plaintext: To connect without SSL.
    - ssl, sasl_ssl: To connect with SSL.
  - sasl-mechanism: Authentication mechanism for username and password validation.
  - sasl-username: Username for the connector to access the external cluster.
  - sasl-password: User password for the connector to access the external cluster.
  - ssl-truststore-certificates: List of PEM certificates.
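The --properties value is a single comma-separated string, so it can be convenient to build it from separate key=value arguments. A minimal sketch, assuming keys and values contain no commas; the function name cli_properties is hypothetical.

```shell
# Hypothetical helper: join key=value arguments into the comma-separated
# <key>:<value> list expected by --properties.
# Assumes keys and values contain no commas.
cli_properties() {
  out=''
  for pair in "$@"; do
    # Insert a comma separator only after the first entry.
    out="$out${out:+,}${pair%%=*}:${pair#*=}"
  done
  printf '%s\n' "$out"
}

# Usage:
# yc managed-kafka connector-mirrormaker create <connector_name> \
#   --properties="$(cli_properties key.converter=<class> value.converter=<class>)" ...
cli_properties \
  key.converter=org.apache.kafka.connect.json.JsonConverter \
  value.converter=org.apache.kafka.connect.json.JsonConverter
```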
- properties: Comma-separated list of additional connector settings in <key>:<value> format. Here are some examples of keys:
  - key.converter
  - value.converter
  For the list of general connector settings, see this Apache Kafka® guide.
- connector_config_mirrormaker: MirrorMaker connector settings:
  - replication_factor: Number of replicas the cluster stores for each topic.
  - topics: Pattern for selecting topics to replicate. List topic names separated by commas or |. You can also use a regular expression (.*), e.g., analysis.*. To migrate all topics, specify .*.
  - source_cluster and target_cluster: Parameters for connecting to the source and target clusters:
    - alias: Cluster prefix in the connector settings.
      Note
      Topics in the target cluster will be created with the specified prefix.
    - external_cluster: Parameters for connecting to the external cluster:
      - bootstrap_servers: Comma-separated list of the FQDNs of the cluster broker hosts with the port numbers for connection.
        To learn how to get a broker host FQDN, see this guide.
      - sasl_username: Username for the connector to access the cluster.
      - sasl_password: User password for the connector to access the cluster.
      - sasl_mechanism: Authentication mechanism for username and password validation.
      - security_protocol: Connection protocol for the connector:
        - PLAINTEXT, SASL_PLAINTEXT: To connect without SSL.
        - SSL, SASL_SSL: To connect with SSL.
      - ssl_truststore_certificates: PEM certificate contents.
    - this_cluster: Option to use the current cluster as the source or target.
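When an external cluster has several brokers, bootstrap_servers (and the bootstrap-servers CLI parameter) lists each host with its port. A minimal sketch that builds this string; the function name bootstrap_servers is hypothetical, and port 9091 only mirrors the example in the management console description, so use the port your cluster's listener actually exposes.

```shell
# Hypothetical helper: build a comma-separated <FQDN>:<port> list
# from a port and one or more broker host FQDNs.
bootstrap_servers() {
  port=$1
  shift
  out=''
  for host in "$@"; do
    out="$out${out:+,}$host:$port"
  done
  printf '%s\n' "$out"
}

bootstrap_servers 9091 broker1.example.com broker2.example.com
```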
To configure the MirrorMaker connector, use the connectorSpec.connectorConfigMirrormaker parameter:
- sourceCluster and targetCluster: Parameters for connecting to the source and target clusters:
  - alias: Cluster prefix in the connector settings.
    Note
    Topics in the target cluster will be created with the specified prefix.
  - thisCluster: Option to use the current cluster as the source or target.
  - externalCluster: Parameters for connecting to the external cluster:
    - bootstrapServers: Comma-separated list of the FQDNs of the cluster broker hosts with the port numbers for connection.
      To learn how to get a broker host FQDN, see this guide.
    - saslUsername: Username for the connector to access the cluster.
    - saslPassword: User password for the connector to access the cluster.
    - saslMechanism: Authentication mechanism for username and password validation.
    - securityProtocol: Connection protocol for the connector:
      - PLAINTEXT, SASL_PLAINTEXT: To connect without SSL.
      - SSL, SASL_SSL: To connect with SSL.
    - sslTruststoreCertificates: PEM certificate contents.
- topics: Pattern for selecting topics to replicate. List topic names separated by commas or |. You can also use a regular expression (.*), e.g., analysis.*. To migrate all topics, specify .*.
- replicationFactor: Number of replicas the cluster stores for each topic.
To configure the MirrorMaker connector, use the connector_spec.connector_config_mirrormaker parameter:
- source_cluster and target_cluster: Parameters for connecting to the source and target clusters:
  - alias: Cluster prefix in the connector settings.
    Note
    Topics in the target cluster will be created with the specified prefix.
  - this_cluster: Option to use the current cluster as the source or target.
  - external_cluster: Parameters for connecting to the external cluster:
    - bootstrap_servers: Comma-separated list of the FQDNs of the cluster broker hosts with the port numbers for connection.
      To learn how to get a broker host FQDN, see this guide.
    - sasl_username: Username for the connector to access the cluster.
    - sasl_password: User password for the connector to access the cluster.
    - sasl_mechanism: Authentication mechanism for username and password validation.
    - security_protocol: Connection protocol for the connector:
      - PLAINTEXT, SASL_PLAINTEXT: To connect without SSL.
      - SSL, SASL_SSL: To connect with SSL.
    - ssl_truststore_certificates: PEM certificate contents.
- topics: Pattern for selecting topics to replicate. List topic names separated by commas or |. You can also use a regular expression (.*), e.g., analysis.*. To migrate all topics, specify .*.
- replication_factor: Number of replicas the cluster stores for each topic, provided as an object with the value field.
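The REST and gRPC parameter lists name the same settings in two styles: camelCase for REST (e.g., sourceCluster, sslTruststoreCertificates) and snake_case for gRPC (source_cluster, ssl_truststore_certificates). A sketch of the mechanical conversion, assuming every field name follows this pattern, as the lists above do; the function name to_camel_case is hypothetical.

```shell
# Hypothetical helper: convert a snake_case field name (gRPC)
# to the camelCase form used in the REST examples.
to_camel_case() {
  printf '%s\n' "$1" | awk -F'_' '{
    out = $1
    # Capitalize the first letter of every segment after the first.
    for (i = 2; i <= NF; i++) out = out toupper(substr($i, 1, 1)) substr($i, 2)
    print out
  }'
}

for f in source_cluster replication_factor ssl_truststore_certificates connector_config_s3_sink; do
  to_camel_case "$f"
done
```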
S3 Sink
Specify the S3 Sink connector parameters as follows:
- Topics: Pattern for selecting topics to export. List topic names separated by commas or |. You can also use a regular expression (.*), e.g., analysis.*. To migrate all topics, specify .*.
- Compression type: Message compression codec. You cannot change this setting after the cluster is created.
- Max record per file: Maximum number of records that can be written to a single file in an S3-compatible storage. This is an optional setting.
- Under S3 connection, specify the storage connection parameters:
  - Bucket: Storage bucket name.
  - Endpoint: Endpoint for storage access. Get it from your storage provider.
  - Region: Region name. This is an optional setting. The default value is ru-central1. You can find the list of available regions here.
    Note
    Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why Yandex Object Storage may also accept the main AWS region value, which is the first row in the table of regions.
  - Access key ID, Secret access key: AWS-compatible key ID and contents. This is an optional setting.
To specify additional settings not listed above, create the relevant keys and set their values under Additional properties when creating or updating the connector. Here are some examples of keys:
- key.converter
- value.converter
- value.converter.schemas.enable
- format.output.type
For the list of all connector settings, see this connector guide. For the list of general connector settings, see this Apache Kafka® guide.
- --cluster-name: Cluster name.
- --tasks-max: Maximum number of concurrently running connector tasks.
- --properties: Comma-separated list of additional connector settings in <key>:<value> format. Here are some examples of keys:
  - key.converter
  - value.converter
  - value.converter.schemas.enable
  - format.output.type
  For the list of all connector settings, see this connector guide. For the list of general connector settings, see this Apache Kafka® guide.
- --topics: Pattern for selecting topics to replicate. List topic names separated by commas or |. You can also use a regular expression (.*), e.g., analysis.*. To migrate all topics, specify .*.
- --file-compression-type: Message compression codec. You cannot change this setting after the cluster is created. Valid values:
- --file-max-records: Maximum number of records that can be written to a single file in an S3-compatible storage.
- --bucket-name: Name of the S3-compatible storage bucket to write data to.
- --storage-endpoint: Endpoint for storage access (get it from your storage provider), e.g., storage.yandexcloud.net.
- --region: Region where the S3-compatible storage bucket resides. The default value is ru-central1. You can find the list of available regions here.
  Note
  Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why Yandex Object Storage may also accept the main AWS region value, which is the first row in the table of regions.
- --access-key-id, --secret-access-key: AWS-compatible key ID and contents.
- properties: Comma-separated list of additional connector settings in <key>:<value> format. Here are some examples of keys:
  - key.converter
  - value.converter
  - value.converter.schemas.enable
  - format.output.type
  For the list of all connector settings, see this connector guide.
- connector_config_s3_sink: S3 Sink connector settings:
  - file_compression_type: Message compression codec. You cannot change this setting after the cluster is created. Valid values:
  - topics: Pattern for selecting topics to replicate. List topic names separated by commas or |. You can also use a regular expression (.*), e.g., analysis.*. To migrate all topics, specify .*.
  - file_max_records: Maximum number of records that can be written to a single file in an S3-compatible storage.
  - s3_connection: S3-compatible storage connection parameters:
    - bucket_name: Name of the bucket to write data to.
    - external_s3: External S3-compatible storage connection parameters:
      - endpoint: Endpoint for storage access (get it from your storage provider), e.g., storage.yandexcloud.net.
      - region: Region where the S3-compatible storage bucket resides. The default value is ru-central1. You can find the list of available regions here.
        Note
        Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why Yandex Object Storage may also accept the main AWS region value, which is the first row in the table of regions.
      - access_key_id, secret_access_key: AWS-compatible key ID and contents.
To configure the S3 Sink connector, use the connectorSpec.connectorConfigS3Sink parameter:
- topics: Pattern for selecting topics to replicate. List topic names separated by commas or |. You can also use a regular expression (.*), e.g., analysis.*. To migrate all topics, specify .*.
- fileCompressionType: Message compression codec. You cannot change this setting after the cluster is created. Valid values:
- fileMaxRecords: Maximum number of records that can be written to a single file in an S3-compatible storage.
- s3Connection: S3-compatible storage connection parameters:
  - bucketName: Name of the bucket to write data to.
  - externalS3: External storage parameters:
    - endpoint: Endpoint for storage access (get it from your storage provider), e.g., storage.yandexcloud.net.
    - region: Region where the S3-compatible storage bucket resides. The default value is ru-central1. You can find the list of available regions here.
      Note
      Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why Yandex Object Storage may also accept the main AWS region value, which is the first row in the table of regions.
    - accessKeyId, secretAccessKey: AWS-compatible key ID and contents.
To configure the S3 Sink connector, use the connector_spec.connector_config_s3_sink parameter:
- topics: Pattern for selecting topics to replicate. List topic names separated by commas or |. You can also use a regular expression (.*), e.g., analysis.*. To migrate all topics, specify .*.
- file_compression_type: Message compression codec. You cannot change this setting after the cluster is created. Valid values:
- file_max_records: Maximum number of records that can be written to a single file in an S3-compatible storage, provided as an object with the value field.
- s3_connection: S3-compatible storage connection parameters:
  - bucket_name: Name of the bucket to write data to.
  - external_s3: External storage parameters:
    - endpoint: Endpoint for storage access (get it from your storage provider), e.g., storage.yandexcloud.net.
    - region: Region where the S3-compatible storage bucket resides. The default value is ru-central1. You can find the list of available regions here.
      Note
      Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why Yandex Object Storage may also accept the main AWS region value, which is the first row in the table of regions.
    - access_key_id, secret_access_key: AWS-compatible key ID and contents.
Iceberg Sink
Specify the Iceberg Sink connector parameters as follows:
-
Control topic: Select or create a management topic. This topic will be used for coordination and managing the data writing process to Iceberg tables.
-
Topic source: Select the topic source from which the data will be transferred to Iceberg tables:
- Topic list: Comma-separated topic names.
- Topic Regex: Regular expression for selecting topics, e.g., analysis.*. To migrate all topics, specify .*.
-
Table routing: Select a rule for routing each message from an Apache Kafka® topic to Iceberg tables:
-
Static: Destination tables are predetermined. Each topic, along with all its messages, will be routed to a separate Iceberg table.
In the Tables field, list the names of the Iceberg tables separated by commas.
-
Dynamic: The destination table is determined by the content of the message itself.
In the Route field field, specify the field in the message whose value determines the target table.
-
-
Under Metastore connection, specify the Apache Hive™ Metastore connection properties:
- Catalog URI: URI for connecting to the Apache Hive™ Metastore cluster in thrift://<host>:<port> format.
- Warehouse: Root directory for storing managed table data in S3 in s3a://bucket-name/path/to/warehouse format.
-
Under S3 connection, specify the storage connection parameters:
-
Endpoint: Endpoint for storage access. Get it from your storage provider.
-
Region: Region name. This is an optional setting. The default value is ru-central1. You can find the list of available regions here.
Note
Some apps designed to work with Amazon S3 do not allow you to specify the region, which is why Yandex Object Storage may also accept the main AWS region value (the first row in the table of regions).
-
Access key ID, Secret access key: ID and contents of the AWS-compatible static key.
-
-
Optionally, under Optional settings:
-
Table settings section:
- Default commit branch: Default branch name. The connector will commit data to this branch of the Iceberg table. The default value is main.
- Default column list: Comma-separated list of default columns that identify rows in Iceberg tables (primary key). This parameter is required when UPSERT mode is enabled.
- Default partition: Comma-separated list of columns or transformation expressions for partitioning data in the Iceberg table. It defines the physical layout of the data to speed up queries, e.g., date, year, month, year (timestamp), month (timestamp), days (timestamp), and bucket (16, user_id).
- Enable automatic schema evolution: Specifies whether the connector should automatically update the Iceberg table schema if the schema of incoming Apache Kafka® messages changes.
- Force all columns to be nullable: Specifies whether to make all fields of the Iceberg table schema nullable, regardless of how they are defined in the incoming message schema.
- Enable case-insensitive field name matching: Specifies whether the connector should ignore case when matching the fields of an incoming message to the columns of the Iceberg table.
-
Control settings section:
- Consumer group prefix: Prefix for the Consumer Group ID that the connector uses when reading from Apache Kafka® topics. The default value is cg-control.
- Commit interval, ms: How often the connector commits data to the Iceberg table, in milliseconds. The default value is 300000.
- Commit timeout, ms: How long the coordinator waits for confirmation from all workers before considering the commit failed, in milliseconds. The default value is 30000.
- Commit threads: Number of threads used to commit data to the Iceberg table. The default value is vCPU × 2.
- Transactional prefix: Prefix for the Transactional ID that the connector uses when writing to Apache Kafka® within transactions.
-
-
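Whichever interface you use, the Catalog URI and Warehouse values must follow the thrift://<host>:<port> and s3a://bucket-name/path/to/warehouse formats. A rough pre-check is sketched below in POSIX shell. The function names are hypothetical, and the checks are deliberately loose: for example, they do not handle IPv6 literals or validate bucket naming rules.

```shell
#!/bin/sh
# Succeed if $1 looks like thrift://<host>:<port> with a numeric port.
is_catalog_uri() {
  case $1 in
    thrift://*:*) ;;
    *) return 1 ;;
  esac
  port=${1##*:}             # text after the last colon
  host_port=${1#thrift://}  # strip the scheme
  host=${host_port%:*}      # text before the last colon
  [ -n "$host" ] || return 1
  case $port in
    ''|*[!0-9]*) return 1 ;;
  esac
  return 0
}

# Succeed if $1 looks like s3a://<bucket>/<path> with a non-empty path.
is_warehouse_path() {
  case $1 in
    s3a://?*/?*) return 0 ;;
    *) return 1 ;;
  esac
}
```

For instance, is_catalog_uri thrift://metastore.example.com:9083 succeeds (the host name is made up; 9083 is the conventional Hive Metastore port), while is_warehouse_path s3a://bucket-name fails because no path follows the bucket.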
--cluster-id: Cluster ID. -
--cluster-name: Cluster name. -
--tasks-max: Maximum number of concurrently running connector tasks. -
--properties: Comma-separated list of additional connector settings in <key>:<value> format. Here are some examples of keys: key.converter, value.converter, value.converter.schemas.enable.
For the list of general connector settings, see this Apache Kafka® guide.
-
--topics: Comma-separated list of topics whose data will be transferred to Iceberg tables. -
--topics-regex: Regular expression to select topics whose data will be transferred to Iceberg tables, e.g., analysis.*. To migrate all topics, specify .*.
-
--control-topic: Name of the management topic used for coordinating and managing data writing to Iceberg tables. -
--catalog-uri: URI for connecting to the Apache Hive™ Metastore cluster in thrift://<host>:<port> format.
-
--warehouse: Root directory for storing managed table data in S3 in s3a://bucket-name/path/to/warehouse format.
-
--access-key-id, --secret-access-key: ID and contents of the AWS-compatible static key.
-
--storage-endpoint: Endpoint for storage access (get it from your storage provider), e.g., storage.yandexcloud.net.
-
--region: Region where the S3-compatible storage bucket resides. The default value is ru-central1. You can find the list of available regions here.
Note
Some apps designed to work with Amazon S3 do not allow you to specify the region, which is why Yandex Object Storage may also accept the main AWS region value (the first row in the table of regions).
-
--tables: Comma-separated names of Iceberg tables for static table routing. -
--route-field: Field in the message that determines the target table for dynamic routing. -
--default-commit-branch: Default branch name. The connector will commit data to this branch of the Iceberg table. The default value is main.
-
--default-id-columns: Comma-separated list of default columns that identify rows in Iceberg tables (primary key). This parameter is required when UPSERT mode is enabled.
-
--default-partition-by: Comma-separated list of columns or transformation expressions for partitioning data in the Iceberg table. It defines the physical layout of the data to speed up queries, e.g., date, year, month, year (timestamp), month (timestamp), days (timestamp), and bucket (16, user_id).
-
--evolve-schema-enabled: Specifies whether the connector should automatically update the Iceberg table schema if the schema of incoming Apache Kafka® messages changes. The default value is false.
-
--schema-force-optional: Specifies whether to make all fields of the Iceberg table schema nullable, regardless of how they are defined in the incoming message schema. The default value is false.
-
--schema-case-insensitive: Specifies whether the connector should ignore case when matching the fields of an incoming message to the columns of the Iceberg table. The default value is false.
-
--group-id-prefix: Prefix for the Consumer Group ID that the connector uses when reading from Apache Kafka® topics. The default value is cg-control.
-
--commit-interval-ms: How often the connector commits data to the Iceberg table, in milliseconds. The default value is 300000.
-
--commit-timeout-ms: How long the coordinator waits for confirmation from all workers before considering the commit failed, in milliseconds. The default value is 30000.
-
--commit-threads: Number of threads used to commit data to the Iceberg table. The default value is vCPU × 2.
-
--transactional-prefix: Prefix for the Transactional ID that the connector uses when writing to Apache Kafka® within transactions.
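The --properties flag expects a single comma-separated string of <key>:<value> pairs. One way to build it from separate arguments is the hypothetical build_properties helper below. It assumes that neither keys nor values contain commas and that each argument is written as key=value.

```shell
#!/bin/sh
# Join key=value arguments into the comma-separated <key>:<value> string
# expected by --properties. Fails on an argument without "=".
build_properties() {
  result=''
  for pair in "$@"; do
    case $pair in
      *=*) ;;
      *) echo "build_properties: expected key=value, got '$pair'" >&2; return 1 ;;
    esac
    key=${pair%%=*}    # text before the first "="
    value=${pair#*=}   # text after the first "="
    result=${result:+$result,}$key:$value
  done
  printf '%s\n' "$result"
}
```

A possible use is --properties="$(build_properties key.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false)".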
-
properties: Comma-separated list of additional connector settings in <key>:<value> format. Here are some examples of keys: key.converter, value.converter, value.converter.schemas.enable.
For the list of general connector settings, see this Apache Kafka® guide.
-
tasks_max: Maximum number of concurrently running connector tasks.
-
connector_config_iceberg_sink: Section with the Iceberg Sink connector configuration:
-
control_topic: Management topic name used for coordinating and managing data writing to Iceberg tables.
-
topics: Comma-separated list of topics whose data will be transferred to Iceberg tables.
-
topics_regex: Regular expression to select topics whose data will be transferred to Iceberg tables, e.g., analysis.*. To migrate all topics, specify .*.
-
control_config: Section with additional settings:
- commit_interval_ms: How often the connector commits data to the Iceberg table, in milliseconds. The default value is 300000.
- commit_threads: Number of threads used to commit data to the Iceberg table. The default value is vCPU × 2.
- commit_timeout_ms: How long the coordinator waits for confirmation from all workers before considering the commit failed, in milliseconds. The default value is 30000.
- group_id_prefix: Prefix for the Consumer Group ID that the connector uses when reading from Apache Kafka® topics. The default value is cg-control.
- transactional_prefix: Prefix for the Transactional ID that the connector uses when writing to Apache Kafka® within transactions.
-
dynamic_tables: Section with settings for dynamic table routing:
- route_field: Field in the message that determines the target table for dynamic routing.
-
metastore_connection: Section with Apache Hive™ Metastore connection settings:
- catalog_uri: URI for connecting to the Apache Hive™ Metastore cluster in thrift://<host>:<port> format.
- warehouse: Root directory for storing managed table data in S3 in s3a://bucket-name/path/to/warehouse format.
-
s3_connection: Section with S3-compatible storage connection settings:
- external_s3: Section with external storage settings:
-
endpoint: Endpoint for storage access (get it from your storage provider), e.g., storage.yandexcloud.net.
-
region: Region where the S3-compatible storage bucket resides. The default value is ru-central1. You can find the list of available regions here.
Note
Some apps designed to work with Amazon S3 do not allow you to specify the region, which is why Yandex Object Storage may also accept the main AWS region value (the first row in the table of regions).
-
access_key_id, secret_access_key: ID and contents of the AWS-compatible static key.
-
-
static_tables: Section with settings for static table routing:
- tables: Comma-separated names of Iceberg tables for static table routing.
-
tables_config: Section with table settings:
- default_commit_branch: Default branch name. The connector will commit data to this branch of the Iceberg table. The default value is main.
- default_id_columns: Comma-separated list of default columns that identify rows in Iceberg tables (primary key). This parameter is required when UPSERT mode is enabled.
- default_partition_by: Comma-separated list of columns or transformation expressions for partitioning data in the Iceberg table. It defines the physical layout of the data to speed up queries, e.g., date, year, month, year (timestamp), month (timestamp), days (timestamp), and bucket (16, user_id).
- evolve_schema_enabled: Specifies whether the connector should automatically update the Iceberg table schema if the schema of incoming Apache Kafka® messages changes. The default value is false.
- schema_case_insensitive: Specifies whether the connector should ignore case when matching the fields of an incoming message to the columns of the Iceberg table. The default value is false.
- schema_force_optional: Specifies whether to make all fields of the Iceberg table schema nullable, regardless of how they are defined in the incoming message schema. The default value is false.
-
The Iceberg Sink connector settings are configured in the connectorSpec.connectorConfigIcebergSink parameter:
topics: Comma-separated list of topics whose data will be transferred to Iceberg tables.
topicsRegex: Regular expression to select topics whose data will be transferred to Iceberg tables, e.g., analysis.*. To migrate all topics, specify .*.
To select topics, use either the topics or the topicsRegex parameter.
-
controlTopic: Name of the management topic used for coordinating and managing data writing to Iceberg tables. -
metastoreConnection: Apache Hive™ Metastore connection settings:
catalogUri: URI for connecting to the Apache Hive™ Metastore cluster in thrift://<host>:<port> format.
warehouse: Root directory for storing managed table data in S3 in s3a://bucket-name/path/to/warehouse format.
-
s3Connection: S3-compatible storage connection parameters:
externalS3: External storage parameters:
-
endpoint: Endpoint for storage access (get it from your storage provider), e.g., storage.yandexcloud.net.
-
region: Region where the S3-compatible storage bucket resides. The default value is ru-central1. You can find the list of available regions here.
Note
Some apps designed to work with Amazon S3 do not allow you to specify the region, which is why Yandex Object Storage may also accept the main AWS region value (the first row in the table of regions).
-
accessKeyId, secretAccessKey: ID and contents of the AWS-compatible static key.
-
-
staticTables: Section with settings for static table routing:
tables: Comma-separated names of Iceberg tables for static table routing.
-
dynamicTables: Section with settings for dynamic table routing:
routeField: Field in the message that determines the target table for dynamic routing.
To set up table routing, use either the staticTables or dynamicTables parameter.
tablesConfig: Section with table settings:
- defaultCommitBranch: Default branch name. The connector will commit data to this branch of the Iceberg table. The default value is main.
- defaultIdColumns: Comma-separated list of default columns that identify rows in Iceberg tables (primary key). This parameter is required when UPSERT mode is enabled.
- defaultPartitionBy: Comma-separated list of columns or transformation expressions for partitioning data in the Iceberg table. It defines the physical layout of the data to speed up queries, e.g., date, year, month, year (timestamp), month (timestamp), days (timestamp), and bucket (16, user_id).
- evolveSchemaEnabled: Specifies whether the connector should automatically update the Iceberg table schema if the schema of incoming Apache Kafka® messages changes. The default value is false.
- schemaForceOptional: Specifies whether to make all fields of the Iceberg table schema nullable, regardless of how they are defined in the incoming message schema. The default value is false.
- schemaCaseInsensitive: Specifies whether the connector should ignore case when matching the fields of an incoming message to the columns of the Iceberg table. The default value is false.
controlConfig: Section with additional settings:
- groupIdPrefix: Prefix for the Consumer Group ID that the connector uses when reading from Apache Kafka® topics. The default value is cg-control.
- commitIntervalMs: How often the connector commits data to the Iceberg table, in milliseconds. The default value is 300000.
- commitTimeoutMs: How long the coordinator waits for confirmation from all workers before considering the commit failed, in milliseconds. The default value is 30000.
- commitThreads: Number of threads used to commit data to the Iceberg table. The default value is vCPU × 2.
- transactionalPrefix: Prefix for the Transactional ID that the connector uses when writing to Apache Kafka® within transactions.
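The two either/or rules above (topics or topicsRegex; staticTables or dynamicTables) are easy to violate when a request body is assembled by hand. The sketch below, built around a hypothetical iceberg_routing_json function, enforces both rules and prints only the two matching members of connectorConfigIcebergSink; controlTopic, metastoreConnection, s3Connection, tablesConfig, and controlConfig still have to be added around them. It assumes values without double quotes or backslashes.

```shell
#!/bin/sh
# Print the topic-selection and table-routing members of connectorConfigIcebergSink.
# Usage: iceberg_routing_json <topics> <topicsRegex> <tables> <routeField>
# Pass an empty string for the unused option of each pair.
iceberg_routing_json() {
  topics=$1 topics_regex=$2 tables=$3 route_field=$4
  # Rule 1: exactly one of topics / topicsRegex.
  if [ -n "$topics" ] && [ -n "$topics_regex" ]; then
    echo "iceberg_routing_json: set topics or topicsRegex, not both" >&2; return 1
  fi
  if [ -z "$topics$topics_regex" ]; then
    echo "iceberg_routing_json: set topics or topicsRegex" >&2; return 1
  fi
  # Rule 2: exactly one of staticTables / dynamicTables.
  if [ -n "$tables" ] && [ -n "$route_field" ]; then
    echo "iceberg_routing_json: set staticTables or dynamicTables, not both" >&2; return 1
  fi
  if [ -z "$tables$route_field" ]; then
    echo "iceberg_routing_json: set staticTables or dynamicTables" >&2; return 1
  fi
  if [ -n "$topics" ]; then
    printf '"topics": "%s",\n' "$topics"
  else
    printf '"topicsRegex": "%s",\n' "$topics_regex"
  fi
  if [ -n "$tables" ]; then
    printf '"staticTables": { "tables": "%s" }\n' "$tables"
  else
    printf '"dynamicTables": { "routeField": "%s" }\n' "$route_field"
  fi
}
```

For example, iceberg_routing_json '' 'analysis.*' '' table_name selects topics by regular expression and routes each message by its table_name field (a hypothetical field name).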
The Iceberg Sink connector settings are configured in the connector_spec.connector_config_iceberg_sink parameter:
topics: Comma-separated list of topics whose data will be transferred to Iceberg tables.
topics_regex: Regular expression to select topics whose data will be transferred to Iceberg tables, e.g., analysis.*. To migrate all topics, specify .*.
To select topics, use either the topics or topics_regex parameter.
-
control_topic: Name of the management topic used for coordinating and managing data writing to Iceberg tables. -
metastore_connection: Apache Hive™ Metastore connection settings:
-
catalog_uri: URI for connecting to the Apache Hive™ Metastore cluster in thrift://<host>:<port> format.
-
warehouse: Root directory for storing managed table data in S3 in s3a://bucket-name/path/to/warehouse format.
-
s3_connection: S3-compatible storage connection parameters:
external_s3: External storage parameters:
-
endpoint: Endpoint for storage access (get it from your storage provider), e.g., storage.yandexcloud.net.
-
region: Region where the S3-compatible storage bucket resides. The default value is ru-central1. You can find the list of available regions here.
Note
Some apps designed to work with Amazon S3 do not allow you to specify the region, which is why Yandex Object Storage may also accept the main AWS region value (the first row in the table of regions).
-
access_key_id, secret_access_key: ID and contents of the AWS-compatible static key.
-
-
static_tables: Section with settings for static table routing:
tables: Comma-separated names of Iceberg tables for static table routing.
-
dynamic_tables: Section with settings for dynamic table routing:
route_field: Field in the message that determines the target table for dynamic routing.
To set up table routing, use either the static_tables or the dynamic_tables parameter.
tables_config: Section with table settings:
- default_commit_branch: Default branch name. The connector will commit data to this branch of the Iceberg table. The default value is main.
- default_id_columns: Comma-separated list of default columns that identify rows in Iceberg tables (primary key). This parameter is required when UPSERT mode is enabled.
- default_partition_by: Comma-separated list of columns or transformation expressions for partitioning data in the Iceberg table. It defines the physical layout of the data to speed up queries, e.g., date, year, month, year (timestamp), month (timestamp), days (timestamp), and bucket (16, user_id).
- evolve_schema_enabled: Specifies whether the connector should automatically update the Iceberg table schema if the schema of incoming Apache Kafka® messages changes. The default value is false.
- schema_force_optional: Specifies whether to make all fields of the Iceberg table schema nullable, regardless of how they are defined in the incoming message schema. The default value is false.
- schema_case_insensitive: Specifies whether the connector should ignore case when matching the fields of an incoming message to the columns of the Iceberg table. The default value is false.
control_config: Section with additional settings:
- group_id_prefix: Prefix for the Consumer Group ID that the connector uses when reading from Apache Kafka® topics. The default value is cg-control.
- commit_interval_ms: How often the connector commits data to the Iceberg table, in milliseconds. The default value is 300000.
- commit_timeout_ms: How long the coordinator waits for confirmation from all workers before considering the commit failed, in milliseconds. The default value is 30000.
- commit_threads: Number of threads used to commit data to the Iceberg table. The default value is vCPU × 2.
- transactional_prefix: Prefix for the Transactional ID that the connector uses when writing to Apache Kafka® within transactions.
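The REST parameters (camelCase) and the gRPC parameters (snake_case) above name the same fields, e.g., connectorSpec.connectorConfigIcebergSink.tablesConfig and connector_spec.connector_config_iceberg_sink.tables_config. If you reuse field paths between the two APIs, a conversion such as the hypothetical to_grpc_path function below can help. It treats every uppercase letter as the start of a new word, which holds for the field names in this section but is not a general rule.

```shell
#!/bin/sh
# Convert a REST (camelCase) field path to its gRPC (snake_case) form.
to_grpc_path() {
  # Put "_" before each uppercase letter, then lowercase the whole path.
  printf '%s\n' "$1" | sed 's/\([[:upper:]]\)/_\1/g' | tr '[:upper:]' '[:lower:]'
}
```

For example, to_grpc_path connectorSpec.connectorConfigS3Sink prints connector_spec.connector_config_s3_sink.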
-
Editing a connector
- In the management console, navigate to the relevant folder.
- Navigate to Managed Service for Kafka.
- Select the cluster and open the Connectors tab.
- In the connector row, click ⋯ and select Edit connector.
- Edit the connector properties as needed.
- Click Save.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder used by default is the one specified when creating the CLI profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also specify a different folder for any command using --folder-name or --folder-id. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.
To edit a MirrorMaker connector:
-
View a description of the CLI command to edit a connector:
yc managed-kafka connector-mirrormaker update --help
-
Run this command, e.g., to update the task limit:
yc managed-kafka connector-mirrormaker update <connector_name> \
  --cluster-name=<cluster_name> \
  --direction=<connector_direction> \
  --tasks-max=<new_task_limit>
Where --direction is the connector direction, either ingress or egress.
You can get the connector name with the list of cluster connectors, and the cluster name, with the list of clusters in the folder.
To update an S3 Sink connector:
-
View the description of the CLI command to edit a connector:
yc managed-kafka connector-s3-sink update --help
-
Run this command, e.g., to update the task limit:
yc managed-kafka connector-s3-sink update <connector_name> \
  --cluster-name=<cluster_name> \
  --tasks-max=<new_task_limit>
You can get the connector name with the list of cluster connectors, and the cluster name, with the list of clusters in the folder.
To update an Iceberg Sink connector:
-
View the description of the CLI command to edit a connector:
yc managed-kafka connector-iceberg-sink update --help
-
Run this command, e.g., to update the task limit:
yc managed-kafka connector-iceberg-sink update <connector_name> \
  --cluster-name=<cluster_name> \
  --tasks-max=<new_task_limit>
You can get the connector name with the list of cluster connectors, and the cluster name, with the list of clusters in the folder.
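The three CLI commands above differ only in the subcommand name and in the --direction flag, which only MirrorMaker takes. A hypothetical wrapper for scripted task limit updates is sketched below. It validates its arguments before calling yc and reads the CLI binary from the YC variable, so YC=echo prints the command instead of running it.

```shell
#!/bin/sh
# Hypothetical helper: change the task limit of a connector with the yc CLI.
# Usage: update_tasks_max <type> <connector_name> <cluster_name> <tasks> [direction]
#   <type>: mirrormaker, s3-sink, or iceberg-sink
#   [direction]: ingress or egress (MirrorMaker only)
update_tasks_max() {
  kind=$1 name=$2 cluster=$3 tasks=$4 direction=$5
  case $tasks in
    ''|*[!0-9]*) echo "update_tasks_max: tasks must be a positive integer" >&2; return 1 ;;
  esac
  if [ "$tasks" -lt 1 ]; then
    echo "update_tasks_max: tasks must be a positive integer" >&2; return 1
  fi
  case $kind in
    mirrormaker)
      case $direction in
        ingress|egress) ;;
        *) echo "update_tasks_max: direction must be ingress or egress" >&2; return 1 ;;
      esac
      "${YC:-yc}" managed-kafka connector-mirrormaker update "$name" \
        --cluster-name="$cluster" --direction="$direction" --tasks-max="$tasks"
      ;;
    s3-sink|iceberg-sink)
      "${YC:-yc}" managed-kafka "connector-$kind" update "$name" \
        --cluster-name="$cluster" --tasks-max="$tasks"
      ;;
    *)
      echo "update_tasks_max: unknown connector type '$kind'" >&2; return 1
      ;;
  esac
}
```

A dry run such as YC=echo update_tasks_max iceberg-sink <connector_name> <cluster_name> 4 shows the exact command before you run it for real.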
-
Check the list of MirrorMaker, S3 Sink, and Iceberg Sink connector settings.
-
Open the current Terraform configuration file describing your infrastructure.
For information about creating this file, see Creating an Apache Kafka® cluster.
-
Edit the parameter values in the yandex_mdb_kafka_connector resource description:
-
For a MirrorMaker connector:
resource "yandex_mdb_kafka_connector" "<connector_name>" {
  cluster_id = "<cluster_ID>"
  name       = "<connector_name>"
  tasks_max  = <task_limit>
  properties = {
    <advanced_properties>
  }
  connector_config_mirrormaker {
    topics             = "<topic_pattern>"
    replication_factor = <replication_factor>
    source_cluster {
      alias = "<cluster_prefix>"
      external_cluster {
        bootstrap_servers           = "<list_of_broker_host_FQDNs>"
        sasl_username               = "<username>"
        sasl_password               = "<user_password>"
        sasl_mechanism              = "<authentication_mechanism>"
        security_protocol           = "<security_protocol>"
        ssl_truststore_certificates = "<PEM_certificate_contents>"
      }
    }
    target_cluster {
      alias = "<cluster_prefix>"
      this_cluster {}
    }
  }
}
-
For an S3 Sink connector:
resource "yandex_mdb_kafka_connector" "<S3_Sink_connector_name>" {
  cluster_id = "<cluster_ID>"
  name       = "<S3_Sink_connector_name>"
  tasks_max  = <task_limit>
  properties = {
    <advanced_properties>
  }
  connector_config_s3_sink {
    topics           = "<topic_pattern>"
    file_max_records = <file_max_records>
    s3_connection {
      bucket_name = "<bucket_name>"
      external_s3 {
        endpoint          = "<S3_compatible_storage_endpoint>"
        access_key_id     = "<AWS_compatible_static_key_ID>"
        secret_access_key = "<AWS_compatible_static_key_contents>"
      }
    }
  }
}
-
For an Iceberg Sink connector:
resource "yandex_mdb_kafka_connector" "<connector_name>" {
  cluster_id = "<cluster_ID>"
  name       = "<connector_name>"
  tasks_max  = <task_limit>
  properties = {
    <advanced_properties>
  }
  connector_config_iceberg_sink {
    topics        = "<topic_list>"
    control_topic = "<management_topic_name>"
    metastore_connection {
      catalog_uri = "<URI_for_connecting_to_Metastore_cluster>"
      warehouse   = "<root_directory_for_storing_managed_table_data_in_S3>"
    }
    s3_connection {
      external_s3 {
        endpoint          = "<S3_compatible_storage_endpoint>"
        access_key_id     = "<AWS_compatible_static_key_ID>"
        secret_access_key = "<AWS_compatible_static_key_contents>"
        region            = "<region_name>"
      }
    }
    tables_config {
      default_commit_branch   = "<default_branch_name>"
      default_id_columns      = "<comma_separated_default_column_list>"
      default_partition_by    = "<list_of_columns_or_transformation_expressions>"
      evolve_schema_enabled   = <automatically_update_Iceberg_table_schema>
      schema_force_optional   = <make_Iceberg_table_schema_fields_optional>
      schema_case_insensitive = <ignore_case_when_matching_fields>
    }
    control_config {
      group_id_prefix      = "<prefix_for_Consumer_Group_ID>"
      commit_interval_ms   = <Iceberg_table_data_commit_interval>
      commit_timeout_ms    = <how_long_the_coordinator_waits_for_confirmation>
      commit_threads       = <number_of_threads_for_committing_data_to_Iceberg_table>
      transactional_prefix = "<prefix_for_Transactional_ID>"
    }
  }
}
-
-
Make sure the settings are correct.
-
In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
-
Run this command:
terraform validate
Terraform will show any errors found in your configuration files.
-
-
Confirm resource changes.
-
Run this command to view the planned changes:
terraform plan
If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
-
If everything looks correct, apply the changes:
-
Run this command:
terraform apply
-
Confirm updating the resources.
-
Wait for the operation to complete.
-
-
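The validate, plan, and apply steps above can be chained so that the sequence stops at the first failure. This is a sketch with a hypothetical tf_update function: it runs in a subshell so that the caller's working directory does not change, it keeps the interactive confirmation of terraform apply, and the TF variable overrides the binary for dry runs.

```shell
#!/bin/sh
# Hypothetical helper: validate, preview, and apply a Terraform configuration.
# Usage: tf_update <directory_with_configuration_files>
tf_update() {
  (
    cd "$1" || exit 1
    # Stop at the first failing step.
    "${TF:-terraform}" validate || exit 1
    "${TF:-terraform}" plan || exit 1
    # terraform apply still asks you to confirm the changes interactively.
    "${TF:-terraform}" apply
  )
}
```

Running TF=echo tf_update . prints validate, plan, and apply on separate lines without touching any resources.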
For more information, see this Terraform provider guide.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>"
-
Call the Connector.update method, e.g., via the following cURL request:
Warning
The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the updateMask parameter as a single comma-separated string.
curl \
  --request PATCH \
  --header "Authorization: Bearer $IAM_TOKEN" \
  --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/<connector_name>' \
  --data '{
    "updateMask": "connectorSpec.tasksMax,connectorSpec.properties,connectorSpec.connectorConfigMirrormaker.<Mirrormaker_1_connector_setting>,...,connectorSpec.connectorConfigMirrormaker.<Mirrormaker_N_connector_setting>,connectorSpec.connectorConfigS3Sink.<S3_Sink_1_connector_setting>,...,connectorSpec.connectorConfigS3Sink.<S3_Sink_N_connector_setting>,connectorSpec.connectorConfigIcebergSink.<IcebergSink_1_connector_setting>,...,connectorSpec.connectorConfigIcebergSink.<IcebergSink_N_connector_setting>",
    "connectorSpec": {
      "tasksMax": "<task_limit>",
      "properties": "<advanced_connector_properties>",
      "connectorConfigMirrormaker": {
        <Mirrormaker_connector_settings>
      },
      "connectorConfigS3Sink": {
        <S3_Sink_connector_settings>
      },
      "connectorConfigIcebergSink": {
        <IcebergSink_connector_settings>
      }
    }
  }'
Where:
-
updateMask: Comma-separated string of connector settings to update.
Specify the relevant parameters:
- connectorSpec.tasksMax: To change the connector task limit.
- connectorSpec.properties: To change the connector’s advanced properties.
- connectorSpec.connectorConfigMirrormaker.<configuring_Mirrormaker_connector>: To update the MirrorMaker connector settings.
- connectorSpec.connectorConfigS3Sink.<configuring_S3_Sink_connector>: To update the S3 Sink connector settings.
- connectorSpec.connectorConfigIcebergSink.<IcebergSink_connector_configuration_setup>: To update the Iceberg Sink connector settings.
-
connectorSpec: Specify the MirrorMaker, S3 Sink, or Iceberg Sink connector settings.
You can get the cluster ID with the list of clusters in the folder, and the connector name, with the list of connectors in the cluster.
-
-
Check the server response to make sure your request was successful.
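Given the Warning above, a request that changes only the task limit should list exactly connectorSpec.tasksMax in updateMask. The sketch below (both helper names are hypothetical) builds the mask by joining field paths with commas and prints a matching request body.

```shell
#!/bin/sh
# Join field paths into the comma-separated string expected by updateMask.
update_mask() {
  # The subshell keeps the IFS change local; "$*" joins with the first IFS char.
  (IFS=,; printf '%s\n' "$*")
}

# Print a PATCH body that changes only the connector task limit.
tasks_max_patch_body() {
  printf '{ "updateMask": "%s", "connectorSpec": { "tasksMax": "%s" } }\n' \
    "$(update_mask connectorSpec.tasksMax)" "$1"
}
```

The output can be passed to the cURL request as --data "$(tasks_max_patch_body 3)".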
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>"
-
Clone the cloudapi repository:
cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Below, we assume that the repository contents reside in the ~/cloudapi/ directory.
-
Call the ConnectorService/Update method, e.g., via the following gRPCurl request:
Warning
The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the update_mask parameter as an array of paths[] strings.
Format for listing settings
"update_mask": {
    "paths": [
        "<setting_1>",
        "<setting_2>",
        ...
        "<setting_N>"
    ]
}
grpcurl \
  -format json \
  -import-path ~/cloudapi/ \
  -import-path ~/cloudapi/third_party/googleapis/ \
  -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
  -rpc-header "Authorization: Bearer $IAM_TOKEN" \
  -d '{
        "cluster_id": "<cluster_ID>",
        "connector_name": "<connector_name>",
        "update_mask": {
          "paths": [
            "connector_spec.tasks_max",
            "connector_spec.properties",
            "connector_spec.connector_config_mirrormaker.<Mirrormaker_1_connector_setting>",
            ...,
            "connector_spec.connector_config_mirrormaker.<Mirrormaker_N_connector_setting>",
            "connector_spec.connector_config_s3_sink.<S3_Sink_1_connector_setting>",
            ...,
            "connector_spec.connector_config_s3_sink.<S3_Sink_N_connector_setting>",
            "connector_spec.connector_config_iceberg_sink.<IcebergSink_1_connector_setting>",
            ...,
            "connector_spec.connector_config_iceberg_sink.<IcebergSink_N_connector_setting>"
          ]
        },
        "connector_spec": {
          "tasks_max": {
            "value": "<task_limit>"
          },
          "properties": "<advanced_connector_properties>",
          "connector_config_mirrormaker": {
            <Mirrormaker_connector_settings>
          },
          "connector_config_s3_sink": {
            <S3_Sink_connector_settings>
          },
          "connector_config_iceberg_sink": {
            <IcebergSink_connector_settings>
          }
        }
      }' \
  mdb.api.cloud.yandex.net:443 \
  yandex.cloud.mdb.kafka.v1.ConnectorService.Update
Where:
-
update_mask: List of connector settings you want to update as an array of strings (paths[]).
Specify the relevant parameters:
- connector_spec.tasks_max: To change the connector task limit.
- connector_spec.properties: To change the connector’s advanced properties.
- connector_spec.connector_config_mirrormaker.<configuring_Mirrormaker_connector>: To update the MirrorMaker connector settings.
- connector_spec.connector_config_s3_sink.<configuring_S3_Sink_connector>: To update the S3 Sink connector settings.
- connector_spec.connector_config_iceberg_sink.<IcebergSink_connector_configuration_setup>: To update the Iceberg Sink connector settings.
-
connector_spec: Specify the MirrorMaker, S3 Sink, or Iceberg Sink connector settings.
You can get the cluster ID with the list of clusters in the folder, and the connector name, with the list of connectors in the cluster.
-
-
Check the server response to make sure your request was successful.
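For gRPC, the same task-limit-only update lists the path in update_mask.paths as an array and wraps tasks_max in an object with a value field, as in the request above. A sketch with hypothetical helper names that prints such a payload:

```shell
#!/bin/sh
# Print an update_mask member whose paths are the given arguments.
grpc_update_mask() {
  printf '"update_mask": { "paths": ['
  sep=''
  for path in "$@"; do
    printf '%s"%s"' "$sep" "$path"
    sep=', '
  done
  printf '] }'
}

# Print a ConnectorService/Update payload that changes only tasks_max.
# Usage: tasks_max_update_payload <cluster_ID> <connector_name> <task_limit>
tasks_max_update_payload() {
  printf '{ "cluster_id": "%s", "connector_name": "%s", %s, "connector_spec": { "tasks_max": { "value": "%s" } } }\n' \
    "$1" "$2" "$(grpc_update_mask connector_spec.tasks_max)" "$3"
}
```

The payload can be passed to grpcurl with -d in place of the full request body shown above.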
Pausing a connector
When you pause a connector, the system:
- Terminates the sink connection.
- Deletes data from the connector service topics.
To pause a connector:
- In the management console, navigate to the relevant folder.
- Navigate to Managed Service for Kafka.
- Select the cluster and open the Connectors tab.
- Click ⋯ next to the connector name and select Pause.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder used by default is the one specified when creating the CLI profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also specify a different folder for any command using --folder-name or --folder-id. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.
To pause a connector, run this command:
yc managed-kafka connector pause <connector_name> \
--cluster-name=<cluster_name>
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>"
-
Call the Connector.pause method, e.g., via the following cURL request:
curl \
  --request POST \
  --header "Authorization: Bearer $IAM_TOKEN" \
  --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/pause/<connector_name>'
You can get the cluster ID with the list of clusters in the folder, and the connector name, with the list of connectors in the cluster.
-
Check the server response to make sure your request was successful.
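The REST calls for pausing and resuming differ only in the action segment of the URL (pause here, resume in the Resuming a connector section). The following minimal sketch builds that URL and sends the pause request; the function names connector_state_url and pause_connector are hypothetical, and it assumes IAM_TOKEN has been exported as in the first step.

```shell
#!/bin/sh
# Hypothetical helper: build the REST URL for pausing or resuming a connector.
# $1: cluster ID, $2: connector name, $3: action (pause or resume)
connector_state_url() {
  case "$3" in
    pause|resume) ;;
    *) echo "action must be pause or resume, got '$3'" >&2; return 1 ;;
  esac
  printf 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/%s/connectors/%s/%s\n' \
    "$1" "$3" "$2"
}

# Hypothetical helper: send the pause request with curl.
# $1: cluster ID, $2: connector name; IAM_TOKEN must already be exported.
pause_connector() {
  if [ -z "${IAM_TOKEN:-}" ]; then
    echo "IAM_TOKEN is not set" >&2
    return 1
  fi
  url=$(connector_state_url "$1" "$2" pause) || return 1
  # --fail makes curl exit with a non-zero code when the server
  # responds with an HTTP error status.
  curl --fail --silent --show-error \
    --request POST \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url "$url"
}

# Example: pause_connector <cluster_ID> <connector_name>
```

Checking curl's exit code this way complements, rather than replaces, reading the server response: --fail suppresses the response body on HTTP errors, so drop it when you need to see the error details.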
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>"
-
Clone the cloudapi repository:
cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Below, we assume that the repository contents reside in the ~/cloudapi/ directory.
-
Call the ConnectorService/Pause method, e.g., via the following gRPCurl request:
grpcurl \
  -format json \
  -import-path ~/cloudapi/ \
  -import-path ~/cloudapi/third_party/googleapis/ \
  -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
  -rpc-header "Authorization: Bearer $IAM_TOKEN" \
  -d '{
        "cluster_id": "<cluster_ID>",
        "connector_name": "<connector_name>"
      }' \
  mdb.api.cloud.yandex.net:443 \
  yandex.cloud.mdb.kafka.v1.ConnectorService.Pause
You can get the cluster ID with the list of clusters in the folder, and the connector name, with the list of connectors in the cluster.
-
Check the server response to make sure your request was successful.
Resuming a connector
- In the management console, navigate to the relevant folder.
- Navigate to Managed Service for Kafka.
- Select the cluster and open the Connectors tab.
- Click ⋯ next to the connector name and select Resume.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder used by default is the one specified when creating the CLI profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also specify a different folder for any command using --folder-name or --folder-id. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.
To resume a connector, run this command:
yc managed-kafka connector resume <connector_name> \
--cluster-name=<cluster_name>
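If several connectors in the same cluster were paused, the resume command above can be repeated for each of them. The following is a minimal sketch assuming a POSIX shell; the function name resume_connectors and the YC override variable (which defaults to yc and lets you dry-run the loop with YC=echo) are hypothetical, not part of the CLI.

```shell
#!/bin/sh
# Hypothetical helper: run "yc managed-kafka connector resume" for each
# connector name given after the cluster name.
# YC is a hypothetical override (defaults to yc) that enables dry runs.
resume_connectors() {
  if [ "$#" -lt 2 ]; then
    echo "usage: resume_connectors <cluster_name> <connector_name>..." >&2
    return 1
  fi
  cluster_name="$1"
  shift
  for name in "$@"; do
    # Stop at the first connector that fails to resume.
    ${YC:-yc} managed-kafka connector resume "$name" \
      --cluster-name="$cluster_name" || return 1
  done
}

# Example: resume_connectors <cluster_name> connector559 <another_connector>
# Dry run: YC=echo resume_connectors <cluster_name> connector559
```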
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>"
-
Call the Connector.resume method, e.g., via the following cURL request:
curl \
  --request POST \
  --header "Authorization: Bearer $IAM_TOKEN" \
  --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/resume/<connector_name>'
You can get the cluster ID with the list of clusters in the folder, and the connector name, with the list of connectors in the cluster.
-
Check the server response to make sure your request was successful.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>"
-
Clone the cloudapi repository:
cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Below, we assume that the repository contents reside in the ~/cloudapi/ directory.
-
Call the ConnectorService/Resume method, e.g., via the following gRPCurl request:
grpcurl \
  -format json \
  -import-path ~/cloudapi/ \
  -import-path ~/cloudapi/third_party/googleapis/ \
  -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
  -rpc-header "Authorization: Bearer $IAM_TOKEN" \
  -d '{
        "cluster_id": "<cluster_ID>",
        "connector_name": "<connector_name>"
      }' \
  mdb.api.cloud.yandex.net:443 \
  yandex.cloud.mdb.kafka.v1.ConnectorService.Resume
You can get the cluster ID with the list of clusters in the folder, and the connector name, with the list of connectors in the cluster.
-
Check the server response to make sure your request was successful.
Importing a connector to Terraform
You can import the existing connectors to manage them with Terraform.
-
In the Terraform configuration file, specify the connector you want to import:
resource "yandex_mdb_kafka_connector" "<connector_name>" {}
-
Run the following command to import your connector:
terraform import yandex_mdb_kafka_connector.<connector_name> <cluster_ID>:<connector_name>
To learn more about importing connectors, see this Terraform provider guide.
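The import ID joins the cluster ID and the connector name with a colon. The following minimal sketch wraps the import command so that the resource name in your configuration can differ from the connector name, which helps when the connector name is not a valid Terraform identifier (identifiers may contain letters, digits, underscores, and hyphens, and must not start with a digit). The function name import_kafka_connector and the TERRAFORM override variable are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: import an existing connector into Terraform state.
# TERRAFORM is a hypothetical override (defaults to terraform) for dry runs.
# $1: resource name in the configuration, $2: cluster ID, $3: connector name
import_kafka_connector() {
  if [ "$#" -ne 3 ]; then
    echo "usage: import_kafka_connector <resource_name> <cluster_ID> <connector_name>" >&2
    return 1
  fi
  # The import ID has the <cluster_ID>:<connector_name> format.
  ${TERRAFORM:-terraform} import "yandex_mdb_kafka_connector.$1" "$2:$3"
}

# Example: import_kafka_connector my_connector <cluster_ID> <connector_name>
# Dry run: TERRAFORM=echo import_kafka_connector my_connector <cluster_ID> <connector_name>
```

The resource name passed as the first argument must match the label of the empty yandex_mdb_kafka_connector resource block declared in the previous step.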
Deleting a connector
- In the management console, navigate to the relevant folder.
- Navigate to Managed Service for Kafka.
- Select the cluster and open the Connectors tab.
- Click ⋯ next to the connector name and select Delete.
- Click Delete to confirm.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder used by default is the one specified when creating the CLI profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also specify a different folder for any command using --folder-name or --folder-id. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.
To delete a connector, run this command:
yc managed-kafka connector delete <connector_name> \
  --cluster-name=<cluster_name>
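Deleting a connector cannot be undone from this page, so in scripts it can help to require an explicit confirmation first. The following minimal sketch asks the user to retype the connector name before running the delete command above; the function name delete_connector_confirmed and the YC override variable (defaults to yc, set YC=echo for a dry run) are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: delete a connector only after the user retypes its name.
# YC is a hypothetical override (defaults to yc) that enables dry runs.
# $1: cluster name, $2: connector name; the confirmation is read from stdin.
delete_connector_confirmed() {
  if [ "$#" -ne 2 ]; then
    echo "usage: delete_connector_confirmed <cluster_name> <connector_name>" >&2
    return 1
  fi
  printf 'Type the connector name to delete it: ' >&2
  IFS= read -r answer || return 1
  if [ "$answer" != "$2" ]; then
    echo "Confirmation does not match; nothing was deleted." >&2
    return 1
  fi
  ${YC:-yc} managed-kafka connector delete "$2" --cluster-name="$1"
}

# Example: delete_connector_confirmed <cluster_name> <connector_name>
```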
-
Open the current Terraform configuration file describing your infrastructure.
For information about creating this file, see Creating an Apache Kafka® cluster.
-
Delete the yandex_mdb_kafka_connector resource with the connector description.
-
Make sure the settings are correct.
-
In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
-
Run this command:
terraform validate
Terraform will show any errors found in your configuration files.
-
-
Confirm resource changes.
-
Run this command to view the planned changes:
terraform plan
If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
-
If everything looks correct, apply the changes:
-
Run this command:
terraform apply
-
Confirm updating the resources.
-
Wait for the operation to complete.
-
-
For more information, see this Terraform provider guide.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>"
-
Call the Connector.delete method, e.g., via the following cURL request:
curl \
  --request DELETE \
  --header "Authorization: Bearer $IAM_TOKEN" \
  --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/<connector_name>'
You can get the cluster ID with the list of clusters in the folder, and the connector name, with the list of connectors in the cluster.
-
Check the server response to make sure your request was successful.
-
Get an IAM token for API authentication and put it into an environment variable:
export IAM_TOKEN="<IAM_token>"
-
Clone the cloudapi repository:
cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
Below, we assume that the repository contents reside in the ~/cloudapi/ directory.
-
Call the ConnectorService/Delete method, e.g., via the following gRPCurl request:
grpcurl \
  -format json \
  -import-path ~/cloudapi/ \
  -import-path ~/cloudapi/third_party/googleapis/ \
  -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
  -rpc-header "Authorization: Bearer $IAM_TOKEN" \
  -d '{
        "cluster_id": "<cluster_ID>",
        "connector_name": "<connector_name>"
      }' \
  mdb.api.cloud.yandex.net:443 \
  yandex.cloud.mdb.kafka.v1.ConnectorService.Delete
You can get the cluster ID with the list of clusters in the folder, and the connector name, with the list of connectors in the cluster.
-
Check the server response to make sure your request was successful.
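The gRPC Pause, Resume, and Delete calls on this page send the same request body (cluster_id and connector_name) and differ only in the method name. The following minimal sketch factors that out; the function names connector_rpc_method, connector_rpc_body, and call_connector_rpc are hypothetical, and the body builder assumes that names contain no double quotes or backslashes, since it does not JSON-escape them.

```shell
#!/bin/sh
# Hypothetical helper: map an action to the full ConnectorService method name.
connector_rpc_method() {
  case "$1" in
    pause)  echo "yandex.cloud.mdb.kafka.v1.ConnectorService.Pause" ;;
    resume) echo "yandex.cloud.mdb.kafka.v1.ConnectorService.Resume" ;;
    delete) echo "yandex.cloud.mdb.kafka.v1.ConnectorService.Delete" ;;
    *) echo "unknown action: $1" >&2; return 1 ;;
  esac
}

# Hypothetical helper: print the shared request body.
# $1: cluster ID, $2: connector name (not JSON-escaped).
connector_rpc_body() {
  printf '{"cluster_id": "%s", "connector_name": "%s"}\n' "$1" "$2"
}

# Hypothetical helper: run the call with grpcurl.
# Requires grpcurl, the cloned ~/cloudapi repository, and IAM_TOKEN.
# $1: action (pause, resume, or delete), $2: cluster ID, $3: connector name
call_connector_rpc() {
  method=$(connector_rpc_method "$1") || return 1
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d "$(connector_rpc_body "$2" "$3")" \
    mdb.api.cloud.yandex.net:443 \
    "$method"
}

# Example: call_connector_rpc pause <cluster_ID> <connector_name>
```

Validating the action before calling grpcurl means a typo such as stop fails locally instead of sending a request to an unknown method.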