Managing connectors
Connectors manage the transfer of Apache Kafka® topics to other clusters or data storage systems.
You can:
- Get a list of connectors.
- Get detailed information about a connector.
- Create a connector of the required type.
- Edit a connector.
- Pause a connector.
- Resume a connector.
- Import a connector to Terraform.
- Delete a connector.
Getting a list of connectors
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Select a cluster and open the Connectors tab.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder through the --folder-name or --folder-id parameter.
To get a list of cluster connectors, run the command:

```bash
yc managed-kafka connector list --cluster-name=<cluster_name>
```

Result:

```text
+--------------+-----------+
|     NAME     | TASKS MAX |
+--------------+-----------+
| connector559 | 1         |
| ...          |           |
+--------------+-----------+
```
You can retrieve the cluster name with a list of clusters in the folder.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Use the Connector.list method and send the following request, e.g., via cURL:

  ```bash
  curl \
    --request GET \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors'
  ```

  You can get the cluster ID with a list of clusters in the folder.

- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Clone the cloudapi repository:

  ```bash
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  ```

  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.

- Use the ConnectorService/List call and send the following request, e.g., via gRPCurl:

  ```bash
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{ "cluster_id": "<cluster_ID>" }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.List
  ```

  You can get the cluster ID with a list of clusters in the folder.

- View the server response to make sure the request was successful.
Getting detailed information about a connector
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Select a cluster and open the Connectors tab.
- Click the name of the connector you need.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder through the --folder-name or --folder-id parameter.
To get detailed information about a connector, run this command:

```bash
yc managed-kafka connector get <connector_name> \
  --cluster-name=<cluster_name>
```

Result:

```text
name: connector785
tasks_max: "1"
cluster_id: c9qbkmoiimsl********
...
```
You can request the connector name with a list of cluster connectors and the cluster name with a list of clusters in the folder.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Use the Connector.get method and send the following request, e.g., via cURL:

  ```bash
  curl \
    --request GET \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/<connector_name>'
  ```

  You can request the cluster ID with a list of clusters in the folder and the connector name with a list of connectors in the cluster.

- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Clone the cloudapi repository:

  ```bash
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  ```

  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.

- Use the ConnectorService/Get call and send the following request, e.g., via gRPCurl:

  ```bash
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "connector_name": "<connector_name>"
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.Get
  ```

  You can request the cluster ID with a list of clusters in the folder and the connector name with a list of connectors in the cluster.

- View the server response to make sure the request was successful.
Creating a connector
- In the management console, go to the relevant folder.

- In the services list, select Managed Service for Kafka.

- Select a cluster and open the Connectors tab.

- Click Create connector.

- Under Basic parameters, specify:

  - Connector name.
  - Task limit: Number of concurrent processes. To distribute replication load evenly, we recommend a value of at least 2.

- Under Additional properties, specify the connector properties in the following format:

  ```text
  <key>:<value>
  ```

  The key can either be a simple string or contain a prefix indicating that it belongs to the source or target (a cluster alias in the connector configuration):

  ```text
  <cluster_alias>.<key_body>:<value>
  ```

  A sample set of properties is shown after these steps.

- Select the connector type, MirrorMaker or S3 Sink, and set up its configuration.

  For more information about the supported connector types, see Connectors.

- Click Create.
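As an illustration, the Additional properties field for a MirrorMaker connector might contain entries like the following sketch. The values are illustrative: key.converter and value.converter are standard Kafka Connect worker settings, org.apache.kafka.connect.converters.ByteArrayConverter is a stock Kafka Connect converter class, and the source. prefix assumes the source cluster alias is source:

```text
key.converter:org.apache.kafka.connect.converters.ByteArrayConverter
value.converter:org.apache.kafka.connect.converters.ByteArrayConverter
source.consumer.auto.offset.reset:earliest
```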
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder through the --folder-name or --folder-id parameter.
To create a MirrorMaker connector:

- View the description of the CLI command to create a connector:

  ```bash
  yc managed-kafka connector-mirrormaker create --help
  ```

- Create a connector (see the example below):

  ```bash
  yc managed-kafka connector-mirrormaker create <connector_name> \
    --cluster-name=<cluster_name> \
    --direction=<connector_direction> \
    --tasks-max=<task_limit> \
    --properties=<advanced_properties> \
    --replication-factor=<replication_factor> \
    --topics=<topics_template> \
    --this-cluster-alias=<this_cluster_prefix> \
    --external-cluster alias=<external_cluster_prefix>,`
                      `bootstrap-servers=<list_of_broker_host_FQDNs>,`
                      `security-protocol=<security_protocol>,`
                      `sasl-mechanism=<encryption_mechanism>,`
                      `sasl-username=<username>,`
                      `sasl-password=<user_password>,`
                      `ssl-truststore-certificates=<certificates_in_PEM_format>
  ```

  For info on how to get a broker host's FQDN, see this guide.

  You can retrieve the cluster name with a list of clusters in the folder.

  --direction takes these values:

  - egress: If the current cluster is a source cluster.
  - ingress: If the current cluster is a target cluster.
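For illustration, a hypothetical invocation that replicates topics from an external source cluster into the current cluster might look like this; the connector name, cluster name, aliases, host, and credentials are placeholders to adapt:

```bash
yc managed-kafka connector-mirrormaker create replication \
  --cluster-name=kafka-target \
  --direction=ingress \
  --tasks-max=2 \
  --replication-factor=1 \
  --topics="data.*" \
  --this-cluster-alias=target \
  --external-cluster alias=source,bootstrap-servers=broker1.example.com:9091,security-protocol=sasl_ssl,sasl-mechanism=SCRAM-SHA-512,sasl-username=mm-user,sasl-password=<user_password>
```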
To create an S3 Sink connector:

- View the description of the CLI command to create a connector:

  ```bash
  yc managed-kafka connector-s3-sink create --help
  ```

- Create a connector (see the example below):

  ```bash
  yc managed-kafka connector-s3-sink create <connector_name> \
    --cluster-name=<cluster_name> \
    --tasks-max=<task_limit> \
    --properties=<advanced_properties> \
    --topics=<topics_template> \
    --file-compression-type=<compression_codec> \
    --file-max-records=<file_max_records> \
    --bucket-name=<bucket_name> \
    --access-key-id=<AWS_compatible_static_key_ID> \
    --secret-access-key=<AWS_compatible_static_key_contents> \
    --storage-endpoint=<S3_compatible_storage_endpoint> \
    --region=<S3_compatible_storage_region>
  ```

  You can retrieve the cluster name with a list of clusters in the folder.
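For illustration, a hypothetical invocation writing to an Object Storage bucket might look like this; the names, key values, and record limit are placeholders to adapt, and gzip is assumed to be among the supported codecs:

```bash
yc managed-kafka connector-s3-sink create s3-sink \
  --cluster-name=kafka-cluster1 \
  --tasks-max=2 \
  --topics="data.*" \
  --file-compression-type=gzip \
  --file-max-records=1000 \
  --bucket-name=kafka-sink-bucket \
  --access-key-id=<static_key_ID> \
  --secret-access-key=<static_key_contents> \
  --storage-endpoint=storage.yandexcloud.net \
  --region=ru-central1
```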
- Check the list of MirrorMaker and S3 Sink connector settings.

- Open the current Terraform configuration file with an infrastructure plan.

  For more information about creating this file, see Creating clusters.
- To create a MirrorMaker connector, add the yandex_mdb_kafka_connector resource with the connector_config_mirrormaker settings section:

  ```hcl
  resource "yandex_mdb_kafka_connector" "<connector_name>" {
    cluster_id = "<cluster_ID>"
    name       = "<connector_name>"
    tasks_max  = <task_limit>
    properties = {
      <advanced_properties>
    }
    connector_config_mirrormaker {
      topics             = "<topics_template>"
      replication_factor = <replication_factor>
      source_cluster {
        alias = "<cluster_prefix>"
        external_cluster {
          bootstrap_servers           = "<list_of_broker_host_FQDNs>"
          sasl_username               = "<username>"
          sasl_password               = "<user_password>"
          sasl_mechanism              = "<encryption_mechanism>"
          security_protocol           = "<security_protocol>"
          ssl_truststore_certificates = "<PEM_certificate_contents>"
        }
      }
      target_cluster {
        alias = "<cluster_prefix>"
        this_cluster {}
      }
    }
  }
  ```

  For info on how to get a broker host's FQDN, see this guide.
- To create an S3 Sink connector, add the yandex_mdb_kafka_connector resource with the connector_config_s3_sink settings section:

  ```hcl
  resource "yandex_mdb_kafka_connector" "<connector_name>" {
    cluster_id = "<cluster_ID>"
    name       = "<connector_name>"
    tasks_max  = <task_limit>
    properties = {
      <advanced_properties>
    }
    connector_config_s3_sink {
      topics                = "<topics_template>"
      file_compression_type = "<compression_codec>"
      file_max_records      = <file_max_records>
      s3_connection {
        bucket_name = "<bucket_name>"
        external_s3 {
          endpoint          = "<S3_compatible_storage_endpoint>"
          access_key_id     = "<AWS_compatible_static_key_ID>"
          secret_access_key = "<AWS_compatible_static_key_contents>"
        }
      }
    }
  }
  ```
- Make sure the settings are correct.

  - Using the command line, navigate to the folder that contains the up-to-date Terraform configuration files with an infrastructure plan.

  - Run the command:

    ```bash
    terraform validate
    ```

    If there are errors in the configuration files, Terraform will point to them.

- Confirm updating the resources.

  - Run the command to view planned changes:

    ```bash
    terraform plan
    ```

    If the resource configuration descriptions are correct, the terminal will display a list of the resources to modify and their parameters. This is a test step. No resources are updated.

  - If you are happy with the planned changes, apply them:

    - Run the command:

      ```bash
      terraform apply
      ```

    - Confirm the update of resources.

    - Wait for the operation to complete.

For more information, see the Terraform provider documentation.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- To create a MirrorMaker connector, use the Connector.create method and send the following request, e.g., via cURL:

  ```bash
  curl \
    --request POST \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors' \
    --data '{
              "connectorSpec": {
                "name": "<connector_name>",
                "tasksMax": "<task_limit>",
                "properties": "<advanced_connector_properties>",
                "connectorConfigMirrormaker": { <Mirrormaker_connector_settings> }
              }
            }'
  ```

  You can get the cluster ID with a list of clusters in the folder.

- To create an S3 Sink connector, use the Connector.create method and send the following request, e.g., via cURL:

  ```bash
  curl \
    --request POST \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors' \
    --data '{
              "connectorSpec": {
                "name": "<connector_name>",
                "tasksMax": "<task_limit>",
                "properties": "<advanced_connector_properties>",
                "connectorConfigS3Sink": { <S3_Sink_connector_settings> }
              }
            }'
  ```

  You can get the cluster ID with a list of clusters in the folder.

- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Clone the cloudapi repository:

  ```bash
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  ```

  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.

- To create a MirrorMaker connector, use the ConnectorService/Create call and send the following request, e.g., via gRPCurl:

  ```bash
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "connector_spec": {
            "name": "<connector_name>",
            "tasks_max": { "value": "<task_limit>" },
            "properties": "<advanced_connector_properties>",
            "connector_config_mirrormaker": { <Mirrormaker_connector_settings> }
          }
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.Create
  ```

  You can get the cluster ID with a list of clusters in the folder.

- To create an S3 Sink connector, use the ConnectorService/Create call and send the following request, e.g., via gRPCurl:

  ```bash
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "connector_spec": {
            "name": "<connector_name>",
            "tasks_max": { "value": "<task_limit>" },
            "properties": "<advanced_connector_properties>",
            "connector_config_s3_sink": { <S3_Sink_connector_settings> }
          }
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.Create
  ```

  You can get the cluster ID with a list of clusters in the folder.

- View the server response to make sure the request was successful.
MirrorMaker
Specify the MirrorMaker connector parameters:
- Topics: Template for selecting topics to replicate. Topic names in the list are separated by a comma or |. You may use the .* expression, e.g., analysis.*. To migrate all topics, put .*.

- Replication factor: Number of topic copies stored in the cluster.

- Under Source cluster, specify the parameters for connecting to the source cluster:

  - Alias: Source cluster prefix in the connector settings.

    Note
    Topics in the target cluster will be created with the specified prefix.

  - Use this cluster: Select this option to use the current cluster as a source.

  - Bootstrap servers: Comma-separated list of the FQDNs of the source cluster broker hosts with the port numbers for connection, e.g., broker1.example.com:9091,broker2.example.com:9091.

    For info on how to get a broker host's FQDN, see this guide.

  - SASL username: Username for connecting the connector to the source cluster.

  - SASL password: User password for connecting the connector to the source cluster.

  - SASL mechanism: Select a username and password encryption mechanism.

  - Security protocol: Select a connector connection protocol:

    - PLAINTEXT, SASL_PLAINTEXT: To connect without SSL.
    - SSL, SASL_SSL: To connect with SSL.

  - Certificate in PEM format: Upload a PEM certificate to access the external cluster.

- Under Target cluster, specify the parameters for connecting to the target cluster:

  - Alias: Target cluster prefix in the connector settings.

  - Use this cluster: Select this option to use the current cluster as a target.

  - Bootstrap servers: Comma-separated list of the FQDNs of the target cluster broker hosts with the port numbers for connection.

    For info on how to get a broker host's FQDN, see this guide.

  - SASL username: Username for connecting the connector to the target cluster.

  - SASL password: User password for connecting the connector to the target cluster.

  - SASL mechanism: Select a username and password encryption mechanism.

  - Security protocol: Select a connector connection protocol:

    - PLAINTEXT, SASL_PLAINTEXT: To connect without SSL.
    - SSL, SASL_SSL: To connect with SSL.

  - Certificate in PEM format: Upload a PEM certificate to access the external cluster.

- To specify additional setting values not listed above, create the relevant keys and specify their values under Additional properties when creating or editing a connector. Here are some sample keys:

  - key.converter
  - value.converter

  For the list of common connector settings, see the Apache Kafka® documentation.
- --cluster-name: Cluster name.

- --direction: Connector direction:

  - ingress: For a target cluster.
  - egress: For a source cluster.

- --tasks-max: Number of concurrent processes. To distribute replication load evenly, we recommend a value of at least 2.

- --properties: Comma-separated list of advanced connector settings in <key>:<value> format. Here are some sample keys:

  - key.converter
  - value.converter

  For the list of common connector settings, see the Apache Kafka® documentation.

- --replication-factor: Number of topic copies stored in the cluster.

- --topics: Template for selecting topics to replicate. Topic names in the list are separated by a comma or |. You may use the .* expression, e.g., analysis.*. To migrate all topics, put .*.

- --this-cluster-alias: This cluster prefix in the connector settings.

- --external-cluster: External cluster parameters:

  - alias: External cluster prefix in the connector settings.

  - bootstrap-servers: Comma-separated list of the FQDNs of the external cluster broker hosts with the port numbers for connection.

    For info on how to get a broker host's FQDN, see this guide.

  - security-protocol: Connector connection protocol:

    - plaintext, sasl_plaintext: To connect without SSL.
    - ssl, sasl_ssl: To connect with SSL.

  - sasl-mechanism: Username and password encryption mechanism.

  - sasl-username: Username for connecting the connector to the external cluster.

  - sasl-password: User password for connecting the connector to the external cluster.

  - ssl-truststore-certificates: List of PEM certificates.
- properties: Comma-separated list of advanced connector settings in <key>:<value> format. Here are some sample keys:

  - key.converter
  - value.converter

  For the list of common connector settings, see the Apache Kafka® documentation.

- topics: Template for selecting topics to replicate. Topic names in the list are separated by a comma or |. You may use the .* expression, e.g., analysis.*. To migrate all topics, put .*.

- replication_factor: Number of topic copies stored in the cluster.

- source_cluster and target_cluster: Parameters for connecting to the source cluster and target cluster:

  - alias: Cluster prefix in the connector settings.

    Note
    Topics in the target cluster will be created with the specified prefix.

  - this_cluster: Option to use the current cluster as a source or target.

  - external_cluster: Parameters for connecting to the external cluster:

    - bootstrap_servers: Comma-separated list of the FQDNs of the cluster broker hosts with the port numbers for connection.

      For info on how to get a broker host's FQDN, see this guide.

    - sasl_username: Username for connecting the connector to the cluster.

    - sasl_password: User password for connecting the connector to the cluster.

    - sasl_mechanism: Username and password encryption mechanism.

    - security_protocol: Connector connection protocol:

      - PLAINTEXT, SASL_PLAINTEXT: To connect without SSL.
      - SSL, SASL_SSL: To connect with SSL.

    - ssl_truststore_certificates: PEM certificate contents.
MirrorMaker connector settings are specified in the connectorSpec.connectorConfigMirrormaker parameter:

- sourceCluster and targetCluster: Parameters for connecting to the source cluster and target cluster:

  - alias: Cluster prefix in the connector settings.

    Note
    Topics in the target cluster will be created with the specified prefix.

  - thisCluster: Option to use the current cluster as a source or target.

  - externalCluster: Parameters for connecting to the external cluster:

    - bootstrapServers: Comma-separated list of the FQDNs of the cluster broker hosts with the port numbers for connection.

      For info on how to get a broker host's FQDN, see this guide.

    - saslUsername: Username for connecting the connector to the cluster.

    - saslPassword: User password for connecting the connector to the cluster.

    - saslMechanism: Username and password encryption mechanism.

    - securityProtocol: Connector connection protocol:

      - PLAINTEXT, SASL_PLAINTEXT: To connect without SSL.
      - SSL, SASL_SSL: To connect with SSL.

    - sslTruststoreCertificates: PEM certificate contents.

- topics: Template for selecting topics to replicate. Topic names in the list are separated by a comma or |. You may use the .* expression, e.g., analysis.*. To migrate all topics, put .*.

- replicationFactor: Number of topic copies stored in the cluster.
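Put together, a connectorSpec.connectorConfigMirrormaker fragment for the Connector.create request might look like the following sketch. All values are illustrative: the aliases, host, and account are hypothetical, and SCRAM-SHA-512 with SASL_SSL is just one possible mechanism/protocol pair:

```json
"connectorConfigMirrormaker": {
  "sourceCluster": {
    "alias": "source",
    "externalCluster": {
      "bootstrapServers": "broker1.example.com:9091",
      "saslUsername": "mm-user",
      "saslPassword": "<user_password>",
      "saslMechanism": "SCRAM-SHA-512",
      "securityProtocol": "SASL_SSL"
    }
  },
  "targetCluster": {
    "alias": "target",
    "thisCluster": {}
  },
  "topics": "data.*",
  "replicationFactor": "1"
}
```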
MirrorMaker connector settings are specified in the connector_spec.connector_config_mirrormaker parameter:

- source_cluster and target_cluster: Parameters for connecting to the source cluster and target cluster:

  - alias: Cluster prefix in the connector settings.

    Note
    Topics in the target cluster will be created with the specified prefix.

  - this_cluster: Option to use the current cluster as a source or target.

  - external_cluster: Parameters for connecting to the external cluster:

    - bootstrap_servers: Comma-separated list of the FQDNs of the cluster broker hosts with the port numbers for connection.

      For info on how to get a broker host's FQDN, see this guide.

    - sasl_username: Username for connecting the connector to the cluster.

    - sasl_password: User password for connecting the connector to the cluster.

    - sasl_mechanism: Username and password encryption mechanism.

    - security_protocol: Connector connection protocol:

      - PLAINTEXT, SASL_PLAINTEXT: To connect without SSL.
      - SSL, SASL_SSL: To connect with SSL.

    - ssl_truststore_certificates: PEM certificate contents.

- topics: Template for selecting topics to replicate. Topic names in the list are separated by a comma or |. You may use the .* expression, e.g., analysis.*. To migrate all topics, put .*.

- replication_factor: Number of topic copies stored in the cluster. Provided as an object with the value field: { "value": "<replication_factor>" }.
S3 Sink
Specify the S3 Sink connector parameters:
- Topics: Template for selecting topics to replicate. Topic names in the list are separated by a comma or |. You may use the .* expression, e.g., analysis.*. To migrate all topics, put .*.

- Compression type: Select the codec to compress messages. You cannot change this parameter after creating the connector.

- (Optional) Max records per file: Maximum number of records that can be written to a single file in the S3-compatible storage.

- Under S3 connection, specify the storage connection parameters:

  - Bucket: Storage bucket name.

  - Endpoint: Endpoint for storage access (to be requested from the storage provider).

  - (Optional) Region: Region name. The default value is us-east-1. Available regions.

  - (Optional) Access key ID, Secret access key: AWS-compatible key ID and contents.

- To specify additional setting values not listed above, create the relevant keys and specify their values under Additional properties when creating or editing a connector. Here are some sample keys:

  - key.converter
  - value.converter
  - value.converter.schemas.enable
  - format.output.type

  For the list of all connector settings, see the connector documentation. For the list of common connector settings, see the Apache Kafka® documentation.
- --cluster-name: Cluster name.

- --tasks-max: Number of concurrent processes. To distribute replication load evenly, we recommend a value of at least 2.

- --properties: Comma-separated list of advanced connector settings in <key>:<value> format. Here are some sample keys:

  - key.converter
  - value.converter
  - value.converter.schemas.enable
  - format.output.type

  For the list of all connector settings, see the connector documentation. For the list of common connector settings, see the Apache Kafka® documentation.

- --topics: Template for selecting topics to replicate. Topic names in the list are separated by a comma or |. You may use the .* expression, e.g., analysis.*. To migrate all topics, put .*.

- --file-compression-type: Message compression codec. You cannot change this parameter after creating the connector.

- --file-max-records: Maximum number of records that can be written to a single file in the S3-compatible storage.

- --bucket-name: Name of the S3-compatible storage bucket to write data to.

- --storage-endpoint: Endpoint for storage access (to be requested from the storage provider), e.g., storage.yandexcloud.net.

- --region: Region where the S3-compatible storage bucket is located. The default value is us-east-1. Available regions.

- --access-key-id, --secret-access-key: AWS-compatible key ID and contents.
- properties: Comma-separated list of advanced connector settings in <key>:<value> format. Here are some sample keys:

  - key.converter
  - value.converter
  - value.converter.schemas.enable
  - format.output.type

  For the list of all connector settings, see the connector documentation. For the list of common connector settings, see the Apache Kafka® documentation.

- topics: Template for selecting topics to replicate. Topic names in the list are separated by a comma or |. You may use the .* expression, e.g., analysis.*. To migrate all topics, put .*.

- file_compression_type: Message compression codec. You cannot change this parameter after creating the connector.

- file_max_records: Maximum number of records that can be written to a single file in the S3-compatible storage.

- s3_connection: S3-compatible storage connection parameters:

  - bucket_name: Name of the bucket to write data to.

  - external_s3: External S3-compatible storage connection parameters:

    - endpoint: Endpoint for storage access (to be requested from the storage provider), e.g., storage.yandexcloud.net.

    - region: Region where the S3-compatible storage bucket is located. The default value is us-east-1. Available regions.

    - access_key_id, secret_access_key: AWS-compatible key ID and contents.
The S3 Sink connector settings are specified in the connectorSpec.connectorConfigS3Sink parameter:

- topics: Template for selecting topics to replicate. Topic names in the list are separated by a comma or |. You may use the .* expression, e.g., analysis.*. To migrate all topics, put .*.

- fileCompressionType: Message compression codec. You cannot change this parameter after creating the connector.

- fileMaxRecords: Maximum number of records that can be written to a single file in the S3-compatible storage.

- s3Connection: S3-compatible storage connection parameters:

  - bucketName: Name of the bucket to write data to.

  - externalS3: External storage parameters:

    - endpoint: Endpoint for storage access (to be requested from the storage provider), e.g., storage.yandexcloud.net.

    - region: Region where the S3-compatible storage bucket is located. The default value is us-east-1. Available regions.

    - accessKeyId, secretAccessKey: AWS-compatible key ID and contents.
The S3 Sink connector settings are specified in the connector_spec.connector_config_s3_sink parameter:

- topics: Template for selecting topics to replicate. Topic names in the list are separated by a comma or |. You may use the .* expression, e.g., analysis.*. To migrate all topics, put .*.

- file_compression_type: Message compression codec. You cannot change this parameter after creating the connector.

- file_max_records: Maximum number of records that can be written to a single file in the S3-compatible storage. Provided as an object with the value field: { "value": "<file_max_records>" }.

- s3_connection: S3-compatible storage connection parameters:

  - bucket_name: Name of the bucket to write data to.

  - external_s3: External storage parameters:

    - endpoint: Endpoint for storage access (to be requested from the storage provider), e.g., storage.yandexcloud.net.

    - region: Region where the S3-compatible storage bucket is located. The default value is us-east-1. Available regions.

    - access_key_id, secret_access_key: AWS-compatible key ID and contents.
Editing a connector
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Select a cluster and open the Connectors tab.
- In the line with the connector you need, click the options icon and select Edit connector.
- Edit the connector properties as needed.
- Click Save.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder through the --folder-name or --folder-id parameter.
To edit a MirrorMaker connector:

- View the description of the CLI command to edit a connector:

  ```bash
  yc managed-kafka connector-mirrormaker update --help
  ```

- Run an operation, e.g., the task limit update operation (see the example below):

  ```bash
  yc managed-kafka connector-mirrormaker update <connector_name> \
    --cluster-name=<cluster_name> \
    --direction=<connector_direction> \
    --tasks-max=<new_task_limit>
  ```

  Where --direction is the connector direction: ingress or egress.

  You can request the connector name with a list of cluster connectors and the cluster name with a list of clusters in the folder.
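For example, a hypothetical call that raises the task limit of a connector named replication to 4 (the connector and cluster names are illustrative):

```bash
yc managed-kafka connector-mirrormaker update replication \
  --cluster-name=kafka-cluster1 \
  --direction=egress \
  --tasks-max=4
```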
To edit an S3 Sink connector:

- View the description of the CLI command to edit a connector:

  ```bash
  yc managed-kafka connector-s3-sink update --help
  ```

- Run an operation, e.g., the task limit update operation:

  ```bash
  yc managed-kafka connector-s3-sink update <connector_name> \
    --cluster-name=<cluster_name> \
    --tasks-max=<new_task_limit>
  ```

  You can request the connector name with a list of cluster connectors and the cluster name with a list of clusters in the folder.
- Check the list of MirrorMaker and S3 Sink connector settings.

- Open the current Terraform configuration file with an infrastructure plan.

  For more information about creating this file, see Creating clusters.
- Edit the parameter values in the yandex_mdb_kafka_connector resource description:

  - For a MirrorMaker connector:

    ```hcl
    resource "yandex_mdb_kafka_connector" "<connector_name>" {
      cluster_id = "<cluster_ID>"
      name       = "<connector_name>"
      tasks_max  = <task_limit>
      properties = {
        <advanced_properties>
      }
      connector_config_mirrormaker {
        topics             = "<topics_template>"
        replication_factor = <replication_factor>
        source_cluster {
          alias = "<cluster_prefix>"
          external_cluster {
            bootstrap_servers           = "<list_of_broker_host_FQDNs>"
            sasl_username               = "<username>"
            sasl_password               = "<user_password>"
            sasl_mechanism              = "<encryption_mechanism>"
            security_protocol           = "<security_protocol>"
            ssl_truststore_certificates = "<PEM_certificate_contents>"
          }
        }
        target_cluster {
          alias = "<cluster_prefix>"
          this_cluster {}
        }
      }
    }
    ```

  - For an S3 Sink connector:

    ```hcl
    resource "yandex_mdb_kafka_connector" "<S3_Sink_connector_name>" {
      cluster_id = "<cluster_ID>"
      name       = "<S3_Sink_connector_name>"
      tasks_max  = <task_limit>
      properties = {
        <advanced_properties>
      }
      connector_config_s3_sink {
        topics           = "<topics_template>"
        file_max_records = <file_max_records>
        s3_connection {
          bucket_name = "<bucket_name>"
          external_s3 {
            endpoint          = "<S3_compatible_storage_endpoint>"
            access_key_id     = "<AWS_compatible_static_key_ID>"
            secret_access_key = "<AWS_compatible_static_key_contents>"
          }
        }
      }
    }
    ```
- Make sure the settings are correct.

  - Using the command line, navigate to the folder that contains the up-to-date Terraform configuration files with an infrastructure plan.

  - Run the command:

    ```bash
    terraform validate
    ```

    If there are errors in the configuration files, Terraform will point to them.

- Confirm updating the resources.

  - Run the command to view planned changes:

    ```bash
    terraform plan
    ```

    If the resource configuration descriptions are correct, the terminal will display a list of the resources to modify and their parameters. This is a test step. No resources are updated.

  - If you are happy with the planned changes, apply them:

    - Run the command:

      ```bash
      terraform apply
      ```

    - Confirm the update of resources.

    - Wait for the operation to complete.

For more information, see the Terraform provider documentation.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Use the Connector.update method and send the following request, e.g., via cURL (a filled-in example follows this list):

  Warning
  The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the updateMask parameter as a single comma-separated string.

  ```bash
  curl \
    --request PATCH \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/<connector_name>' \
    --data '{
              "updateMask": "connectorSpec.tasksMax,connectorSpec.properties,connectorSpec.connectorConfigMirrormaker.<Mirrormaker_1_connector_setting>,...,connectorSpec.connectorConfigMirrormaker.<Mirrormaker_N_connector_setting>,connectorSpec.connectorConfigS3Sink.<S3_Sink_1_connector_setting>,...,connectorSpec.connectorConfigS3Sink.<S3_Sink_N_connector_setting>",
              "connectorSpec": {
                "tasksMax": "<task_limit>",
                "properties": "<advanced_connector_properties>",
                "connectorConfigMirrormaker": { <Mirrormaker_connector_settings> },
                "connectorConfigS3Sink": { <S3_Sink_connector_settings> }
              }
            }'
  ```

  Where:

  - updateMask: Comma-separated list of connector parameters to update presented as a single string.

    Specify the relevant parameters:

    - connectorSpec.tasksMax: To change the connector's task limit.
    - connectorSpec.properties: To change the connector's advanced properties.
    - connectorSpec.connectorConfigMirrormaker.<configuring_Mirrormaker_connector>: To update Mirrormaker connector settings.
    - connectorSpec.connectorConfigS3Sink.<configuring_S3_Sink_connector>: To update S3 Sink connector settings.

  - connectorSpec: Specify MirrorMaker or S3 Sink connector settings.

  You can request the cluster ID with a list of clusters in the folder and the connector name with a list of connectors in the cluster.

- View the server response to make sure the request was successful.
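As an illustration, a minimal sketch of a request that changes only the task limit, and therefore lists a single updateMask entry (substitute the cluster ID and connector name):

```bash
curl \
  --request PATCH \
  --header "Authorization: Bearer $IAM_TOKEN" \
  --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/<connector_name>' \
  --data '{
            "updateMask": "connectorSpec.tasksMax",
            "connectorSpec": {
              "tasksMax": "2"
            }
          }'
```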
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Clone the cloudapi repository:

  ```bash
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  ```

  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.

- Use the ConnectorService/Update call and send the following request, e.g., via gRPCurl:

  Warning
  The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the update_mask parameter as an array of paths[] strings.

  Format for listing settings:

  ```json
  "update_mask": {
    "paths": [
      "<setting_1>",
      "<setting_2>",
      ...
      "<setting_N>"
    ]
  }
  ```

  ```bash
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "connector_name": "<connector_name>",
          "update_mask": {
            "paths": [
              "connector_spec.tasks_max",
              "connector_spec.properties",
              "connector_spec.connector_config_mirrormaker.<Mirrormaker_1_connector_setting>",
              ...,
              "connector_spec.connector_config_mirrormaker.<Mirrormaker_N_connector_setting>",
              "connector_spec.connector_config_s3_sink.<S3_Sink_1_connector_setting>",
              ...,
              "connector_spec.connector_config_s3_sink.<S3_Sink_N_connector_setting>"
            ]
          },
          "connector_spec": {
            "tasks_max": { "value": "<task_limit>" },
            "properties": "<advanced_connector_properties>",
            "connector_config_mirrormaker": { <Mirrormaker_connector_settings> },
            "connector_config_s3_sink": { <S3_Sink_connector_settings> }
          }
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.Update
  ```

  Where:

  - update_mask: List of connector parameters to update as an array of paths[] strings.

    Specify the relevant parameters:

    - connector_spec.tasks_max: To change the connector's task limit.
    - connector_spec.properties: To change the connector's advanced properties.
    - connector_spec.connector_config_mirrormaker.<configuring_Mirrormaker_connector>: To update Mirrormaker connector settings.
    - connector_spec.connector_config_s3_sink.<configuring_S3_Sink_connector>: To update S3 Sink connector settings.

  - connector_spec: Specify MirrorMaker or S3 Sink connector settings.

  You can request the cluster ID with a list of clusters in the folder and the connector name with a list of connectors in the cluster.

- View the server response to make sure the request was successful.
Pausing a connector
When you pause a connector:
- The connection to the target is broken.
- Data is deleted from the connector service topics.
To pause a connector:
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Select a cluster and open the Connectors tab.
- Click the options icon next to the connector name and select Pause.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder through the --folder-name or --folder-id parameter.
To pause a connector, run the command:

```bash
yc managed-kafka connector pause <connector_name> \
  --cluster-name=<cluster_name>
```
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Use the Connector.pause method and send the following request, e.g., via cURL:

  ```bash
  curl \
    --request POST \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/pause/<connector_name>'
  ```

  You can request the cluster ID with a list of clusters in the folder and the connector name with a list of connectors in the cluster.

- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Clone the cloudapi repository:

  ```bash
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  ```

  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.

- Use the ConnectorService/Pause call and send the following request, e.g., via gRPCurl:

  ```bash
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "connector_name": "<connector_name>"
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.Pause
  ```

  You can request the cluster ID with a list of clusters in the folder and the connector name with a list of connectors in the cluster.

- View the server response to make sure the request was successful.
Resuming a connector
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Select a cluster and open the Connectors tab.
- Click the options icon next to the connector name and select Resume.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder through the --folder-name or --folder-id parameter.
To resume a connector, run the command:

```bash
yc managed-kafka connector resume <connector_name> \
  --cluster-name=<cluster_name>
```
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Use the Connector.resume method and send the following request, e.g., via cURL:

  ```bash
  curl \
    --request POST \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/resume/<connector_name>'
  ```

  You can request the cluster ID with a list of clusters in the folder and the connector name with a list of connectors in the cluster.

- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Clone the cloudapi repository:

  ```bash
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  ```

  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.

- Use the ConnectorService/Resume call and send the following request, e.g., via gRPCurl:

  ```bash
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "connector_name": "<connector_name>"
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.Resume
  ```

  You can request the cluster ID with a list of clusters in the folder and the connector name with a list of connectors in the cluster.

- View the server response to make sure the request was successful.
Importing a connector to Terraform
Using import, you can bring existing connectors under Terraform management.
- In the Terraform configuration file, specify the connector you want to import:

  ```hcl
  resource "yandex_mdb_kafka_connector" "<connector_name>" {}
  ```

- Run the following command to import the connector (see the example below):

  ```bash
  terraform import yandex_mdb_kafka_connector.<connector_name> <cluster_ID>:<connector_name>
  ```

To learn more about importing connectors, see the Terraform provider documentation.
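For example, for a hypothetical connector named replication in a cluster with the ID c9qbkmoiimsl******** (both values are illustrative):

```bash
terraform import yandex_mdb_kafka_connector.replication c9qbkmoiimsl********:replication
```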
Deleting a connector
- In the management console, go to the relevant folder.
- In the services list, select Managed Service for Kafka.
- Select a cluster and open the Connectors tab.
- Click the options icon next to the connector name and select Delete.
- Click Delete.
If you do not have the Yandex Cloud CLI yet, install and initialize it.
The folder specified in the CLI profile is used by default. You can specify a different folder through the --folder-name or --folder-id parameter.
To delete a connector, run the command:

```bash
yc managed-kafka connector delete <connector_name> \
  --cluster-name=<cluster_name>
```
- Open the current Terraform configuration file with an infrastructure plan.

  For more information about creating this file, see Creating clusters.

- Delete the yandex_mdb_kafka_connector resource with the connector description.

- Make sure the settings are correct.

  - Using the command line, navigate to the folder that contains the up-to-date Terraform configuration files with an infrastructure plan.

  - Run the command:

    ```bash
    terraform validate
    ```

    If there are errors in the configuration files, Terraform will point to them.

- Confirm updating the resources.

  - Run the command to view planned changes:

    ```bash
    terraform plan
    ```

    If the resource configuration descriptions are correct, the terminal will display a list of the resources to modify and their parameters. This is a test step. No resources are updated.

  - If you are happy with the planned changes, apply them:

    - Run the command:

      ```bash
      terraform apply
      ```

    - Confirm the update of resources.

    - Wait for the operation to complete.

For more information, see the Terraform provider documentation.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Use the Connector.delete method and send the following request, e.g., via cURL:

  ```bash
  curl \
    --request DELETE \
    --header "Authorization: Bearer $IAM_TOKEN" \
    --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/<connector_name>'
  ```

  You can request the cluster ID with a list of clusters in the folder and the connector name with a list of connectors in the cluster.

- View the server response to make sure the request was successful.
- Get an IAM token for API authentication and put it into the environment variable:

  ```bash
  export IAM_TOKEN="<IAM_token>"
  ```

- Clone the cloudapi repository:

  ```bash
  cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
  ```

  Below, we assume the repository contents are stored in the ~/cloudapi/ directory.

- Use the ConnectorService/Delete call and send the following request, e.g., via gRPCurl:

  ```bash
  grpcurl \
    -format json \
    -import-path ~/cloudapi/ \
    -import-path ~/cloudapi/third_party/googleapis/ \
    -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
    -rpc-header "Authorization: Bearer $IAM_TOKEN" \
    -d '{
          "cluster_id": "<cluster_ID>",
          "connector_name": "<connector_name>"
        }' \
    mdb.api.cloud.yandex.net:443 \
    yandex.cloud.mdb.kafka.v1.ConnectorService.Delete
  ```

  You can request the cluster ID with a list of clusters in the folder and the connector name with a list of connectors in the cluster.

- View the server response to make sure the request was successful.