Writing data to Yandex Data Streams
Yandex Data Streams is a service that allows you to transfer data streams to multiple applications for processing, with each of them handling the data independently.
Example of writing JSON
data to Yandex Data Streams
INSERT INTO yds.`output_stream`
SELECT
ToBytes(Unwrap(Json::SerializeJson(Yson::From(
<|"predefined":
<|
"host": host,
"count": count,
|>,
"optional":
<|
"tag": tag
|>
|>))))
FROM
$data;
Setting up a connection
To read data from Yandex Data Streams:
- Go to Connections in the Yandex Query interface and click Create new.
- In the window that opens, specify a name for a connection to Yandex Data Streams in the Name field.
- In the drop-down list under Type, select
Data Streams
. - In the Database drop-down list, select the Yandex Managed Service for YDB database where the Yandex Data Streams stream was created.
- In the Service account field, select a service account for data reads or create a new one with the
yds.writer
permissions. - Click Create to create a connection.
Data model
Data is sent via Yandex Data Streams in binary form. Data is written using SQL statements and generally looks like this:
INSERT INTO <connection>.<stream_name>
<expression>
FROM
<query>
Where:
<connection>
: Name of the Data Streams data stream connection created in the previous step.<stream_name>
: Name of the data stream in Data Streams.<query>
: Yandex Query data source query.
Example of writing data
Sample query for reading data from Yandex Data Streams and writing the results to Yandex Data Streams
$data =
SELECT
JSON_VALUE(Data, "$.host") AS host,
CAST(JSON_VALUE(Data, "$.count") AS Int) AS count,
JSON_VALUE(Data, "$.tag") AS tag,
FROM
(
SELECT
CAST(Data AS Json) AS Data
FROM yds.`input_stream`
WITH(
format=raw,
SCHEMA
(
Data String
)
)
)
WHERE
JSON_VALUE(Data, "$.tag") = "my_tag";
INSERT INTO yds.`output_stream`
SELECT
ToBytes(Unwrap(Json::SerializeJson(Yson::From(
<|"predefined":
<|
"host": host,
"count": count,
|>,
"optional":
<|
"tag": tag
|>
|>))))
FROM
$data;
Where:
Field | Type | Description |
---|---|---|
yds |
Yandex Data Streams connection name | |
input_stream |
Source stream name in the SQL query | |
output_stream |
Target stream name in the SQL query | |
host |
String | Query string parameter |
count |
Integer | Query numerical parameter |
raw |
String | Data format. Currently, the only supported format is raw (raw data) |
The processing results are written to the Yandex Data Streams output stream. To facilitate the processing, this data is converted to JSON
format using the following statement:
ToBytes(Unwrap(Json::SerializeJson(Yson::From(
<|"key": value|>,
<|"key2":
<|"child_key": child_value|>,
|>,
))))
The YQL documentation provides a detailed description of Yson
Supported write formats
Data Streams only lets you write data as a byte stream that is interpreted on the receiving side.
File format and compression algorithm settings for data writes in Data Streams are not applied.