
Transferring data to an Apache Kafka® target endpoint

Written by Yandex Cloud. Updated April 24, 2025.
  • Scenarios for transferring data to Apache Kafka®
  • Configuring the data source
  • Configuring the Apache Kafka® target endpoint
    • Managed Service for Apache Kafka® cluster
    • Custom installation
    • Apache Kafka® topic settings
    • Serialization settings
    • Additional settings

Yandex Data Transfer enables you to migrate data to an Apache Kafka® queue and implement various data processing and transformation scenarios. To implement a transfer:

  1. Explore possible data transfer scenarios.
  2. Configure one of the supported data sources.
  3. Configure the target endpoint in Yandex Data Transfer.
  4. Create a transfer and start it.
  5. Perform required operations with the database and control the transfer.
  6. In case of any issues, use ready-made solutions to resolve them.

Scenarios for transferring data to Apache Kafka®

  1. Migration: Moving data from one storage system to another. This often means moving a database from an obsolete on-premises installation to a managed cloud one.

    Mirroring data across queues is a separate migration task.

    • Apache Kafka® mirroring
  2. Change data capture (CDC) means tracking changes to a database and delivering those changes to consumers. It is used by applications that are sensitive to real-time data changes.

    • Capturing changes from MySQL® and delivering to Apache Kafka®.
    • YDB change data capture and delivery to Apache Kafka®.
    • Capturing changes from PostgreSQL and delivering to Apache Kafka®.
  3. Data delivery is the process of delivering arbitrary data to target storage. It includes retrieving data from a queue, deserializing it, and transforming it to the target storage format.

    • Delivering data from a YDS queue to Apache Kafka®

For a detailed description of possible Yandex Data Transfer scenarios, see Tutorials.

Configuring the data source

Configure one of the supported data sources:

  • PostgreSQL
  • MySQL®
  • Apache Kafka®
  • Airbyte®
  • YDS
  • Managed Service for YDB
  • Elasticsearch
  • OpenSearch

For a complete list of supported sources and targets in Yandex Data Transfer, see Available transfers.

Configuring the Apache Kafka® target endpoint

When creating or updating an endpoint, you can define:

  • Connection settings for a Yandex Managed Service for Apache Kafka® cluster or a custom installation (including one based on Yandex Compute Cloud VMs), as well as serialization settings. These are required parameters.
  • Apache Kafka® topic settings.

Managed Service for Apache Kafka® cluster

Warning

To create or edit an endpoint of a managed database, you will need the managed-kafka.viewer role or the primitive viewer role for the folder hosting the cluster of this managed database.

Connecting to a cluster by specifying its ID in Yandex Cloud.

Management console

  • Managed Service for Apache Kafka® cluster: Select the cluster to connect to.

  • Authentication: Select the connection type (SASL or No authentication).

    If you select SASL:

    • Username: Specify the name of the account under which Data Transfer will connect to the topic.
    • Password: Enter the account password.
  • Under Topic, select how you want to specify your topic:

    • Topic full name: Enter the full name of the topic.

      If you do not want to split the event stream into independent queues by table, enable the Save transactions order setting.

    • Topic prefix: Enter the topic prefix.

  • Security groups: Select the cloud network to host the endpoint and security groups for network traffic.

    This lets you apply the specified security group rules to the VMs and clusters in the selected network without changing the settings of these VMs and clusters. For more information, see Networking in Yandex Data Transfer.

  • Under Serializing settings, select the serialization type. For more information, see Serialization settings.

  • Configure the Advanced settings of the endpoint, if required.

    • Specify the topic configuration parameters in Configuration name: Configuration value format.
    • Select compression type.

Terraform

  • Endpoint type: kafka_target.
  • security_groups: Security groups for network traffic.

    Security group rules apply to a transfer. They allow opening up network access from the transfer VM to the cluster. For more information, see Networking in Yandex Data Transfer.

    Security groups must belong to the same network as the cluster.

    Note

    In Terraform, it is not required to specify a network for security groups.

  • connection.cluster_id: ID of the cluster to connect to.

  • auth: Authentication method used to connect to broker hosts:

    • sasl: SASL authentication.
    • no_auth: Without authentication.

Here is the configuration file example:

resource "yandex_datatransfer_endpoint" "<endpoint_name_in_Terraform>" {
  name = "<endpoint_name>"
  settings {
    kafka_target {
      security_groups = ["<list_of_security_group_IDs>"]
      connection {
        cluster_id = "<cluster_ID>"
      }
      auth {
        <authentication_method>
      }
      <topic_settings>
      <serialization_settings>
    }
  }
}
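
In this example, the <authentication_method> placeholder is left as-is. For SASL, the block could look like the following minimal sketch; the field layout follows the sasl parameters documented for this endpoint (user, password.raw, mechanism), and the mechanism enum value is an assumption to verify against the provider schema:

auth {
  sasl {
    user = "<account_name>"
    password {
      raw = "<account_password>"
    }
    # Assumption: enum value spelling; check the provider schema for exact names
    mechanism = "KAFKA_MECHANISM_SHA512"
  }
}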

For more information, see the Terraform provider documentation.

API

  • securityGroups: Security groups for network traffic, whose rules will apply to VMs and clusters without changing their settings. For more information, see Networking in Yandex Data Transfer.

  • connection.clusterId: ID of the cluster to connect to.

  • auth: Authentication method used to connect to broker hosts:

    • sasl: SASL authentication. The following parameters are required:

      • user: Name of the account Data Transfer will use to connect to the topic.
      • password.raw: Password for the account in text form.
      • mechanism: Hashing mechanism.
    • noAuth: Without authentication.

Custom installation

Connecting to an Apache Kafka® cluster by explicitly specifying the network addresses and ports of its broker hosts.

Management console

  • Broker URLs: Specify the IP addresses or FQDNs of the broker hosts.

    If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:

    <broker_host_IP_address_or_FQDN>:<port_number>
    
  • SSL: Use encryption to protect the connection.

  • PEM Certificate: If the transmitted data needs to be encrypted, e.g., to meet PCI DSS requirements, upload the certificate file or add its contents as text.

    Warning

    If no certificate is added, the transfer may fail with an error.

  • Endpoint network interface: Select or create a subnet in the required availability zone. The transfer will use this subnet to access the target.

    If this field's value is specified for both endpoints, both subnets must be hosted in the same availability zone.

  • Authentication: Select the connection type (SASL or No authentication).

    If you select SASL:

    • Username: Specify the name of the account under which Data Transfer will connect to the topic.
    • Password: Enter the account password.
    • Mechanism: Select the hashing mechanism (SHA 256 or SHA 512).
  • Security groups: Select the cloud network to host the endpoint and security groups for network traffic.

    This lets you apply the specified security group rules to the VMs and clusters in the selected network without changing the settings of these VMs and clusters. For more information, see Networking in Yandex Data Transfer.

Terraform

  • Endpoint type: kafka_target.
  • security_groups: Security groups for network traffic.

    Security group rules apply to a transfer. They allow opening up network access from the transfer VM to the broker hosts. For more information, see Networking in Yandex Data Transfer.

    Security groups must belong to the same network as the subnet_id subnet, if the latter is specified.

    Note

    In Terraform, it is not required to specify a network for security groups.

  • connection.on_premise: Parameters for connecting to the broker hosts:

    • broker_urls: IP addresses or FQDNs of the broker hosts.

      If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:

      <broker_host_IP_address_or_FQDN>:<port_number>
      
    • tls_mode: Parameters for encrypting the data to transfer, if required, e.g., for compliance with the PCI DSS requirements.

      • disabled: Disabled.
      • enabled: Enabled.
        • ca_certificate: CA certificate.

          Warning

          If no certificate is added, the transfer may fail with an error.

    • subnet_id: ID of the subnet hosting the broker hosts. The transfer will use this subnet to access them.

  • auth: Authentication method used to connect to broker hosts:

    • sasl: SASL authentication.
    • no_auth: Without authentication.

Here is the configuration file example:

resource "yandex_datatransfer_endpoint" "<endpoint_name_in_Terraform>" {
  name = "<endpoint_name>"
  settings {
    kafka_target {
      security_groups = ["<list_of_security_group_IDs>"]
      connection {
        on_premise {
          broker_urls = ["<list_of_IP_addresses_or_broker_host_FQDNs>"]
          subnet_id  = "<ID_of_subnet_with_broker_hosts>"
        }
      }
      auth {
        <authentication_method>
      }
      <topic_settings>
      <serialization_settings>
    }
  }
}
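
The example above omits encryption. If the connection must use TLS, the on_premise section could additionally carry a tls_mode block; here is a sketch based on the tls_mode parameters described above, where reading the CA certificate with file() is an assumption:

connection {
  on_premise {
    broker_urls = ["<list_of_IP_addresses_or_broker_host_FQDNs>"]
    subnet_id   = "<ID_of_subnet_with_broker_hosts>"
    tls_mode {
      enabled {
        # Assumption: the certificate is passed as file contents
        ca_certificate = file("<path_to_CA_certificate_file>")
      }
    }
  }
}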

For more information, see the Terraform provider documentation.

API

  • securityGroups: Security groups for network traffic, whose rules will apply to VMs and clusters without changing their settings. For more information, see Networking in Yandex Data Transfer.

  • connection.onPremise: Parameters for connecting to the broker hosts:

    • brokerUrls: IP addresses or FQDNs of the broker hosts.

      If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:

      <broker_host_IP_address_or_FQDN>:<port_number>
      
    • tlsMode: Parameters for encrypting the data to transfer, if required, e.g., for compliance with the PCI DSS requirements.

      • disabled: Disabled.
      • enabled: Enabled.
        • caCertificate: CA certificate.

          Warning

          If no certificate is added, the transfer may fail with an error.

    • subnetId: ID of the subnet hosting the broker hosts. The transfer will use this subnet to access them.

  • auth: Authentication method used to connect to broker hosts:

    • sasl: SASL authentication. The following parameters are required:

      • user: Name of the account Data Transfer will use to connect to the topic.
      • password.raw: Password for the account in text form.
      • mechanism: Hashing mechanism.
    • noAuth: Without authentication.

Apache Kafka® topic settings

Management console

  • Topic:

    • Topic full name: Specify the name of the topic to send messages to. Select Save transactions order if you do not want to split the event stream into independent queues by table.

    • Topic prefix: Specify the topic prefix, similar to the Debezium database.server.name setting. Messages will go to the topic named <topic_prefix>.<schema>.<table_name>.

Terraform

Under topic_settings, specify one of the options to send messages to a topic:

  • topic: Specify parameters in this section to send all messages to one topic:

    • topic_name: Name of the topic to send messages to.
    • save_tx_order: Option for saving the order of transactions. Set to true to avoid splitting the event stream into independent queues by table.
  • topic_prefix: Specify a prefix to send messages to different topics sharing this prefix.

    It is similar to the Debezium database.server.name setting. Messages will be sent to a topic named <topic_prefix>.<schema>.<table_name>.
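
For reference, here is a sketch of how the <topic_settings> placeholder from the configuration examples above might be filled when sending all messages to one topic; the block layout follows the parameters listed here:

topic_settings {
  topic {
    topic_name    = "<full_topic_name>"
    # Keep the transaction order instead of splitting the stream by table
    save_tx_order = true
  }
}

To route messages to per-table topics instead, replace the topic block with topic_prefix = "<topic_prefix>"; e.g., with the cdc prefix, changes to the public.orders table would go to the cdc.public.orders topic.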

API

In the topicSettings field, specify one of the options to send messages to a topic:

  • topic: Specify parameters in this field to send all messages to one topic:

    • topicName: Name of the topic to send messages to.
    • saveTxOrder: Option for saving the order of transactions. Set to true to avoid splitting the event stream into independent queues by table.
  • topicPrefix: Specify a prefix to send messages to different topics sharing this prefix.

    It is similar to the Debezium database.server.name setting. Messages will be sent to a topic named <topic_prefix>.<schema>.<table_name>.

Yandex Data Transfer supports CDC for transfers from PostgreSQL, MySQL®, and YDB databases to Apache Kafka® and Yandex Data Streams. Data is sent to the target in Debezium format. For more information about CDC mode, see Change data capture.

Note

In YDB, CDC mode is supported starting from version 22.5.

Serialization settings

Management console

  • Under Serializing settings, select the serialization type:

    • Auto: Automatic serialization.

    • Debezium: Serialization following the Debezium standard:

      • Select the message key schema (matches the key.converter Debezium parameter).
      • Select the message value schema (matches the value.converter Debezium parameter).
      • If required, specify Debezium serializer settings in Parameter-Value format.

If you want to use JSON schemas in Yandex Schema Registry and preserve their compatibility when adding and deleting optional fields, use these settings:

  • Serializing settings: Debezium.

  • To use Schema Registry for keys, select Key schema type: JSON (via Schema Registry). To use Schema Registry for values, select Value schema type: JSON (via Schema Registry).

    • URL: Schema Registry namespace endpoint. You can copy the endpoint from the details for the Schema Registry namespace connection on the Debezium tab, in the value.converter.schema.registry.url parameter.

      Warning

      The namespace must have Compatibility check policy for JSON set to optional friendly.

    • Username: api-key.

    • Password: Value of the API key with a limited scope used for connecting to Schema Registry. To get this value:

      1. Create an API key with a limited scope and place it in the SECRET local variable:

        yc iam api-key create --folder-id <folder_ID> \
          --service-account-name <name_of_service_account_for_operations_with_Schema_Registry> \
          --scopes yc.schema-registry.schemas.manage \
          --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \
        SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'`
        
      2. Print the SECRET variable value to the terminal:

        echo $SECRET
        
      3. Copy the printed value and paste it into the Password field in the endpoint creation window.

  • Under Debezium serializer settings:

    • To generate a closed schema for keys, include the key.converter.dt.json.generate.closed.content.schema parameter set to true.
    • To generate a closed schema for values, include the value.converter.dt.json.generate.closed.content.schema parameter set to true.

Terraform

Under serializer, specify the selected serialization type:

  • serializer_auto

  • serializer_json

  • serializer_debezium

    For this type, you can specify the Debezium serialization parameters as key/value pairs in the serializer_debezium.serializer_parameters field.

If you want to use JSON schemas in Yandex Schema Registry and preserve their compatibility when adding and deleting optional fields, add the serializer section with a description of the serialization settings to the configuration file. To generate a closed schema for keys, include the key.converter.dt.json.generate.closed.content.schema parameter set to true in the serializer section; to generate a closed schema for values, include the value.converter.dt.json.generate.closed.content.schema parameter set to true.

resource "yandex_datatransfer_endpoint" "<endpoint_name_in_Terraform>" {
  ...
  settings {
    kafka_target {
      ...
      serializer {
        serializer_debezium {
          serializer_parameters {            
            key = "key.converter.dt.json.generate.closed.content.schema"
            value = "true"
          }    
          serializer_parameters {            
            key = "value.converter.dt.json.generate.closed.content.schema"
            value = "true"
          }
          serializer_parameters {
            key = "value.converter.schemas.enable"
            value = "true"
          }
          serializer_parameters {
            key = "value.converter.schema.registry.url"
            value = "<Schema_Registry_namespace_URL>"
          }
          serializer_parameters {
            key = "value.converter.basic.auth.user.info"
            value = "api-key:<API_key_value>"
          }
        }
      }
    }
  }
}

Where:

  • Schema_Registry_namespace_URL: Schema Registry namespace endpoint. You can copy the endpoint from the details for the Schema Registry namespace connection on the Debezium tab, in the value.converter.schema.registry.url parameter.

    Warning

    The namespace must have Compatibility check policy for JSON set to optional friendly.

  • API_key_value: Value of the API key with a limited scope used for connecting to Schema Registry. To get this value:

    1. Create an API key with a limited scope and place it in the SECRET local variable:

      yc iam api-key create --folder-id <folder_ID> \
        --service-account-name <name_of_service_account_for_operations_with_Schema_Registry> \
        --scopes yc.schema-registry.schemas.manage \
        --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \
      SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'`
      
    2. Print the SECRET variable value to the terminal:

      echo $SECRET
      
    3. Copy the printed value and paste it into the value parameter in the configuration file.

API

Under serializer, specify the selected serialization type:

  • serializerAuto

  • serializerJson

  • serializerDebezium

    For this type, you can specify the Debezium serialization parameters as key/value pairs in the serializerDebezium.serializerParameters field.

If you want to use JSON schemas in Yandex Schema Registry and preserve their compatibility when adding and deleting optional fields, add the serializer object with a description of the serialization settings to the request body. To generate a closed schema for keys, add the key.converter.dt.json.generate.closed.content.schema parameter set to true in serializer; to generate a closed schema for values, add the value.converter.dt.json.generate.closed.content.schema parameter set to true.

"serializer": {
  "serializerDebezium": {
    "serializerParameters": [
      {
        "key": "key.converter.dt.json.generate.closed.content.schema",
        "value": "true"
      },
      {
        "key": "value.converter.dt.json.generate.closed.content.schema",
        "value": "true"
      },
      {
        "key": "value.converter",
        "value": "io.confluent.connect.json.JsonSchemaConverter"
      },
      {
        "key": "value.converter.schemas.enable",
        "value": "true"
      },
      {
        "key": "value.converter.schema.registry.url",
        "value": "<Schema_Registry_namespace_URL>"
      },
      {
        "key": "value.converter.basic.auth.user.info",
        "value": "api-key:<API_key_value>"
      }
    ]
  }
}

Where:

  • Schema_Registry_namespace_URL: Schema Registry namespace endpoint. You can copy the endpoint from the details for the Schema Registry namespace connection on the Debezium tab, in the value.converter.schema.registry.url parameter.

    Warning

    The namespace must have Compatibility check policy for JSON set to optional friendly.

  • API_key_value: Value of the API key with a limited scope used for connecting to Schema Registry. To get this value:

    1. Create an API key with a limited scope and place it in the SECRET local variable:

      yc iam api-key create --folder-id <folder_ID> \
        --service-account-name <name_of_service_account_for_operations_with_Schema_Registry> \
        --scopes yc.schema-registry.schemas.manage \
        --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \
      SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'`
      
    2. Print the SECRET variable value to the terminal:

      echo $SECRET
      
    3. Copy the printed value and paste it into the value parameter in the configuration file.

Additional settings

Management console

You can specify topic configuration parameters to use when creating new topics.

Specify the parameter and one of its possible values, e.g., cleanup.policy and compact.
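
For example, to have new topics use log compaction and cap the message size at 1 MB, you could enter the following two parameters (both are standard Apache Kafka® topic-level settings; the values here are illustrative):

cleanup.policy: compact
max.message.bytes: 1048576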

After configuring the data source and target, create and start the transfer.
