Transferring data to an Apache Kafka® target endpoint
Yandex Data Transfer enables you to migrate data to an Apache Kafka® queue and implement various data processing and transformation scenarios. To implement a transfer:
- Explore possible data transfer scenarios.
- Configure one of the supported data sources.
- Configure the target endpoint in Yandex Data Transfer.
- Create a transfer and start it.
- Perform required operations with the database and control the transfer.
- In case of any issues, use ready-made solutions to resolve them.
Scenarios for transferring data to Apache Kafka®
-
Migration: Moving data from one storage to another. Migration often means migrating a database from obsolete local databases to managed cloud ones.
Mirroring data across queues is a separate migration task.
-
Data change capture means tracking changes to a database and delivering those changes to consumers. It is used for applications that are sensitive to real-time data changes.
-
Data delivery is a process of delivering arbitrary data to target storage. It includes data retrieval from a queue and its deserialization with subsequent transformation to target storage format.
For a detailed description of possible Yandex Data Transfer scenarios, see Tutorials.
Configuring the data source
Configure one of the supported data sources:
For a complete list of supported sources and targets in Yandex Data Transfer, see Available transfers.
Configuring the Apache Kafka® target endpoint
When creating or updating an endpoint, you can define:
- Yandex Managed Service for Apache Kafka® cluster connection or custom installation settings and serialization settings, including those based on Yandex Compute Cloud VMs. These are required parameters.
- Apache Kafka topic settings.
Managed Service for Apache Kafka® cluster
Warning
To create or edit an endpoint of a managed database, you will need the managed-kafka.viewer
role or the primitive viewer
role for the folder the cluster of this managed database resides in.
Connection with the cluster ID specified in Yandex Cloud.
-
Managed Service for Apache Kafka cluster: Select the cluster to connect to.
-
Authentication: Select the connection type (
SASL
orNo authentication
).If you select
SASL
:- Username: Specify the name of the account, under which Data Transfer will connect to the topic.
- Password: Enter the account password.
-
Under Topic, select how you want to specify your topic:
-
Topic full name: Enter the full name of the topic.
If you do not want to split the event stream into independent queues by table, enable the Save transactions order setting.
-
Topic prefix: Enter the topic prefix.
-
-
Security groups: Select the cloud network to host the endpoint and security groups for network traffic.
Thus, you will be able to apply the specified security group rules to the VMs and clusters in the selected network without changing the settings of these VMs and clusters. For more information, see Networking in Yandex Data Transfer.
-
Under Serializing settings, select the serialization type. For more information, see Serialization settings.
-
Configure the Advanced settings of the endpoint, if required.
- Specify the configuration of topics in Configuration name format: Configuration value.
- Select compression type.
- Endpoint type:
kafka_target
.
-
security_groups
: Security groups for network traffic.Security group rules apply to a transfer. They allow opening up network access from the transfer VM to the cluster. For more information, see Networking in Yandex Data Transfer.
Security groups must belong to the same network as the cluster.
Note
In Terraform, it is not required to specify a network for security groups.
-
connection.cluster_id
: ID of the cluster to connect to. -
auth
: Authentication method used to connect to broker hosts:sasl
: SASL authentication.no_auth
: Without authentication.
Here is the configuration file example:
resource "yandex_datatransfer_endpoint" "<endpoint_name_in_Terraform>" {
name = "<endpoint_name>"
settings {
kafka_target {
security_groups = ["<list_of_security_group_IDs>"]
connection {
cluster_id = "<cluster_ID>"
}
auth {
<authentication_method>
}
<topic_settings>
<serialization_settings>
}
}
}
For more information, see the Terraform provider documentation
-
securityGroups
: Security groups for network traffic, whose rules will apply to VMs and clusters without changing their settings. For more information, see Networking in Yandex Data Transfer. -
connection.clusterId
: ID of the cluster to connect to. -
auth
: Authentication method used to connect to broker hosts:-
sasl
: SASL authentication. The following parameters are required:user
: Name of the account Data Transfer will use to connect to the topic.password.raw
: Password for the account in text form.mechanism
: Hashing mechanism.
-
noAuth
: Without authentication.
-
Custom installation
Connection with the Apache Kafka® cluster with explicitly specified network addresses and broker host ports.
-
Broker URLs: Specify the IP addresses or FQDNs of the broker hosts.
If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:
<broker_host_IP_address_or_FQDN>:<port_number>
-
SSL: Use encryption to protect the connection.
-
PEM Certificate: If data encryption is required, e.g., to comply with the PCI DSS, upload the certificate file or add its contents as text.
Endpoint network interface: Select or create a subnet in the required availability zone. The transfer will use this subnet to access the target.
If the value in this field is specified for both endpoints, both subnets must be hosted in the same availability zone.
-
Authentication: Select the connection type (
SASL
orNo authentication
).If you select
SASL
:- Username: Specify the name of the account, under which Data Transfer will connect to the topic.
- Password: Enter the account password.
- Mechanism: Select the hashing mechanism (SHA 256 or SHA 512).
-
Security groups: Select the cloud network to host the endpoint and security groups for network traffic.
Thus, you will be able to apply the specified security group rules to the VMs and clusters in the selected network without changing the settings of these VMs and clusters. For more information, see Networking in Yandex Data Transfer.
- Endpoint type:
kafka_target
.
-
security_groups
: Security groups for network traffic.Security group rules apply to a transfer. They allow opening up network access from the transfer VM to the broker hosts. For more information, see Networking in Yandex Data Transfer.
Security groups must belong to the same network as the
subnet_id
subnet, if the latter is specified.Note
In Terraform, it is not required to specify a network for security groups.
-
connection.on_premise
: Parameters for connecting to the broker hosts:-
broker_urls
: IP addresses or FQDNs of the broker hosts.If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:
<broker_host_IP_address_or_FQDN>:<port_number>
-
tls_mode
: Parameters for encryption of transmitted data if it is required, e.g., to comply with the PCI DSS requirements.disabled
: Disabled.enabled
: Enabled.ca_certificate
: CA certificate.
-
subnet_id
: ID of the subnet hosting the broker hosts. The transfer will use this subnet to access them.
-
-
auth
: Authentication method used to connect to broker hosts:sasl
: SASL authentication.no_auth
: Without authentication.
Here is the configuration file example:
resource "yandex_datatransfer_endpoint" "<endpoint_name_in_Terraform>" {
name = "<endpoint_name>"
settings {
kafka_target {
security_groups = ["<list_of_security_group_IDs>"]
connection {
on_premise {
broker_urls = ["<list_of_IP_addresses_or_broker_host_FQDNs>"]
subnet_id = "<ID_of_subnet_with_broker_hosts>"
}
}
auth = {
<authentication_method>
}
<topic_settings>
<serialization_settings>
}
}
}
For more information, see the Terraform provider documentation
-
securityGroups
: Security groups for network traffic, whose rules will apply to VMs and clusters without changing their settings. For more information, see Networking in Yandex Data Transfer. -
connection.onPremise
: Parameters for connecting to the broker hosts:-
brokerUrls
: IP addresses or FQDNs of the broker hosts.If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:
<broker_host_IP_address_or_FQDN>:<port_number>
-
tlsMode
: Parameters for encryption of transmitted data if it is required, e.g., to comply with the PCI DSS requirements.disabled
: Disabled.enabled
: Enabled.caCertificate
: CA certificate.
-
subnetId
: ID of the subnet hosting the broker hosts. The transfer will use this subnet to access them.
-
-
auth
: Authentication method used to connect to broker hosts:-
sasl
: SASL authentication. The following parameters are required:user
: Name of the account Data Transfer will use to connect to the topic.password.raw
: Password for the account in text form.mechanism
: Hashing mechanism.
-
noAuth
: Without authentication.
-
Apache Kafka® topic settings
-
Topic:
-
Topic full name: Specify the name of the topic to send messages to. Select Save transactions order, so as not to split an event stream into independent queues by table.
-
Topic prefix: Specify the topic prefix, similar to the
Debezium database.server.name
setting. Messages will go to the topic named<topic_prefix>.<schema>.<table_name>
.
-
Under topic_settings
, specify one of the options to send messages to a topic:
-
topic
: Specify parameters in this section to send all messages to one topic:topic_name
: Name of the topic to send messages to.save_tx_order
: Option for saving the order of transactions. Set totrue
not to split the event stream into independent queues by table.
-
topic_prefix
: Specify a prefix to send messages to different topics with the given prefix.It is similar to the Debezium
database.server.name
setting. Messages will be sent to a topic named<topic_prefix>.<schema>.<table_name>
.
In the topicSettings
field, specify one of the options to send messages to a topic:
-
topic
: Specify parameters in this field to send all messages to one topic:topicName
: Name of the topic to send messages to.saveTxOrder
: Option for saving the order of transactions. Set totrue
not to split the event stream into independent queues by table.
-
topicPrefix
: Specify a prefix to send messages to different topics with the given prefix.It is similar to the Debezium
database.server.name
setting. Messages will be sent to a topic named<topic_prefix>.<schema>.<table_name>
.
Yandex Data Transfer supports CDC for transfers from PostgreSQL, MySQL®, and YDB databases to Apache Kafka® and Yandex Data Streams. Data is sent to the target in Debezium format. For more information about CDC mode, see Change data capture.
Note
In YDB, CDC mode is supported starting from version 22.5.
Serialization settings
-
Under Serializing settings, select the serialization type:
-
Auto: Automatic serialization.
-
Debezium: Serialization under the Debezium standards:
- Select the message key schema (matches the
key.converter
Debezium parameter). - Select the message value schema (matches the
value.converter
Debezium parameter). - If required, specify Debezium serializer settings in
Parameter
-Value
format.
- Select the message key schema (matches the
-
If you want to use JSON schemas in Yandex Schema Registry and need to keep them compatible while adding optional parameters, specify these settings:
-
Serializing settings: Debezium.
-
To use Schema Registry for keys, select Key schema type: JSON (via Schema Registry). To use Schema Registry for values, select Value schema type: JSON (via Schema Registry).
- URL: Schema Registry namespace endpoint. You can copy the endpoint from the details for the Schema Registry namespace connection on the Debezium tab, in the
value.converter.schema.registry.url
parameter. - Username:
api-key
. - Password: Value of the API key with a limited scope used for connecting to Schema Registry. To get this value:
-
Create an API key with a limited scope and place it in the
SECRET
local variable:yc iam api-key create --folder-id <folder_ID> \ --service-account-name <name_of_service_account_for_operations_with_Schema_Registry> \ --scope yc.schema-registry.schemas.manage \ --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \ SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'`
-
Print the
SECRET
variable value to the terminal:echo $SECRET
-
Copy the printed value and paste it into the Password field in the endpoint creation window.
-
- URL: Schema Registry namespace endpoint. You can copy the endpoint from the details for the Schema Registry namespace connection on the Debezium tab, in the
-
Under Debezium serializer settings:
- To generate a closed schema for keys, include the
key.converter.dt.json.generate.closed.content.schema
parameter set totrue
. - To generate a closed schema for values, include the
value.converter.dt.json.generate.closed.content.schema
parameter set totrue
.
- To generate a closed schema for keys, include the
Under serializer
, specify the selected serialization type:
-
serializer_auto
-
serializer_json
-
serializer_debezium
For this type, you can specify the Debezium serialization parameters as
key
/value
pairs in theserializer_debezium.serializer_parameters
field.
If you want to use JSON schemas in Yandex Schema Registry and need to keep them compatible while adding optional parameters, add the serializer
section with a description of the serialization settings to the configuration file. To generate a closed schema for keys, include the key.converter.dt.json.generate.closed.content.schema
variable set to true
in the serializer
section. To generate a closed schema for values, include the value.converter.dt.json.generate.closed.content.schema
variable set to true
in the serializer
section.
resource "yandex_datatransfer_endpoint" "<endpoint_name_in_Terraform>" {
...
settings {
kafka_target {
...
serializer {
serializer_debezium {
serializer_parameters {
key = "key.converter.dt.json.generate.closed.content.schema"
value = "true"
}
serializer_parameters {
key = "value.converter.dt.json.generate.closed.content.schema"
value = "true"
}
serializer_parameters {
key = "value.converter.schemas.enable"
value = "true"
}
serializer_parameters {
key = "value.converter.schema.registry.url"
value = "<Schema_Registry_namespace_URL>"
}
serializer_parameters {
key = "value.converter.basic.auth.user.info"
value = "api-key:<API_key_value>"
}
}
}
}
}
}
Where:
Schema_Registry_namespace_URL
: Schema Registry namespace endpoint. You can copy the endpoint from the details for the Schema Registry namespace connection on the Debezium tab, in thevalue.converter.schema.registry.url
parameter.API_key_value
: Value of the API key with a limited scope used for connecting to Schema Registry. To get this value:-
Create an API key with a limited scope and place it in the
SECRET
local variable:yc iam api-key create --folder-id <folder_ID> \ --service-account-name <name_of_service_account_for_operations_with_Schema_Registry> \ --scope yc.schema-registry.schemas.manage \ --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \ SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'`
-
Print the
SECRET
variable value to the terminal:echo $SECRET
-
Copy the printed value and paste it into the
value
parameter in the configuration file.
-
Under serializer
, specify the selected serialization type:
-
serializerAuto
-
serializerJson
-
serializerDebezium
For this type, you can specify the Debezium serialization parameters as
key
/value
pairs in theserializerDebezium.serializerParameters
field.
If you want to use JSON schemas in Yandex Schema Registry and need to keep them compatible while adding optional parameters, add the serializer
object with a description of the serialization settings to the request body. To generate a closed schema for keys, add the key.converter.dt.json.generate.closed.content.schema
variable set to true
in serializer
. To generate a closed schema for values, add the value.converter.dt.json.generate.closed.content.schema
variable set to true
in serializer
.
"serializer": {
"serializerDebezium": {
"serializerParameters": [
{
"key": "converter.dt.json.generate.closed.content.schema",
"value": "true"
},
{
"key": "value.converter",
"value": "io.confluent.connect.json.JsonSchemaConverter"
},
{
"key": "value.converter.schemas.enable",
"value": "true"
},
{
"key": "value.converter.schema.registry.url",
"value": "<Schema_Registry_namespace_URL>"
},
{
"key": "value.converter.basic.auth.user.info",
"value": "api-key:<API_key_value>"
}
]
}
}
Where:
Schema_Registry_namespace_URL
: Schema Registry namespace endpoint. You can copy the endpoint from the details for the Schema Registry namespace connection on the Debezium tab, in thevalue.converter.schema.registry.url
parameter.API_key_value
: Value of the API key with a limited scope used for connecting to Schema Registry. To get this value:-
Create an API key with a limited scope and place it in the
SECRET
local variable:yc iam api-key create --folder-id <folder_ID> \ --service-account-name <name_of_service_account_for_operations_with_Schema_Registry> \ --scope yc.schema-registry.schemas.manage \ --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \ SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'`
-
Print the
SECRET
variable value to the terminal:echo $SECRET
-
Copy the printed value and paste it into the
value
parameter in the configuration file.
-
Additional settings
You can specify topic configuration parameters
Specify the parameter and one of its possible values, e.g., cleanup.policy
and compact
.
After configuring the data source and target, create and start the transfer.