Reading data from Data Streams using Query connections
When working with Yandex Data Streams, connections are a convenient way to prototype queries and set up initial access to data.
Yandex Data Streams is a service that allows you to transfer data streams to multiple applications for processing, with each of them handling the data independently.
Example of reading JSON data from Yandex Data Streams:
SELECT
    JSON_VALUE(CAST(Data AS Json), "$.action") AS action
FROM yds.`input_stream`
WITH (
    format=raw,
    SCHEMA
    (
        Data String
    )
)
LIMIT 10;
Note
Data from a stream source is transferred as an infinite stream. To stop data processing and output the result to the console, the example limits the data with the LIMIT operator, which sets the number of rows in the result.
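The effect of LIMIT on an unbounded source can be sketched locally in Python. This is only an analogue and not how Yandex Query executes the statement: `endless_messages` is a hypothetical stand-in for the stream, the message contents are made up, and `itertools.islice` plays the role of LIMIT.

```python
import itertools
import json
from typing import Iterator, List, Optional


def endless_messages() -> Iterator[bytes]:
    """Hypothetical stand-in for a stream source: yields raw messages forever."""
    for n in itertools.count():
        yield json.dumps({"action": f"click-{n}"}).encode("utf-8")


def first_actions(stream: Iterator[bytes], limit: int) -> List[Optional[str]]:
    """Take `limit` messages from the stream and extract the "action" field."""
    limited = itertools.islice(stream, limit)  # plays the role of LIMIT
    return [json.loads(raw).get("action") for raw in limited]


# Without the limit, iterating over endless_messages() would never finish.
actions = first_actions(endless_messages(), 10)
print(actions)
```

Because the limit is applied lazily, only the first ten messages are ever pulled from the source, which is why the computation terminates.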
Setting up a connection
To read data from Yandex Data Streams:
- Go to Connections in the Yandex Query interface and click Create new.
- In the window that opens, specify a name for a connection to Yandex Data Streams in the Name field.
- In the drop-down list under Type, select Data Streams.
- In the Cloud and Folder field, enter the data source location.
- In the Database drop-down list, select the Yandex Managed Service for YDB database where the Yandex Data Streams stream was created.
- In the Service account field, select the service account to use for data reads or create a new one by granting it the yds.editor permissions. To use a service account, the iam.serviceAccounts.user role is required.
- Click Create to create a connection.
Data model
Data is sent via Yandex Data Streams in binary form and is read using SQL statements of the following form:
SELECT
    <expression>
FROM
    <connection>.<stream_name>
WITH
(
    format=raw,
    SCHEMA
    (
        Data String
    )
)
WHERE <filter>;
Where:
- <connection>: Name of the Data Streams connection created in the previous step.
- <stream_name>: Name of the data stream in Data Streams.
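To make the data model more concrete, here is a minimal Python sketch of how the raw format can be pictured. It assumes that each message becomes one row with a single binary Data column and that the filter is applied per row. The helper names `read_raw` and `where` and the sample payloads are hypothetical; nothing here is part of the Yandex Query API.

```python
from typing import Callable, Dict, Iterable, Iterator

Row = Dict[str, bytes]  # one row per message: {"Data": <raw payload>}


def read_raw(messages: Iterable[bytes]) -> Iterator[Row]:
    """Assumed model of format=raw: payloads are passed through unparsed."""
    for payload in messages:
        yield {"Data": payload}


def where(rows: Iterable[Row], predicate: Callable[[Row], bool]) -> Iterator[Row]:
    """Keep only the rows that satisfy the filter, like a WHERE clause."""
    return (row for row in rows if predicate(row))


# Illustrative payloads: the service does not care whether they are JSON.
messages = [b'{"action": "login"}', b"plain text", b'{"action": "logout"}']

matched = list(where(read_raw(messages), lambda row: b"login" in row["Data"]))
print(matched)
```

The point of the sketch is that interpreting the bytes (for example, as JSON) is left to the query itself rather than to the stream.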
Example of reading data
Sample query that reads data from Yandex Data Streams, keeps only the messages with a given tag, and outputs the first 10 rows of the result:
$data =
    SELECT
        JSON_VALUE(Data, "$.host") AS host,
        JSON_VALUE(Data, "$.count") AS count,
        JSON_VALUE(Data, "$.tag") AS tag,
    FROM
    (
        SELECT
            CAST(Data AS Json) AS Data
        FROM yds.`input_stream`
        WITH
        (
            format=raw,
            SCHEMA
            (
                Data String
            )
        )
    )
    WHERE
        JSON_VALUE(Data, "$.tag") = "my_tag";

SELECT
    *
FROM
    $data
LIMIT 10;
Where:
Field | Type | Description |
---|---|---|
yds | | Yandex Data Streams connection name |
input_stream | | Source stream name in the SQL query |
host | String | Query string parameter |
raw | String | Data format. Currently, the only supported format is raw |
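The logic of the sample query (parse each message as JSON, extract host, count and tag, keep rows with a matching tag, return at most 10) can be approximated locally in Python. This is a rough sketch under stated assumptions, not a reproduction of YQL semantics: the function names and sample messages are hypothetical, only top-level keys are looked up, values keep the types produced by the JSON parser, and skipping unparseable payloads is a choice made for this sketch.

```python
import json
from itertools import islice
from typing import Any, Dict, Iterable, List, Optional


def parse(raw: bytes) -> Optional[Dict[str, Any]]:
    """Parse a raw payload as a JSON object; return None if that fails."""
    try:
        doc = json.loads(raw)
    except ValueError:  # covers invalid JSON and invalid UTF-8
        return None
    return doc if isinstance(doc, dict) else None


def select_tagged(messages: Iterable[bytes], tag: str, limit: int) -> List[Dict[str, Any]]:
    """Extract host/count/tag, filter by tag, and stop after `limit` rows."""
    rows = (
        {"host": doc.get("host"), "count": doc.get("count"), "tag": doc.get("tag")}
        for doc in map(parse, messages)
        if doc is not None and doc.get("tag") == tag
    )
    return list(islice(rows, limit))


# Illustrative input only; real messages come from the stream.
messages = [
    b'{"host": "a.example", "count": 3, "tag": "my_tag"}',
    b'{"host": "b.example", "count": 1, "tag": "other"}',
    b"not json",
    b'{"host": "c.example", "count": 7, "tag": "my_tag"}',
]

result = select_tagged(messages, "my_tag", 10)
print(result)
```

Filtering happens before the limit is applied, mirroring the order in the query: the WHERE clause is part of $data, and LIMIT is applied to the result of $data.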