Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Transfer
  • Доступные трансферы
  • Начало работы
    • Все инструкции
    • Подготовка к трансферу
      • Управление эндпоинтами
      • Миграция эндпоинтов в другую зону доступности
        • Источник
        • Приемник
    • Управление трансфером
    • Работа с базами данных во время трансфера
    • Мониторинг состояния трансфера
  • Решение проблем
  • Управление доступом
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Сценарии передачи данных из Data Streams
  • Подготовка базы данных источника
  • Настройка эндпоинта-источника Data Streams
  • Основные настройки
  • Расширенные настройки
  • Настройка приемника данных
  • Решение проблем, возникающих при переносе данных
  • Прерывание трансфера с ошибкой
  • Редиректы Cloud Functions
  1. Пошаговые инструкции
  2. Настройка эндпоинтов
  3. Yandex Data Streams
  4. Источник

Передача данных из эндпоинта-источника Yandex Data Streams

Статья создана
Yandex Cloud
Обновлена 10 марта 2025 г.
  • Сценарии передачи данных из Data Streams
  • Подготовка базы данных источника
  • Настройка эндпоинта-источника Data Streams
    • Основные настройки
    • Расширенные настройки
  • Настройка приемника данных
  • Решение проблем, возникающих при переносе данных
    • Прерывание трансфера с ошибкой
    • Редиректы Cloud Functions

С помощью сервиса Yandex Data Transfer вы можете переносить данные из очереди Data Streams и реализовывать различные сценарии переноса, обработки и трансформации данных. Для реализации трансфера:

  1. Ознакомьтесь с возможными сценариями передачи данных.
  2. Подготовьте базу данных Data Streams к трансферу.
  3. Настройте эндпоинт-источник в Yandex Data Transfer.
  4. Настройте один из поддерживаемых приемников данных.
  5. Cоздайте и запустите трансфер.
  6. Выполняйте необходимые действия по работе с базой и контролируйте трансфер.
  7. При возникновении проблем, воспользуйтесь готовыми решениями по их устранению.

Сценарии передачи данных из Data StreamsСценарии передачи данных из Data Streams

  1. Миграция — перенос данных из одного хранилища в другое. Часто это перенос базы из устаревших локальных баз в управляемые облачные.

    Отдельной задачей миграции является зеркалирование данных между очередями Data Streams.

  2. Поставка данных — процесс доставки произвольных данных в целевые хранилища. Процесс поставки включает извлечение данных из очереди и их десериализацию с последующей трансформацией данных в формат целевого хранилища.

    • YDS в ClickHouse®;
    • YDS в Object Storage;
    • YDS в Managed Service for YDB;
    • YDS в Managed Service for Apache Kafka®.

Подробное описание возможных сценариев передачи данных в Yandex Data Transfer см. в разделе Практические руководства.

