Transferring data from an Apache Kafka® source endpoint

Written by
Yandex Cloud
Updated at April 24, 2025
  • Scenarios for transferring data from Apache Kafka®
  • Preparing the source database
  • Configuring the Apache Kafka® source endpoint
    • Managed Service for Apache Kafka® cluster
    • Custom installation
    • Advanced settings
  • Configuring the data target

Yandex Data Transfer enables you to migrate data from an Apache Kafka® queue and implement various data transfer, processing, and transformation scenarios. To implement a transfer:

  1. Explore possible data transfer scenarios.
  2. Prepare the Apache Kafka® database for the transfer.
  3. Set up a source endpoint in Yandex Data Transfer.
  4. Set up one of the supported data targets.
  5. Create a transfer and start it.
  6. Perform required operations with the database and control the transfer.
  7. In case of any issues, use ready-made solutions to resolve them.

Scenarios for transferring data from Apache Kafka®

  1. Migration: Moving data from one storage system to another. Migration often means moving a database from an obsolete on-premises installation to a managed cloud one.

    Mirroring data across queues is a separate migration task.

    • Apache Kafka® mirroring.
  2. Data delivery: Delivering arbitrary data to target storage. The process includes retrieving data from a queue, deserializing it, and transforming it into the target storage format.

    • Apache Kafka® to ClickHouse®.
    • Apache Kafka® to PostgreSQL.
    • Apache Kafka® to Greenplum®.
    • Apache Kafka® to MongoDB.
    • Apache Kafka® to MySQL®.
    • Apache Kafka® to OpenSearch.
    • Apache Kafka® to YDB.
    • Apache Kafka® to YDS.

For a detailed description of possible Yandex Data Transfer scenarios, see Tutorials.

Preparing the source database

Managed Service for Apache Kafka®
Apache Kafka®

Create a user with the ACCESS_ROLE_CONSUMER role for the source topic.

  1. If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer.

    For details on linking your network up with external resources, see this concept.

  2. Configure access to the source cluster from Yandex Cloud.

  3. Configure user access rights to the topic you need.

  4. Grant the READ permission to the consumer group whose ID matches the transfer ID.

    bin/kafka-acls --bootstrap-server localhost:9092 \
      --command-config adminclient-configs.conf \
      --add \
      --allow-principal User:username \
      --operation Read \
      --group <transfer_ID>
    
  5. Optionally, to log in with a username and password, configure SASL authentication.

Configuring the Apache Kafka® source endpoint

When creating or updating an endpoint, you can define:

  • Yandex Managed Service for Apache Kafka® cluster connection or custom installation settings, including those based on Yandex Compute Cloud VMs. These are required parameters.
  • Additional settings.

Managed Service for Apache Kafka® cluster

Warning

To create or edit an endpoint of a managed database, you will need the managed-kafka.viewer role or the primitive viewer role for the folder hosting the cluster of this managed database.

Connection using the cluster ID specified in Yandex Cloud.

Management console
Terraform
API
  • Managed Service for Apache Kafka cluster: Select the cluster to connect to.

  • Authentication: Select the connection type (SASL or No authentication).

    If you select SASL:

    • Username: Specify the name of the account under which Data Transfer will connect to the topic.
    • Password: Enter the account password.
  • Topic full name: Specify the name of the topic to connect to.

  • Security groups: Select the cloud network to host the endpoint and security groups for network traffic.

    This way, you can apply the specified security group rules to the VMs and clusters in the selected network without changing their settings. For more information, see Networking in Yandex Data Transfer.

  • Endpoint type: kafka_source.
  • security_groups: Security groups for network traffic.

    Security group rules apply to a transfer. They allow opening up network access from the transfer VM to the cluster. For more information, see Networking in Yandex Data Transfer.

    Security groups must belong to the same network as the cluster.

    Note

    In Terraform, it is not required to specify a network for security groups.

  • connection.cluster_id: ID of the cluster to connect to.

  • auth: Authentication method used to connect to broker hosts:

    • sasl: SASL authentication.
    • no_auth: Without authentication.
  • topic_names: Names of the topics to connect to.

Here is the configuration file example:

resource "yandex_datatransfer_endpoint" "<endpoint_name_in_Terraform>" {
  name = "<endpoint_name>"
  settings {
    kafka_source {
      security_groups = ["<list_of_security_group_IDs>"]
      connection {
        cluster_id = "<cluster_ID>"
      }
      auth {
        <authentication_method>
      }
      topic_names = ["<topic_name_list>"]
      <endpoint_advanced_settings>
    }
  }
}

For more information, see the Terraform provider documentation.
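
In the example above, <authentication_method> stands for either a no_auth {} block or a sasl block. Below is a sketch of a possible sasl block; it assumes the Terraform attributes mirror the API fields described in this article (user, password.raw, mechanism), and the mechanism value is shown for illustration only, so check it against the Terraform provider documentation:

auth {
  sasl {
    user = "<username>"
    # Account password in text form
    password {
      raw = "<password>"
    }
    # Hashing mechanism (illustrative value, verify against the provider schema)
    mechanism = "KAFKA_MECHANISM_SHA512"
  }
}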

  • securityGroups: Security groups for network traffic, whose rules will apply to VMs and clusters without changing their settings. For more information, see Networking in Yandex Data Transfer.

  • connection.clusterId: ID of the cluster to connect to.

  • auth: Authentication method used to connect to broker hosts:

    • sasl: SASL authentication. The following parameters are required:

      • user: Name of the account Data Transfer will use to connect to the topic.
      • password.raw: Password for the account in text form.
      • mechanism: Hashing mechanism.
    • noAuth: Without authentication.

  • topicNames: Names of the topics to connect to.

Custom installation

Connection to an Apache Kafka® cluster with explicitly specified network addresses and broker host ports.

Management console
Terraform
API
  • Broker URLs: Specify the IP addresses or FQDNs of the broker hosts.

    If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:

    <broker_host_IP_address_or_FQDN>:<port_number>
    
  • SSL: Use encryption to protect the connection.

  • PEM Certificate: If the transmitted data must be encrypted, e.g., to meet PCI DSS requirements, upload the certificate file or add its contents as text.

    Warning

    If no certificate is added, the transfer may fail with an error.

  • Endpoint network interface: Select or create a subnet in the required availability zone. The transfer will use this subnet to access the source.

    If the value in this field is specified for both endpoints, both subnets must be hosted in the same availability zone.

  • Authentication: Select the connection type (SASL or No authentication).

    If you select SASL:

    • Username: Specify the name of the account under which Data Transfer will connect to the topic.
    • Password: Enter the account password.
    • Mechanism: Select the hashing mechanism (SHA 256 or SHA 512).
  • Topic full name: Specify the name of the topic to connect to.

  • Security groups: Select the cloud network to host the endpoint and security groups for network traffic.

    This way, you can apply the specified security group rules to the VMs and clusters in the selected network without changing their settings. For more information, see Networking in Yandex Data Transfer.

  • Endpoint type: kafka_source.
  • security_groups: Security groups for network traffic.

    Security group rules apply to a transfer. They allow opening up network access from the transfer VM to the broker hosts. For more information, see Networking in Yandex Data Transfer.

    Security groups must belong to the same network as the subnet_id subnet, if the latter is specified.

    Note

    In Terraform, it is not required to specify a network for security groups.

  • connection.on_premise: Parameters for connecting to the broker hosts:

    • broker_urls: IP addresses or FQDNs of the broker hosts.

      If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:

      <broker_host_IP_address_or_FQDN>:<port_number>
      
    • tls_mode: Parameters for encrypting the data to transfer, if required, e.g., for compliance with the PCI DSS requirements.

      • disabled: Disabled.
      • enabled: Enabled.
        • ca_certificate: CA certificate.

          Warning

          If no certificate is added, the transfer may fail with an error.

    • subnet_id: ID of the subnet hosting the broker hosts. The transfer will use this subnet to access them.

  • auth: Authentication method used to connect to broker hosts:

    • sasl: SASL authentication.
    • no_auth: Without authentication.
  • topic_names: Names of the topics to connect to.

Here is the configuration file example:

resource "yandex_datatransfer_endpoint" "<endpoint_name_in_Terraform>" {
  name = "<endpoint_name>"
  settings {
    kafka_source {
      security_groups = ["<list_of_security_group_IDs>"]
      connection {
        on_premise {
          broker_urls = ["<list_of_IP_addresses_or_broker_host_FQDNs>"]
          subnet_id  = "<ID_of_subnet_with_broker_hosts>"
        }
      }
      auth {
        <authentication_method>
      }
      topic_names = ["<topic_name_list>"]
      <endpoint_advanced_settings>
    }
  }
}

For more information, see the Terraform provider documentation.
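
As an illustration of the settings above, here is how the connection block might look with TLS enabled; it combines the on_premise attributes listed in this tab (broker_urls, subnet_id, tls_mode.enabled.ca_certificate), with the certificate read from a local file as an assumption:

connection {
  on_premise {
    broker_urls = ["<broker_host_FQDN>:9092"]
    subnet_id   = "<ID_of_subnet_with_broker_hosts>"
    tls_mode {
      enabled {
        # CA certificate contents; if no certificate is added, the transfer may fail
        ca_certificate = file("<path_to_CA_certificate_file>")
      }
    }
  }
}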

  • securityGroups: Security groups for network traffic, whose rules will apply to VMs and clusters without changing their settings. For more information, see Networking in Yandex Data Transfer.

  • connection.onPremise: Parameters for connecting to the broker hosts:

    • brokerUrls: IP addresses or FQDNs of the broker hosts.

      If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:

      <broker_host_IP_address_or_FQDN>:<port_number>
      
    • tlsMode: Parameters for encrypting the data to transfer, if required, e.g., for compliance with the PCI DSS requirements.

      • disabled: Disabled.
      • enabled: Enabled.
        • caCertificate: CA certificate.

          Warning

          If no certificate is added, the transfer may fail with an error.

    • subnetId: ID of the subnet hosting the broker hosts. The transfer will use this subnet to access them.

  • auth: Authentication method used to connect to broker hosts:

    • sasl: SASL authentication. The following parameters are required:

      • user: Name of the account Data Transfer will use to connect to the topic.
      • password.raw: Password for the account in text form.
      • mechanism: Hashing mechanism.
    • noAuth: Without authentication.

  • topicNames: Names of the topics to connect to.

Advanced settings

Use advanced settings to specify transformation and conversion rules. Data is processed in the following order:

  1. Transformation. Data in JSON format is provided to a Yandex Cloud Functions function. The function body contains metadata and raw data from the queue. The function handles the data and sends it back to Data Transfer.

  2. Conversion. Data is parsed in preparation for delivery to the target.

If no transformation rules are set, parsing is applied to raw data from the queue. If no conversion rules are set, the data goes directly to the target.
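
In Terraform terms, both stages correspond to optional blocks inside kafka_source and are applied in the same order. Here is a rough sketch of the overall shape (the individual attributes are covered in the tabs below; the comments are explanatory only):

kafka_source {
  # ... connection, auth, and topic settings ...

  # 1. Transformation: data is first sent to a Cloud Functions function.
  transformer {
    # function ID, service account, buffer, and timeout settings
  }

  # 2. Conversion: data is then parsed before delivery to the target.
  parser {
    # parser type and data schema settings
  }
}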

Management console
Terraform
API
  • Transformation rules:

    • Cloud function: Select one of the functions created in Cloud Functions.

    • Service account: Select or create a service account you are going to use to start the processing function.

    • Number of retries: Set the number of attempts to invoke the processing function.

    • Buffer size for function: Set the buffer size (in bytes); when the buffer is full, the data is transferred to the processing function.

      The maximum buffer size is 3.5 MB. For more information about restrictions that apply to functions in Cloud Functions, see the relevant section.

    • Flush interval: Set the interval (in seconds) to wait before transferring stream data to the processing function.

      Note

      If the buffer is full or the flush interval expires, the data is transferred to the processing function.

    • Invocation timeout: Set the allowed timeout of the response from the processing function (in seconds).

    Warning

    Values in the Flush interval and Invocation timeout fields are specified with the s postfix, e.g., 10s.

  • Conversion rules:

    • Data format: Select one of the available formats:
      • JSON: JSON format.

      • AuditTrails.v1 parser: Audit Trails log format.

      • CloudLogging parser: Cloud Logging log format.

      • Debezium CDC parser: Debezium CDC. It allows specifying Confluent Schema Registry in the settings.

      For JSON, specify:

      • Data scheme: Specify the schema as a list of fields or upload a file with a description of the schema in JSON format.

        Sample data schema:

        [
            {
                "name": "request",
                "type": "string"
            }
        ]

      • Enable NULL values in keys: Select this option to allow the null value in key columns.
      • Add a column for missing keys: Select this option to have the fields missing in the schema appear in the _rest column.
      • Unescape string values: Select this option to remove quotation marks from string variables. Otherwise, the string field values will remain unchanged.

      For Debezium CDC, specify the following: Schema Registry URL, authentication method (including the username and user password if authentication is used), and CA certificate.

  • transformer: Transformation rules.

  • parser: Conversion rules which depend on the selected parser:

    • audit_trails_v1_parser: Audit Trails log parser.

    • cloud_logging_parser: Cloud Logging log parser.

    • json_parser or tskv_parser: JSON and TSKV parsers, respectively.

      Both parsers use the same attributes to define parameters:

      • data_schema: Data schema represented either as a JSON object with a schema description or as a list of fields:

        • fields: Schema represented as a JSON object. The object contains an array of JSON objects that describe individual columns.
        • json_fields: Schema represented as a list of fields.
      • null_keys_allowed: Set to true to allow the null value in key columns.

      • add_rest_column: Set to true to add the fields missing in the schema to the _rest column.

      • unescape_string_values: Set to true to remove quotation marks from string variables. Otherwise, the string field values will remain unchanged.
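
To see how these parser attributes fit together, here is a minimal json_parser sketch. It reuses the sample data schema from the management console tab and assumes the schema can be passed as a JSON string via json_fields (using Terraform's jsonencode); verify the exact attribute types against the Terraform provider documentation:

parser {
  json_parser {
    data_schema {
      # Schema as a list of fields, serialized to JSON
      json_fields = jsonencode([
        {
          name = "request"
          type = "string"
        }
      ])
    }
    null_keys_allowed = false
    add_rest_column   = true
  }
}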

  • transformer: Transformation rules:

    • cloudFunction: ID of the function created in Cloud Functions.

    • serviceAccountId: ID of the service account you will use to invoke the processing function.

    • numberOfRetries: Number of attempts to invoke the processing function.

    • bufferSize: Buffer size, in bytes. When the buffer is full, data will be transferred to the processing function.

      The maximum buffer size is 3.5 MB. For more information about restrictions that apply to functions in Cloud Functions, see the relevant section.

    • bufferFlushInterval: Interval to wait before transferring stream data to the processing function, in seconds.

      Note

      If the buffer is full or the flush interval has expired, the data is transferred to the processing function.

    • invocationTimeout: Allowed timeout of a processing function response, in seconds.

    Warning

    Append s to the bufferFlushInterval and invocationTimeout values, e.g., 10s.

  • parser: Conversion rules which depend on the selected parser:

    • auditTrailsV1Parser: Audit Trails log parser.

    • cloudLoggingParser: Cloud Logging log parser.

    • jsonParser or tskvParser: JSON and TSKV parsers, respectively.

      Both parsers share the same parameter fields:

      • dataSchema: Data schema represented either as a JSON object with a schema description or as a list of fields:

        • fields: Schema represented as a JSON object. The object contains an array of JSON objects that describe individual columns.

        • jsonFields: Schema represented as a list of fields.

      • nullKeysAllowed: Set to true to allow the null value in key columns.

      • addRestColumn: Set to true to add the fields missing in the schema to the _rest column.

      • unescapeStringValues: Set to true to remove quotation marks from string variables. Otherwise, the string field values will remain unchanged.

Configuring the data target

Configure one of the supported data targets:

  • PostgreSQL
  • MySQL®
  • MongoDB
  • ClickHouse®
  • Greenplum®
  • Yandex Managed Service for YDB
  • Yandex Object Storage
  • Apache Kafka®
  • YDS
  • Elasticsearch
  • OpenSearch

For a complete list of supported sources and targets in Yandex Data Transfer, see Available transfers.

After configuring the data source and target, create and start the transfer.
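
If you describe the endpoints with Terraform, you can define the transfer in the same configuration. Here is a minimal sketch using the yandex_datatransfer_transfer resource; it assumes a replication-only transfer between the endpoints defined earlier, so check the resource arguments and the type value against the Terraform provider documentation:

resource "yandex_datatransfer_transfer" "<transfer_name_in_Terraform>" {
  name      = "<transfer_name>"
  source_id = yandex_datatransfer_endpoint.<source_endpoint_name_in_Terraform>.id
  target_id = yandex_datatransfer_endpoint.<target_endpoint_name_in_Terraform>.id
  # Replication-only transfer (illustrative value)
  type      = "INCREMENT_ONLY"
}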
