Transferring data from an Apache Kafka® source endpoint
Yandex Data Transfer enables you to migrate data from an Apache Kafka® queue and implement various data transfer, processing, and transformation scenarios. To implement a transfer:
- Explore possible data transfer scenarios.
- Prepare the Apache Kafka® database for the transfer.
- Set up an endpoint source in Yandex Data Transfer.
- Set up one of the supported data targets.
- Create a transfer and start it.
- Perform required operations with the database and control the transfer.
- In case of any issues, use ready-made solutions to resolve them.
Scenarios for transferring data from Apache Kafka®
- Migration: Moving data from one storage system to another. This often means moving a database from an outdated on-premises installation to a managed cloud one.
  Mirroring data across queues is a separate migration task.
- Data delivery: Delivering arbitrary data to target storage. It includes retrieving data from a queue, deserializing it, and transforming it into the target storage format.
For a detailed description of possible Yandex Data Transfer scenarios, see Tutorials.
Preparing the source database
- Create a user with the ACCESS_ROLE_CONSUMER role for the source topic.
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.
- Configure user access rights to the topic you need.
- Grant the READ permission to the consumer group whose ID matches the transfer ID:

  ```bash
  bin/kafka-acls --bootstrap-server localhost:9092 \
    --command-config adminclient-configs.conf \
    --add \
    --allow-principal User:username \
    --operation Read \
    --group <transfer_ID>
  ```

- (Optional) To use username and password authorization, configure SASL authentication.
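The kafka-acls command above reads admin client connection settings from adminclient-configs.conf. A minimal sketch of that file for a SASL/SCRAM cluster might look as follows (the protocol, mechanism, and credentials are assumptions here; match them to your cluster's configuration):

```properties
# Hypothetical admin client settings for a SASL_SSL cluster;
# adjust the protocol, mechanism, and credentials to your setup.
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="username" \
  password="<password>";
```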
Configuring the Apache Kafka® source endpoint
When creating or editing an endpoint, you can define:
- Yandex Managed Service for Apache Kafka® cluster connection or custom installation settings, including those based on Yandex Compute Cloud VMs. These are required parameters.
- Additional settings.
Managed Service for Apache Kafka® cluster
Warning
To create or edit an endpoint of a managed database, you need to have the managed-kafka.viewer
role or the viewer
primitive role assigned for the folder where this managed database cluster resides.
Connection with the cluster ID specified in Yandex Cloud.
- Managed Service for Apache Kafka cluster: Select the cluster to connect to.
- Authentication: Select the connection type (SASL or No authentication).
  If you select SASL:
  - Username: Specify the name of the account under which Data Transfer will connect to the topic.
  - Password: Enter the account password.
- Topic full name: Specify the name of the topic to connect to.
- Security groups: Select the cloud network to host the endpoint and security groups for network traffic.
  This lets you apply the specified security group rules to the VMs and clusters in the selected network without changing their settings. For more information, see Networking in Yandex Data Transfer.
Custom installation
Connection with the Apache Kafka® cluster with explicitly specified network addresses and broker host ports.
- Broker URLs: Specify the IP addresses or FQDNs of the broker hosts.
  If the Apache Kafka® port number differs from the standard one, specify it after the host name, separated by a colon:
  <broker_host_IP_address_or_FQDN>:<port_number>
- SSL: Use encryption to protect the connection.
- PEM Certificate: If encryption of transmitted data is required, e.g., to meet PCI DSS requirements, upload the certificate file or add its contents as text.
- Endpoint network interface: Select or create a subnet in the desired availability zone.
  If this field is specified for both endpoints, both subnets must be hosted in the same availability zone.
- Authentication: Select the connection type (SASL or No authentication).
  If you select SASL:
  - Username: Specify the name of the account under which Data Transfer will connect to the topic.
  - Password: Enter the account password.
  - Mechanism: Select the hashing mechanism (SHA 256 or SHA 512).
- Topic full name: Specify the name of the topic to connect to.
- Security groups: Select the cloud network to host the endpoint and security groups for network traffic.
  This lets you apply the specified security group rules to the VMs and clusters in the selected network without changing their settings. For more information, see Networking in Yandex Data Transfer.
Advanced settings
Use advanced settings to specify transformation and conversion rules. Data is processed in the following order:
- Transformation: Data in JSON format is provided to a Yandex Cloud Functions function. The function body contains metadata and raw data from the queue. The function processes the data and sends it back to Data Transfer.
- Conversion: Data is parsed in preparation for delivery to the target.
If no transformation rules are set, parsing is applied to the raw data from the queue. If no conversion rules are set, the data goes directly to the target.
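As an illustration, a transformation function might look like the following Python sketch. The exact event format Data Transfer passes to the function is an assumption here (a list of records under a messages key, each carrying raw data plus metadata); consult the service documentation for the real contract:

```python
# Hypothetical sketch of a Cloud Functions transformation handler.
# The shape of the incoming event is an assumption, not the real contract.
import json

def handler(event, context=None):
    """Receive raw queue records, transform them, and return the result."""
    result = []
    for record in event.get("messages", []):   # "messages" key is an assumption
        data = json.loads(record["data"])      # raw payload from the queue
        # Example transformation: drop a sensitive field before delivery.
        data.pop("password", None)
        result.append({**record, "data": json.dumps(data)})
    return {"messages": result}
```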
- Transformation rules:
  - Cloud function: Select one of the functions created in Cloud Functions.
  - Service account: Select or create a service account that the processing function will run under.
  - Number of retries: Set the number of attempts to invoke the processing function.
  - Buffer size for function: Set the size of the buffer (in bytes); once it is full, the data is transferred to the processing function.
    The maximum buffer size is 3.5 MB. For more information about the restrictions that apply when working with functions in Cloud Functions, see the corresponding section.
  - Flush interval: Set the interval (in seconds) after which the data from the stream is transferred to the processing function.
    Note
    The data is transferred to the processing function as soon as the buffer is full or the flush interval expires, whichever comes first.
  - Invocation timeout: Set the allowed timeout for the response from the processing function (in seconds).
    Warning
    Specify values in the Flush interval and Invocation timeout fields with the s postfix, for example, 10s.
- Conversion rules:
  - Data format: Select one of the available formats:
    - JSON: JSON format.
    - AuditTrails.v1 parser: Audit Trails log format.
    - CloudLogging parser: Cloud Logging log format.
    - Debezium CDC parser: Debezium CDC format. It allows specifying Confluent Schema Registry in the settings.
    For JSON, specify:
    - Data scheme: Specify the schema as a list of fields or upload a file with a description of the schema in JSON format.
      Sample data schema:

      ```json
      [
        {
          "name": "request",
          "type": "string"
        }
      ]
      ```

    - Enable NULL values in keys: Select this option to allow null values in key columns.
    - Add a column for missing keys: Select this option to have fields missing from the schema appear in the _rest column.
    - Unescape string values: Select this option to remove quotation marks from string values. Otherwise, the string field values remain unchanged.
    For Debezium CDC, specify the following: Schema Registry URL, authentication method (including the username and password if authentication is used), and CA certificate.
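A rough sketch of how the JSON conversion rules above behave (an illustration only, not Data Transfer's actual implementation): fields listed in the data schema become columns, and, if the corresponding option is enabled, fields missing from the schema are collected into the _rest column:

```python
# Illustrative sketch of the JSON conversion rules, not the real
# Data Transfer code: known schema fields are kept as columns, and
# any remaining fields go into the "_rest" column.
import json

SCHEMA = [{"name": "request", "type": "string"}]  # sample schema from above

def convert(raw_message: str, add_rest_column: bool = True) -> dict:
    parsed = json.loads(raw_message)
    row = {f["name"]: parsed.pop(f["name"], None) for f in SCHEMA}
    if add_rest_column and parsed:
        row["_rest"] = json.dumps(parsed)  # fields missing from the schema
    return row
```

For example, a message with an extra agent field would keep request as a column and move agent into _rest.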
Configuring the data target
Configure one of the supported data targets:
- PostgreSQL.
- MySQL.
- MongoDB.
- ClickHouse®.
- Greenplum®.
- Yandex Managed Service for YDB.
- Yandex Object Storage.
- Apache Kafka®.
- YDS.
- Elasticsearch.
- OpenSearch.
For a complete list of supported sources and targets in Yandex Data Transfer, see Available Transfers.
After configuring the data source and target, create and start the transfer.