Reading data from Data Streams via Query connections
Connections to Yandex Data Streams are convenient for prototyping and for the initial setup of data access.
Yandex Data Streams enables you to transfer data streams to multiple applications for processing, with each handling the data independently of the others.
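The independence of consumers can be illustrated with a toy model: one append-only log, with each application keeping its own read position. This is a minimal local sketch. `ToyStream`, its methods, and the consumer names are hypothetical and do not model how Yandex Data Streams is actually implemented or accessed.

```python
from dataclasses import dataclass, field


@dataclass
class ToyStream:
    """Hypothetical in-memory append-only log standing in for a data stream."""
    messages: list = field(default_factory=list)
    # Each consumer keeps its own read position (offset) into the shared log.
    offsets: dict = field(default_factory=dict)

    def write(self, payload: bytes) -> None:
        self.messages.append(payload)

    def read(self, consumer: str, max_count: int) -> list:
        start = self.offsets.get(consumer, 0)
        batch = self.messages[start:start + max_count]
        # Advancing one consumer's offset does not affect any other consumer.
        self.offsets[consumer] = start + len(batch)
        return batch


stream = ToyStream()
for i in range(3):
    stream.write(f"event-{i}".encode())

print(stream.read("analytics", 2))  # [b'event-0', b'event-1']
print(stream.read("alerts", 3))     # [b'event-0', b'event-1', b'event-2']
print(stream.read("analytics", 2))  # [b'event-2']
```

Both consumers see every message, and each advances at its own pace.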
Example query that reads JSON-formatted data from Yandex Data Streams:

```sql
SELECT
    JSON_VALUE(CAST(Data AS Json), "$.action") AS action
FROM yds.`input_stream`
WITH (
    format=raw,
    SCHEMA
    (
        Data String
    )
)
LIMIT 10;
```
Note
Data from a streaming source arrives as an infinite stream. To stop reading and get the output in the console, the example uses the LIMIT clause, which caps the number of result rows.
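The query and the note above can be mirrored locally to show why LIMIT matters: an endless source only produces a finite result if something stops pulling rows from it. In this sketch `itertools.islice` plays the role of `LIMIT 10`, and `action_of` is a rough, hypothetical analog of `JSON_VALUE(CAST(Data AS Json), "$.action")`. The generator `infinite_stream` and its message contents are invented for illustration.

```python
import itertools
import json
from typing import Iterator, Optional


def infinite_stream() -> Iterator[bytes]:
    """Hypothetical endless source of raw JSON messages."""
    for i in itertools.count():
        action = "click" if i % 2 == 0 else "view"
        yield json.dumps({"action": action, "seq": i}).encode()


def action_of(raw: bytes) -> Optional[str]:
    """Rough analog of JSON_VALUE(CAST(Data AS Json), "$.action")."""
    try:
        return json.loads(raw).get("action")
    except (ValueError, AttributeError):
        # Unparsable payload or non-object JSON: return a NULL-like result.
        return None


# islice acts like LIMIT 10: it stops consuming the endless generator
# after ten messages, so this line terminates.
rows = [action_of(m) for m in itertools.islice(infinite_stream(), 10)]
print(rows)
```

Without the `islice` call, iterating over `infinite_stream()` would never finish, just as the streaming query would keep running without LIMIT.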
Setting up a connection
To read data from Yandex Data Streams:
- Navigate to the Connections section of the Yandex Query interface and click Create new.
- In the window that opens, specify the Yandex Data Streams connection name in the Name field.
- In the Type dropdown, select Data Streams.
- In the Cloud and Folder field, specify the data source location.
- In the Database dropdown, select the Yandex Managed Service for YDB database where you created the Yandex Data Streams stream.
- In the Service account field, select an existing service account or create a new one. Assign it the `yds.editor` role, which is required to read data. To use a service account, you need the `iam.serviceAccounts.user` role.
- Click Create to create the connection.
Data model
Data is transmitted through Yandex Data Streams in binary form and is read with SQL statements of the following form:

```sql
SELECT
    <expression>
FROM
    <connection>.<stream_name>
WITH
(
    format=raw,
    SCHEMA
    (
        Data String
    )
)
WHERE <filter>;
```
Where:
- `<connection>`: Name of the Data Streams connection created in the previous step.
- `<stream_name>`: Name of the Data Streams stream.
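With `format=raw` and `SCHEMA (Data String)`, each message becomes one row with a single `Data` column that holds the whole payload unparsed; any JSON parsing happens later in the SQL expression. A minimal Python sketch of that mapping follows. The function `to_raw_rows` and the sample payloads are hypothetical, and YQL `String` (arbitrary bytes) is modeled as Python `bytes`.

```python
from typing import Dict, Iterable, List


def to_raw_rows(messages: Iterable[bytes]) -> List[Dict[str, bytes]]:
    """Mimic format=raw with SCHEMA (Data String): one row per message,
    one column named Data holding the untouched payload bytes."""
    return [{"Data": payload} for payload in messages]


batch = [b'{"host": "a"}', b"plain text", b"\x00\x01"]
rows = to_raw_rows(batch)
print(rows[1])  # {'Data': b'plain text'}
```

Non-JSON and even non-text payloads pass through unchanged, because the raw format does not interpret the bytes.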
Data reading example
Example query that reads JSON data from Yandex Data Streams, keeps only messages whose `tag` field equals `my_tag`, and returns the first 10 rows:
```sql
$data =
    SELECT
        JSON_VALUE(Data, "$.host") AS host,
        JSON_VALUE(Data, "$.count") AS count,
        JSON_VALUE(Data, "$.tag") AS tag
    FROM
    (
        SELECT
            CAST(Data AS Json) AS Data
        FROM yds.`input_stream`
        WITH
        (
            format=raw,
            SCHEMA
            (
                Data String
            )
        )
    )
    WHERE
        JSON_VALUE(Data, "$.tag") = "my_tag";

SELECT
    *
FROM
    $data
LIMIT 10;
```
Where:
| Field | Type | Description |
|---|---|---|
| `yds` | | Yandex Data Streams connection name |
| `input_stream` | | Name of the source data stream in the SQL query |
| `host` | String | Value of the `host` field extracted from the message |
| `raw` | String | Data format. Currently, only the `raw` format is supported |
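The data reading example can be traced step by step with a local Python analog: parse each raw payload as JSON (like `CAST(Data AS Json)`), drop rows whose `tag` is not `my_tag` (the WHERE clause), project `host`, `count`, and `tag`, then stop after 10 rows (LIMIT 10). This is a sketch under stated assumptions. `json_value` and `select_data` are hypothetical helpers that handle only top-level keys, unparsable payloads are assumed to fall out at the filter, and JSON scalars are kept as Python values rather than modeling YQL result types.

```python
import json
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List, Optional


def json_value(doc: Any, key: str) -> Optional[Any]:
    """Rough analog of JSON_VALUE(doc, "$.<key>") for top-level keys only:
    returns a scalar, or None (NULL) if the key is missing or not a scalar."""
    if not isinstance(doc, dict):
        return None
    value = doc.get(key)
    return None if isinstance(value, (dict, list)) else value


def select_data(messages: Iterable[bytes], tag: str = "my_tag") -> Iterator[Dict[str, Any]]:
    """Hypothetical local counterpart of the $data subquery."""
    for raw in messages:
        try:
            doc = json.loads(raw)  # like CAST(Data AS Json)
        except ValueError:
            continue  # assumption: unparsable rows never match the filter
        if json_value(doc, "tag") != tag:
            continue  # WHERE JSON_VALUE(Data, "$.tag") = "my_tag"
        yield {
            "host": json_value(doc, "host"),
            "count": json_value(doc, "count"),
            "tag": json_value(doc, "tag"),
        }


messages = [
    b'{"host": "web-1", "count": 3, "tag": "my_tag"}',
    b'{"host": "web-2", "count": 5, "tag": "other"}',
    b"broken",
    b'{"host": "web-3", "tag": "my_tag"}',
]
result: List[Dict[str, Any]] = list(islice(select_data(messages), 10))  # LIMIT 10
print(result)
```

A message without a `count` key still passes the filter and yields `None` for that column, which corresponds to NULL in the query result.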