Подготовка базы данных источникаПодготовка базы данных источника

  1. Создайте сервисный аккаунт с ролью yds.editor.

  2. Создайте поток данных.

  3. (Опционально) Создайте функцию обработки.

    Пример функции обработки
    const yc = require("yandex-cloud");
    const { Parser } = require("@robojones/nginx-log-parser");
    module.exports.handler = async function (event, context) {
        const schema =
            '$remote_addr - $remote_user [$time_local] "$request" $status $bytes_sent "$http_referer" "$http_user_agent"';
        const parser = new Parser(schema);
        return {
            Records: event.Records.map((record) => {
                const decodedData = new Buffer(record.kinesis.data, "base64")
                    .toString("ascii")
                    .trim();
                try {
                    const result = parser.parseLine(decodedData);
                    if (result.request == "") {
                        // empty request - drop message
                        return {
                            eventID: record.eventID,
                            invokeIdentityArn: record.invokeIdentityArn,
                            eventVersion: record.eventVersion,
                            eventName: record.eventName,
                            eventSourceARN: record.eventSourceARN,
                            result: "Dropped"
                        };
                    }
                    return {
                        // successfully parsed message
                        eventID: record.eventID,
                        invokeIdentityArn: record.invokeIdentityArn,
                        eventVersion: record.eventVersion,
                        eventName: record.eventName,
                        eventSourceARN: record.eventSourceARN,
                        kinesis: {
                            data: new Buffer(JSON.stringify(result)).toString(
                                "base64"
                            ),
                        },
                        result: "Ok"
                    };
                } catch (err) {
                    // error - fail message
                    return {
                        eventID: record.eventID,
                        invokeIdentityArn: record.invokeIdentityArn,
                        eventVersion: record.eventVersion,
                        eventName: record.eventName,
                        eventSourceARN: record.eventSourceARN,
                        result: "ProcessingFailed",
                    };
                }
            })
        };
    };
    
  4. (Опционально) Подготовьте файл схемы данных в формате JSON.

    Пример файла со схемой данных:

    [
        {
            "name": "<имя_поля>",
            "type": "<тип>"
        },
        ...
        {
            "name": "<имя_поля>",
            "type": "<тип>"
        }
    ]
    

    Список допустимых типов:

  • any
  • boolean
  • datetime
  • double
  • int8
  • int16
  • int32
  • int64
  • string
  • uint8
  • uint16
  • uint32
  • uint64
  • utf8

Настройка эндпоинта-источника Data StreamsНастройка эндпоинта-источника Data Streams

При создании или изменении эндпоинта вы можете задать:

  • Настройки подключения к потоку данных в Yandex Data Streams. Это обязательные параметры.
  • Расширенные настройки.

Основные настройкиОсновные настройки

Консоль управления
  • База данных — выберите базу данных Yandex Managed Service for YDB, зарегистрированную в Yandex Data Streams в качестве источника.

  • Поток — укажите имя потока, ассоциированного с базой данных.

  • Сервисный аккаунт — выберите или создайте сервисный аккаунт с ролью yds.editor, от имени которого сервис Data Transfer будет подключаться к источнику данных.

  • Группы безопасности — выберите облачную сеть, в которой будет размещен эндпоинт. Управлять сетями можно в сервисе Virtual Private Cloud.

Расширенные настройкиРасширенные настройки

В расширенных настройках вы можете задать правила трансформации и правила конвертации. Данные обрабатываются в следующем порядке:

  1. Трансформация. Данные в формате JSON передаются функции Yandex Cloud Functions. В теле функции содержатся метаинформация и необработанные данные, которые переданы в очередь. С помощью функции данные обрабатываются и возвращаются в Data Transfer.

  2. Конвертация. Выполняется парсинг, с помощью которого данные подготавливаются для передачи приемнику.

Если не заданы правила трансформации, то парсинг применяется к необработанным данным из очереди. Если не заданы правила конвертации, то данные сразу передаются в приемник.

