Transferring data from a Yandex Data Streams source endpoint

Written by
Yandex Cloud
Updated at April 9, 2025
  • Scenarios for transferring data from Data Streams
  • Preparing the source database
  • Configuring the Data Streams source endpoint
    • Basic settings
    • Advanced settings
  • Configuring the data target
  • Troubleshooting data transfer issues
    • Transfer failure
    • Cloud Functions redirects

Yandex Data Transfer enables you to migrate data from a Data Streams queue and implement various data transfer, processing, and transformation scenarios. To implement a transfer:

  1. Explore possible data transfer scenarios.
  2. Prepare the Data Streams database for the transfer.
  3. Set up a source endpoint in Yandex Data Transfer.
  4. Set up one of the supported data targets.
  5. Create a transfer and start it.
  6. Perform required operations with the database and control the transfer.
  7. In case of any issues, use ready-made solutions to resolve them.

Scenarios for transferring data from Data Streams

  1. Migration: Moving data from one storage system to another. This often means moving a database from an obsolete on-premises system to a managed cloud one.

    Mirroring data across Data Streams queues is a separate migration task.

  2. Data delivery: Delivering arbitrary data to target storage. This includes retrieving data from a queue, deserializing it, and transforming it to the target storage format, for example:

    • YDS to ClickHouse®.
    • YDS to Object Storage.
    • YDS to Managed Service for YDB.
    • YDS to Managed Service for Apache Kafka®.

For a detailed description of possible Yandex Data Transfer scenarios, see Tutorials.

Preparing the source database

  1. Create a service account with the yds.editor role.

  2. Create a data stream.

  3. (Optional) Create a processing function.

    Processing function example
    // Parses nginx access log lines received from the stream and returns them
    // to Data Transfer as base64-encoded JSON records.
    const { Parser } = require("@robojones/nginx-log-parser");
    module.exports.handler = async function (event, context) {
        const schema =
            '$remote_addr - $remote_user [$time_local] "$request" $status $bytes_sent "$http_referer" "$http_user_agent"';
        const parser = new Parser(schema);
        return {
            Records: event.Records.map((record) => {
                // The raw message arrives base64-encoded in record.kinesis.data.
                const decodedData = Buffer.from(record.kinesis.data, "base64")
                    .toString("ascii")
                    .trim();
                try {
                    const result = parser.parseLine(decodedData);
                    if (result.request === "") {
                        // Empty request: drop the message.
                        return {
                            eventID: record.eventID,
                            invokeIdentityArn: record.invokeIdentityArn,
                            eventVersion: record.eventVersion,
                            eventName: record.eventName,
                            eventSourceARN: record.eventSourceARN,
                            result: "Dropped"
                        };
                    }
                    // Successfully parsed message: return it as base64-encoded JSON.
                    return {
                        eventID: record.eventID,
                        invokeIdentityArn: record.invokeIdentityArn,
                        eventVersion: record.eventVersion,
                        eventName: record.eventName,
                        eventSourceARN: record.eventSourceARN,
                        kinesis: {
                            data: Buffer.from(JSON.stringify(result)).toString(
                                "base64"
                            ),
                        },
                        result: "Ok"
                    };
                } catch (err) {
                    // Parsing error: mark the message as failed.
                    return {
                        eventID: record.eventID,
                        invokeIdentityArn: record.invokeIdentityArn,
                        eventVersion: record.eventVersion,
                        eventName: record.eventName,
                        eventSourceARN: record.eventSourceARN,
                        result: "ProcessingFailed",
                    };
                }
            })
        };
    };
    
  4. (Optional) Prepare a data schema file in JSON format.

    Sample file with a data schema:

    [
        {
            "name": "<field_name>",
            "type": "<type>"
        },
        ...
        {
            "name": "<field_name>",
            "type": "<type>"
        }
    ]
    

    Supported types:

    • any
    • boolean
    • datetime
    • double
    • int8
    • int16
    • int32
    • int64
    • string
    • uint8
    • uint16
    • uint32
    • uint64
    • utf8
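
    For example, a schema file describing fields produced by the sample nginx log processing function above might look like this (the field names and types here are illustrative):

    [
        {
            "name": "remote_addr",
            "type": "string"
        },
        {
            "name": "request",
            "type": "string"
        },
        {
            "name": "status",
            "type": "int32"
        },
        {
            "name": "bytes_sent",
            "type": "int64"
        }
    ]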

Configuring the Data Streams source endpoint

When creating or updating an endpoint, you can define:

  • Stream connection settings in Yandex Data Streams. These are required parameters.
  • Additional settings.

Basic settings

Management console
  • Database: Select a Yandex Managed Service for YDB database registered in Yandex Data Streams as a source.

  • Stream: Specify the name of the data stream associated with the database.

  • Service account: Select or create a service account with the yds.editor role that Data Transfer will use to connect to the data source.

  • Security groups: Select a cloud network you want to place your endpoint in. You can manage cloud networks in Virtual Private Cloud.

Advanced settings

Use advanced settings to specify transformation and conversion rules. Data is processed in the following order:

  1. Transformation. Data in JSON format is passed to a Yandex Cloud Functions function. The function request body contains metadata and raw data from the queue. The function processes the data and returns it to Data Transfer (a minimal handler sketch is shown below).

  2. Conversion. Data is parsed in preparation for delivery to the target.

If no transformation rules are set, parsing is applied to raw data from the queue. If no conversion rules are set, the data goes directly to the target.
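
To illustrate the exchange between Data Transfer and the processing function, here is a minimal pass-through handler. This is only a sketch: it returns every record unchanged with the Ok status (the record fields mirror those in the processing function example above), so the conversion step then parses the original data.

    module.exports.handler = async function (event, context) {
        return {
            Records: event.Records.map((record) => ({
                // Metadata fields are copied from the incoming record as is.
                eventID: record.eventID,
                invokeIdentityArn: record.invokeIdentityArn,
                eventVersion: record.eventVersion,
                eventName: record.eventName,
                eventSourceARN: record.eventSourceARN,
                // The base64-encoded payload is passed through unchanged.
                kinesis: { data: record.kinesis.data },
                result: "Ok"
            }))
        };
    };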

Management console
  • Transformation rules:

    • Cloud function: Select one of the functions created in Cloud Functions.

      • Service account: Select or create a service account that will be used to invoke the processing function.
    • Number of retries: Set the number of attempts to invoke the processing function.

    • Buffer size for function: Set the buffer size (in bytes); once the buffer is full, its data is transferred to the processing function.

      The maximum buffer size is 3.5 MB. For more information about restrictions that apply to functions in Cloud Functions, see the relevant section.

    • Flush interval: Set the interval (in seconds) to wait before transferring stream data to the processing function.

      Note

      If the buffer is full or the flush interval expires, data is transferred to the processing function.

    • Invocation timeout: Set the allowed response timeout for the processing function (in seconds).

    Warning

    Values in the Flush interval and Invocation timeout fields are specified with the s postfix, e.g., 10s.

  • Conversion rules:

    • Conversion rules:

      • Data format: Select one of the available formats:
        • JSON: JSON format.

        • AuditTrails.v1 parser: Audit Trails log format.

        • CloudLogging parser: Cloud Logging log format.

        • Debezium CDC parser: Debezium CDC format. You can specify a Confluent Schema Registry in the settings.

          For JSON, specify:

          • Data scheme: Specify the schema as a list of fields or upload a file with a description of the schema in JSON format.
          Sample data schema
          [
              {
                  "name": "request",
                  "type": "string"
              }
          ]
          
          • Enable NULL values in keys: Select this option to allow the null value in key columns.
          • Add a column for missing keys: Select this option to have the fields missing in the schema appear in the _rest column.
          • Unescape string values: Select this option to remove quotation marks from string variables. Otherwise, the string field values will remain unchanged.

          For Debezium CDC, specify the following: Schema Registry URL, authentication method (including the username and user password if authentication is used), and CA certificate.

  • Compression codecs: Set the required data compression format (GZIP, ZSTD, or Raw).

  • Should continue working, if consumer read lag exceed TTL of topic: Enable this setting to allow the transfer to start or continue reading from a topic where some data was deleted after its retention period expired. If this setting is off, the transfer stops with an error whenever topic data is deleted after reaching the end of its retention period. This usually happens when the transfer cannot keep up and its read lag exceeds the retention period.

    Warning

    When you set up your first data delivery from a topic that has been receiving writes for a long time, or resume such a delivery after prolonged downtime, you may get a Found rewind error. To resolve this issue, temporarily enable this setting, activate the transfer, and disable the setting again once all the data has been read and the lag has decreased.

Configuring the data target

Configure one of the supported data targets:

  • PostgreSQL
  • MySQL®
  • MongoDB
  • ClickHouse®
  • Greenplum®
  • Yandex Managed Service for YDB
  • Yandex Object Storage
  • Apache Kafka®
  • YDS
  • Elasticsearch
  • OpenSearch

For a complete list of supported sources and targets in Yandex Data Transfer, see Available transfers.

After configuring the data source and target, create and start the transfer.

Troubleshooting data transfer issues

  • Transfer failure
  • Cloud Functions redirects

For more troubleshooting tips, see Troubleshooting.

Transfer failure

A Replication or Snapshot and increment transfer is interrupted with an error.

Error message:

/Ydb.PersQueue.V1.PersQueueService/AddReadRule failed: OVERLOADED

Transfers are aborted because the cloud quota on the number of operations with Managed Service for YDB has been reached.

Solution:

  1. Increase the Number of schema transactions per minute property in the Managed Service for YDB quotas for the cloud with the required database and reactivate the transfer.

Cloud Functions redirects

In rare cases, the following error may occur during transfers from Data Streams or Apache Kafka®:

redirect to SOME_URL is requested but no redirects are allowed.

Possible cause:

A Cloud Functions processing function is configured on the source endpoint, and it returns a redirect to another URL instead of data.

Solution:

Such redirects are not allowed for security reasons. Avoid returning redirects from Cloud Functions functions used in transfers.
