Передача данных из эндпоинта-источника Yandex Data Streams
С помощью сервиса Yandex Data Transfer вы можете переносить данные из очереди Data Streams и реализовывать различные сценарии переноса, обработки и трансформации данных. Для реализации трансфера:
- Ознакомьтесь с возможными сценариями передачи данных.
- Подготовьте базу данных Data Streams к трансферу.
- Настройте эндпоинт-источник в Yandex Data Transfer.
- Настройте один из поддерживаемых приемников данных.
- Cоздайте и запустите трансфер.
- Выполняйте необходимые действия по работе с базой и контролируйте трансфер.
- При возникновении проблем, воспользуйтесь готовыми решениями по их устранению.
Сценарии передачи данных из Data Streams
-
Миграция — перенос данных из одного хранилища в другое. Часто это перенос базы из устаревших локальных баз в управляемые облачные.
Отдельной задачей миграции является зеркалирование данных между очередями Data Streams.
-
Поставка данных — процесс доставки произвольных данных в целевые хранилища. Процесс поставки включает извлечение данных из очереди и их десериализацию с последующей трансформацией данных в формат целевого хранилища.
Подробное описание возможных сценариев передачи данных в Yandex Data Transfer см. в разделе Практические руководства.
Подготовка базы данных источника
-
Создайте сервисный аккаунт с ролью
yds.editor
. -
(Опционально) Создайте функцию обработки.
Пример функции обработки
const yc = require("yandex-cloud"); const { Parser } = require("@robojones/nginx-log-parser"); module.exports.handler = async function (event, context) { const schema = '$remote_addr - $remote_user [$time_local] "$request" $status $bytes_sent "$http_referer" "$http_user_agent"'; const parser = new Parser(schema); return { Records: event.Records.map((record) => { const decodedData = new Buffer(record.kinesis.data, "base64") .toString("ascii") .trim(); try { const result = parser.parseLine(decodedData); if (result.request == "") { // empty request - drop message return { eventID: record.eventID, invokeIdentityArn: record.invokeIdentityArn, eventVersion: record.eventVersion, eventName: record.eventName, eventSourceARN: record.eventSourceARN, result: "Dropped" }; } return { // successfully parsed message eventID: record.eventID, invokeIdentityArn: record.invokeIdentityArn, eventVersion: record.eventVersion, eventName: record.eventName, eventSourceARN: record.eventSourceARN, kinesis: { data: new Buffer(JSON.stringify(result)).toString( "base64" ), }, result: "Ok" }; } catch (err) { // error - fail message return { eventID: record.eventID, invokeIdentityArn: record.invokeIdentityArn, eventVersion: record.eventVersion, eventName: record.eventName, eventSourceARN: record.eventSourceARN, result: "ProcessingFailed", }; } }) }; };
-
(Опционально) Подготовьте файл схемы данных в формате JSON.
Пример файла со схемой данных:
[ { "name": "<имя_поля>", "type": "<тип>" }, ... { "name": "<имя_поля>", "type": "<тип>" } ]
Список допустимых типов:
any
boolean
datetime
double
int8
int16
int32
int64
string
uint8
uint16
uint32
uint64
utf8
Настройка эндпоинта-источника Data Streams
При создании или изменении эндпоинта вы можете задать:
- Настройки подключения к потоку данных в Yandex Data Streams. Это обязательные параметры.
- Расширенные настройки.
Основные настройки
-
База данных — выберите базу данных Yandex Managed Service for YDB, зарегистрированную в Yandex Data Streams в качестве источника.
-
Поток — укажите имя потока, ассоциированного с базой данных.
-
Сервисный аккаунт — выберите или создайте сервисный аккаунт с ролью
yds.editor
, от имени которого сервис Data Transfer будет подключаться к источнику данных. -
Группы безопасности — выберите облачную сеть, в которой будет размещен эндпоинт. Управлять сетями можно в сервисе Virtual Private Cloud.
Расширенные настройки
В расширенных настройках вы можете задать правила трансформации и правила конвертации. Данные обрабатываются в следующем порядке:
-
Трансформация. Данные в формате JSON передаются функции Yandex Cloud Functions. В теле функции содержатся метаинформация и необработанные данные, которые переданы в очередь. С помощью функции данные обрабатываются и возвращаются в Data Transfer.
-
Конвертация. Выполняется парсинг, с помощью которого данные подготавливаются для передачи приемнику.
Если не заданы правила трансформации, то парсинг применяется к необработанным данным из очереди. Если не заданы правила конвертации, то данные сразу передаются в приемник.
-
Правила трансформации:
-
Функция обработки — выберите одну из функций, созданных в сервисе Cloud Functions.
- Сервисный аккаунт — выберите или создайте сервисный аккаунт, от имени которого будет запускаться функция обработки.
-
Количество попыток — укажите количество попыток вызова функции обработки.
-
Размер буфера для отправки — укажите размер буфера (в байтах), при заполнении которого данные будут переданы функции обработки.
Максимальный размер буфера — 3,5 МБ. Подробнее об ограничениях, действующих при работе с функциями в сервисе Cloud Functions, читайте в соответствующем разделе.
-
Интервал отправки — укажите длительность интервала (в секундах), по истечении которого данные из потока должны быть переданы функции обработки.
Примечание
Если буфер заполнится или истечет интервал отправки, данные будут переданы функции обработки.
-
Таймаут вызова — укажите допустимое время ожидания ответа от функции обработки (в секундах).
Важно
Значения в полях Интервал отправки и Таймаут вызова указываются с постфиксом
s
, например,10s
. -
-
Правила конвертации:
-
Правила конвертации:
- Формат данных — выберите один из доступных форматов:
-
JSON
— формат JSON. -
Парсер AuditTrails.v1
— формат логов сервиса Audit Trails. -
Парсер CloudLogging
— формат логов сервиса Cloud Logging. -
Парсер Debezium CDC
— Debezium CDC. Позволяет в настройках указать Confluent Schema Registry .Для формата JSON укажите:
- Схема данных — задайте схему в виде списка полей или загрузите файл с описанием схемы в формате JSON.
Пример задания схемы данных
[ { "name": "request", "type": "string" } ]
- Использовать значение NULL в ключевых столбцах — выберите эту опцию, чтобы разрешить значение
null
в ключевых колонках. - Добавить неразмеченные столбцы — выберите эту опцию, чтобы поля, отсутствующие в схеме, попадали в колонку
_rest
. - Разэкранировать значения строк — выберите эту опцию, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).
Для Debezium CDC укажите: URL для Schema Registry, способ аутентификации (с указанием логина и пароля пользователя в случае использования аутентификации) и CA-сертификат.
-
- Формат данных — выберите один из доступных форматов:
-
-
Кодеки сжатия — укажите требуемый формат сжатия данных:
GZIP
,ZSTD
илиБез сжатия
-
Продолжать работу при превышении по TTL топика — выберите опцию, чтобы продолжить работу трансфера при превышении TTL топика, потеряв часть данных. Если опция не выбрана, трансфер остановится с ошибкой, определив потерю данных.
Настройка приемника данных
Настройте один из поддерживаемых приемников данных:
- PostgreSQL;
- MySQL®;
- MongoDB
- ClickHouse®;
- Greenplum®;
- Yandex Managed Service for YDB;
- Yandex Object Storage;
- Apache Kafka®;
- YDS;
- Elasticsearch;
- OpenSearch.
Полный список поддерживаемых источников и приемников в Yandex Data Transfer см. в разделе Доступные трансферы.
После настройки источника и приемника данных создайте и запустите трансфер.
Решение проблем, возникающих при переносе данных
См. полный список рекомендаций в разделе Решение проблем.
Прерывание трансфера с ошибкой
Трансфер типа Репликация или Копирование и репликация прерывается с ошибкой.
Текст ошибки:
/Ydb.PersQueue.V1.PersQueueService/AddReadRule failed: OVERLOADED
Трансфер прерывается из-за ограничения облачной квоты
Решение:
- Увеличьте в квотах Managed Service for YDB на облако с нужной базой данных значение характеристики Количество схемных операций в минуту и активируйте трансфер повторно.
Редиректы Cloud Functions
В трансферах из Data Streams или Apache Kafka® в редких случаях может возникнуть ошибка:
redirect to SOME_URL is requested but no redirects are allowed.
Возможная причина:
На источнике настроено использование функции Cloud Functions, которая возвращает не данные, а редирект на другой URL.
Решение:
По соображениям безопасности такие редиректы запрещены. Воздержитесь от использования редиректов в Cloud Functions в трансферах.