Консоль управления
  • Правила трансформации:

    • Функция обработки — выберите одну из функций, созданных в сервисе Cloud Functions.

      • Сервисный аккаунт — выберите или создайте сервисный аккаунт, от имени которого будет запускаться функция обработки.
    • Количество попыток — укажите количество попыток вызова функции обработки.

    • Размер буфера для отправки — укажите размер буфера (в байтах), при заполнении которого данные будут переданы функции обработки.

      Максимальный размер буфера — 3,5 МБ. Подробнее об ограничениях, действующих при работе с функциями в сервисе Cloud Functions, читайте в соответствующем разделе.

    • Интервал отправки — укажите длительность интервала (в секундах), по истечении которого данные из потока должны быть переданы функции обработки.

      Примечание

      Если буфер заполнится или истечет интервал отправки, данные будут переданы функции обработки.

    • Таймаут вызова — укажите допустимое время ожидания ответа от функции обработки (в секундах).

    Важно

    Значения в полях Интервал отправки и Таймаут вызова указываются с постфиксом s, например, 10s.

  • Правила конвертации:

    • Правила конвертации:

      • Формат данных — выберите один из доступных форматов:
        • JSON — формат JSON.

        • Парсер AuditTrails.v1 — формат логов сервиса Audit Trails.

        • Парсер CloudLogging — формат логов сервиса Cloud Logging.

        • Парсер Debezium CDC — Debezium CDC. Позволяет в настройках указать Confluent Schema Registry.

          Для формата JSON укажите:

          • Схема данных — задайте схему в виде списка полей или загрузите файл с описанием схемы в формате JSON.
          Пример задания схемы данных
          [
              {
                  "name": "request",
                  "type": "string"
                }
          ]
          
          • Использовать значение NULL в ключевых столбцах — выберите эту опцию, чтобы разрешить значение null в ключевых колонках.
          • Добавить неразмеченные столбцы — выберите эту опцию, чтобы поля, отсутствующие в схеме, попадали в колонку _rest.
          • Разэкранировать значения строк — выберите эту опцию, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).

          Для Debezium CDC укажите: URL для Schema Registry, способ аутентификации (с указанием логина и пароля пользователя в случае использования аутентификации) и CA-сертификат.

  • Кодеки сжатия — укажите требуемый формат сжатия данных: GZIP, ZSTD или Без сжатия.

  • Продолжать работу при превышении по TTL топика — включите настройку, чтобы разрешить трансферу начать или продолжить чтение из топика, даже если часть данных в нем была удалена в связи с превышением периода хранения. Если настройка выключена, при превышении периода хранения и удалении данных из топика трансфер остановится с ошибкой. Обычно это происходит в случае, когда трансфер не успевает переносить все данные и отставание чтения превышает период хранения.

    Важно

    Если вы впервые настраиваете поставку данных из топика, в который данные уже давно пишутся, либо включаете ее после длительного простоя и сталкиваетесь с ошибкой вида Found rewind, вы можете временно включить эту настройку, активировать трансфер, затем, после считывания всех данных и сокращения отставания, выключить настройку.

Настройка приемника данныхНастройка приемника данных

Настройте один из поддерживаемых приемников данных:

  • PostgreSQL;
  • MySQL®;
  • MongoDB
  • ClickHouse®;
  • Greenplum®;
  • Yandex Managed Service for YDB;
  • Yandex Object Storage;
  • Apache Kafka®;
  • YDS;
  • Elasticsearch;
  • OpenSearch.

Полный список поддерживаемых источников и приемников в Yandex Data Transfer см. в разделе Доступные трансферы.

После настройки источника и приемника данных создайте и запустите трансфер.

Решение проблем, возникающих при переносе данныхРешение проблем, возникающих при переносе данных

  • Прерывание трансфера с ошибкой
  • Редиректы Cloud Functions

См. полный список рекомендаций в разделе Решение проблем.

Прерывание трансфера с ошибкойПрерывание трансфера с ошибкой

Трансфер типа Репликация или Копирование и репликация прерывается с ошибкой.

Текст ошибки:

/Ydb.PersQueue.V1.PersQueueService/AddReadRule failed: OVERLOADED

Трансфер прерывается из-за ограничения облачной квоты на количество операций с Managed Service for YDB.

Решение:

  1. Увеличьте в квотах Managed Service for YDB на облако с нужной базой данных значение характеристики Количество схемных операций в минуту и активируйте трансфер повторно.

Редиректы Cloud FunctionsРедиректы Cloud Functions

В трансферах из Data Streams или Apache Kafka® в редких случаях может возникнуть ошибка:

redirect to SOME_URL is requested but no redirects are allowed.

Возможная причина:

На источнике настроено использование функции Cloud Functions, которая возвращает не данные, а редирект на другой URL.

Решение:

По соображениям безопасности такие редиректы запрещены. Воздержитесь от использования редиректов в Cloud Functions в трансферах.

Была ли статья полезна?

Предыдущая
Источник
Следующая
Приемник
Проект Яндекса
© 2025 ООО «Яндекс.Облако»