Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Query
    • Форматы данных и алгоритмы сжатия
      • Чтение данных с помощью соединений
      • Чтение данных с помощью привязок к данным
      • Запись данных
    • Работа с базами данных Managed Service for ClickHouse®
    • Работа с базами данных Managed Service for Greenplum®
    • Работа с базами данных Managed Service for MySQL®
    • Работа с базами данных Managed Service for PostgreSQL
    • Работа с базами данных Managed Service for YDB
    • Запись метрик в Yandex Monitoring
  • Управление доступом
  • Правила тарификации
  • Интеграции
  • Аудитные логи Audit Trails
  • Вопросы и ответы
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Настройка соединения
  • Модель данных
  • Пример записи данных
  • Поддерживаемые форматы записи
  1. Источники и приемники данных
  2. Работа с Data Streams
  3. Запись данных

Запись данных в Yandex Data Streams

Статья создана
Yandex Cloud
Улучшена
Max Z.
Обновлена 21 октября 2024 г.
  • Настройка соединения
  • Модель данных
  • Пример записи данных
  • Поддерживаемые форматы записи

Yandex Data Streams это сервис, позволяющий передавать потоки данных сразу нескольким приложениям для обработки, при этом каждое приложение обрабатывает такие данные независимо от другого.

Пример записи данных в формате JSON в Yandex Data Streams.

INSERT INTO yds.`output_stream`
SELECT
    ToBytes(Unwrap(Json::SerializeJson(Yson::From(
    <|"predefined":
            <|
                "host": host,
                "count": count,
            |>,
            "optional":
            <|
                "tag": tag
            |>
        |>)))) 
FROM 
    $data;

Настройка соединенияНастройка соединения

Для чтения данных из Yandex Data Streams необходимо:

  1. Перейти в интерфейс Yandex Query в раздел Соединения и нажать кнопку Создать.
  2. В открывшемся окне в поле Имя указать название соединения с Yandex Data Streams.
  3. В выпадающем поле Тип выбрать Data Streams.
  4. В выпадающем поле База данных выбрать базу данных Yandex Managed Service for YDB, где ранее был создан поток Yandex Data Streams.
  5. В поле Сервисный аккаунт выбрать сервисный аккаунт, который будет использоваться для чтения данных, или создать новый, выдав ему права yds.writer.
  6. Создать соединение, нажав кнопку Создать.

Модель данныхМодель данных

Данные через Yandex Data Streams передаются в бинарном виде. Запись данных выполняется с помощью SQL-выражений и в общем случае выглядит следующим образом:

INSERT INTO <соединение>.<имя_потока>
    <выражение> 
FROM 
   <запрос>

Где:

  • <соединение> — название соединения с потоком данных Data Streams, созданного в предыдущем пункте.
  • <имя_потока> — название потока данных в Data Streams.
  • <запрос> — запрос-источник данных Yandex Query.

Пример записи данныхПример записи данных

Пример запроса для чтения данных из Yandex Data Streams и записи результатов в Yandex Data Streams.

$data = 
SELECT 
    JSON_VALUE(Data, "$.host") AS host,
    CAST(JSON_VALUE(Data, "$.count") AS Int) AS count,
    JSON_VALUE(Data, "$.tag") AS tag,    
FROM 
(
    SELECT
        CAST(Data AS Json) AS Data
    FROM yds.`input_stream`
    WITH(
        format=raw,
        SCHEMA 
        (
            Data String
        )
    )
)
WHERE 
    JSON_VALUE(Data, "$.tag") = "my_tag";

INSERT INTO yds.`output_stream`
SELECT
    ToBytes(Unwrap(Json::SerializeJson(Yson::From(
    <|"predefined":
            <|
                "host": host,
                "count": count,
            |>,
            "optional":
            <|
                "tag": tag
            |>
        |>)))) 
FROM 
    $data;

Где:

Поле Тип Описание
yds Название соединения с Yandex Data Streams
input_stream Название потока-источника данных в SQL-запросе
output_stream Название потока-приемника данных в SQL-запросе
host Строка Строковый параметр запроса
count Целое Численный параметр запроса
raw Строка Формат данных. На данный поддерживается только формат raw - сырые данные

Результаты обработки записываются в выходной поток Yandex Data Streams. Для удобства обработки этих данных, данные преобразовываются в формат JSON, это выполняется с помощью конструкции:

    ToBytes(Unwrap(Json::SerializeJson(Yson::From(
    <|"key": value|>,
    <|"key2": 
        <|"child_key": child_value|>,
    |>,
    ))))

В документации языка YQL доступно детальное описание модулей Yson, Json и его функций, <|"key": value|>.

Поддерживаемые форматы записиПоддерживаемые форматы записи

В Data Streams можно выполнять запись только в виде байтового потока, который интепретируется на принимающей стороне.

Настройки форматов файлов и алгоритмов сжатия при записи в Data Streams не применяются.

Была ли статья полезна?

Предыдущая
Чтение данных с помощью привязок к данным
Следующая
Работа с базами данных Managed Service for ClickHouse®
Проект Яндекса
© 2025 ООО «Яндекс.Облако»