Unified streaming and batch data analysis
In this example, we will query analytical and streaming data to calculate the total taxi fares for specific pickup locations.
We will use the same SQL query for both data types. The only differences are the connections (an Object Storage bucket or a data stream) and the data bindings the query reads from.
The data for batch processing has been preloaded into a Yandex Object Storage bucket in Parquet format.
In both cases, we will use a reference table stored in Object Storage to filter the query data.
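The idea of running one query over both kinds of data can be illustrated outside Yandex Query. The Python sketch below is not how the service executes queries. It only shows that logic written against an iterable works the same whether the input is a complete list (batch) or a generator that yields records one at a time (stream). The record shape, the `total_fares` and `ride_stream` names, and the sample values are all hypothetical.

```python
from collections import defaultdict
from typing import Iterable, Iterator

# Hypothetical ride record: (pickup location ID, fare amount).
Ride = tuple[str, float]


def total_fares(rides: Iterable[Ride], locations: set[str]) -> dict[str, float]:
    """Sum fares per pickup location, keeping only locations in the reference set.

    The function only iterates over `rides`, so it accepts a finished list
    (batch) and a generator that yields records as they arrive (stream) alike.
    """
    totals: dict[str, float] = defaultdict(float)
    for location_id, amount in rides:
        if location_id in locations:  # filter by the reference table
            totals[location_id] += amount
    return dict(totals)


def ride_stream(records: list[Ride]) -> Iterator[Ride]:
    """Stand-in for a stream: yields records one at a time."""
    for record in records:
        yield record


reference_locations = {"120", "125"}  # hypothetical reference table
sample = [("120", 7.5), ("125", 10.0), ("999", 3.0), ("120", 2.5)]

batch_result = total_fares(sample, reference_locations)
stream_result = total_fares(ride_stream(sample), reference_locations)
print(batch_result)                   # {'120': 10.0, '125': 10.0}
print(stream_result == batch_result)  # True
```

A real streaming engine also has to decide when a result is final, which this sketch ignores.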
To run this example:
- Make the necessary preparations.
- Analyze the Object Storage data.
- Analyze the streaming data from Data Streams.
Note
Yandex Cloud provides the New York City taxi trips dataset as is. Yandex Cloud makes no express or implied representations, warranties, or conditions pertaining to your use of the specified dataset. To the extent permitted by your local law, Yandex Cloud shall not be liable for any loss or damage, including direct, indirect, consequential, special, incidental, or punitive, resulting from your use of the dataset.
NYC Taxi and Limousine Commission (TLC):
The data was collected and provided to the NYC Taxi and Limousine Commission (TLC) by technology providers authorized under the Taxicab & Livery Passenger Enhancement Programs (TPEP/LPEP). The taxi trip data was not created by the TLC, and the TLC makes no representations whatsoever about the accuracy of this data.
Please review the dataset’s original source.
Make the necessary preparations
- Log in to the management console or sign up if you have not already. If you have not signed up yet, navigate to the management console and follow the instructions.
- On the Yandex Cloud Billing page, make sure you have an `ACTIVE` or `TRIAL_ACTIVE` billing account. If you do not have a billing account yet, create one.
- If you do not have a folder yet, create one.
- We will connect to the data stream using a service account. Create a service account named `datastream-connection-account` and assign it the `ydb.editor` role.
- Data streams use Yandex Managed Service for YDB. You will need to create a serverless database.
Analyze the Object Storage data
Connect to the analytical data source
- In the management console, select the folder where you want to create a connection.
- Navigate to Yandex Query.
- In the left-hand panel, select Tutorial.
- Under Create infrastructure for tutorial, click Create connection.
  This will open the create connection page. Check the default settings; do not change them.
- Click Create.
  This will open the create data binding page. Check the default settings; do not change them.
- Click Create.
Run the query
- In the query editor within the Query interface, click New analytics query.
- Enter the query text in the text field:

      -- Taxi rides read through the data binding
      $data = SELECT * FROM bindings.`tutorial-analytics`;

      -- Reference table of pickup locations to keep
      $locations =
          SELECT PULocationID
          FROM `tutorial-analytics`.`nyc_taxi_sample/example_locations.csv`
          WITH (
              format=csv_with_names,
              SCHEMA (
                  PULocationID String
              )
          );

      -- Total fare per one-minute window and pickup location
      $time =
          SELECT
              HOP_END() AS time,
              rides.PULocationID AS PULocationID,
              SUM(total_amount) AS total_amount
          FROM $data AS rides
          INNER JOIN $locations AS locations
              ON rides.PULocationID = locations.PULocationID
          GROUP BY
              HOP(CAST(tpep_pickup_datetime AS Timestamp?), "PT1M", "PT1M", "PT1M"),
              rides.PULocationID;

      SELECT * FROM $time;

- Click Run.
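To make the query's semantics concrete, here is a small Python model of what it computes. It rests on simplifying assumptions and is not how Yandex Query evaluates the query:

- `HOP` with hop and interval both set to `PT1M` is modeled as a fixed (tumbling) one-minute window, and `HOP_END()` as the end of that window.
- The `INNER JOIN` with `$locations` acts as a filter, assuming each location appears only once in the reference table.
- The delay argument is ignored because all batch data is already present.

The reference CSV contents, the sample rides and the function names are hypothetical.

```python
import csv
import io
from collections import defaultdict
from datetime import datetime, timedelta

# HOP(..., "PT1M", "PT1M", ...): hop and window length are both one minute,
# so windows do not overlap (a tumbling window).
WINDOW = timedelta(minutes=1)

# Hypothetical reference table in csv_with_names format: the first row is the header.
LOCATIONS_CSV = "PULocationID\n120\n125\n"


def load_locations(text: str) -> set[str]:
    """Read PULocationID values as strings, mirroring SCHEMA (PULocationID String)."""
    return {row["PULocationID"] for row in csv.DictReader(io.StringIO(text))}


def window_end(ts: datetime) -> datetime:
    """End of the one-minute window containing ts (the value HOP_END() reports)."""
    return ts.replace(second=0, microsecond=0) + WINDOW


def fares_by_window(rides: list[dict], locations: set[str]) -> dict[tuple[datetime, str], float]:
    """Filter by the reference table, then SUM(total_amount) per window and location."""
    totals: dict[tuple[datetime, str], float] = defaultdict(float)
    for ride in rides:
        location = ride["PULocationID"]
        if location not in locations:  # unmatched rows are dropped by the INNER JOIN
            continue
        pickup = datetime.fromisoformat(ride["tpep_pickup_datetime"])
        totals[(window_end(pickup), location)] += ride["total_amount"]
    return dict(totals)


# Hypothetical sample rides; the third one has no match in the reference table.
rides = [
    {"tpep_pickup_datetime": "2018-01-01T00:12:05", "PULocationID": "120", "total_amount": 10.0},
    {"tpep_pickup_datetime": "2018-01-01T00:12:40", "PULocationID": "120", "total_amount": 5.5},
    {"tpep_pickup_datetime": "2018-01-01T00:12:50", "PULocationID": "999", "total_amount": 3.0},
]

for (end, location), total in sorted(fares_by_window(rides, load_locations(LOCATIONS_CSV)).items()):
    print(end.isoformat(), location, total)  # 2018-01-01T00:13:00 120 15.5
```

Both sample rides picked up between 00:12 and 00:13 land in the window that `HOP_END()` labels 00:13:00. This is why the `time` column in the results shows window ends rather than pickup times.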
Check the result
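Once executed, the analytical query returns the total taxi fare (`total_amount`) for each one-minute window (`time`, the end of the window) and each pickup location from the reference table (`PULocationID`).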
| # | time | PULocationID | total_amount |
|---|---|---|---|
| 1 | 2017-12-31T22:24:00.000000Z | 120 | 7.54 |
| 2 | 2018-01-01T00:13:00.000000Z | 120 | 48.8 |
| 3 | 2018-01-01T03:25:00.000000Z | 120 | 30.8 |
| 4 | 2018-01-01T11:29:00.000000Z | 120 | 32.88 |
| 5 | 2018-01-01T15:13:00.000000Z | 120 | 9.8 |
| 6 | 2018-01-01T22:03:00.000000Z | 120 | 14.8 |
| 7 | 2018-01-02T19:28:00.000000Z | 120 | 7.3 |
| 8 | 2018-01-03T10:17:00.000000Z | 120 | 81.3 |
Analyze the streaming data from Data Streams
Create a data stream
- In the management console, select the folder where you need to create a data stream.
- Navigate to Data Streams.
- Click Create stream.
- Specify the Yandex Managed Service for YDB database created earlier.
- Name the data stream: `yellow-taxi`.
- Click Create.
Set up data generation
- Create a connection:
  - In the management console, select the folder where you want to create a connection.
  - Navigate to Yandex Query.
  - In the left-hand panel, select Tutorial.
  - Navigate to Streaming.
  - Under Create infrastructure for tutorial, click Create connection.
  - In the window that opens, under Connection type parameters, select the database and service account you created earlier.
  - Click Create.
- Create a data binding:
  - This will open the create data binding page.
  - Under Binding parameters, select the `yellow-taxi` stream you created earlier.
  - Click Create.

The generator will start writing data to the `yellow-taxi` stream. You can control the generator using the Stop and Start buttons.
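The tutorial does not describe the generator's message format. Purely as an illustration, the sketch below assumes each message is a JSON object carrying the three fields the query reads: `tpep_pickup_datetime`, `PULocationID` and `total_amount`. The format, the location IDs and the fare range are all hypothetical, and the sketch does not write to Data Streams.

```python
import json
import random
from datetime import datetime, timezone

# Hypothetical set of pickup locations the generator draws from.
LOCATION_IDS = ["121", "124", "125", "126", "127", "129"]


def make_ride(rng: random.Random, pickup: datetime) -> str:
    """Serialize one hypothetical ride as a JSON message."""
    ride = {
        "tpep_pickup_datetime": pickup.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "PULocationID": rng.choice(LOCATION_IDS),
        "total_amount": round(rng.uniform(5.0, 80.0), 2),
    }
    return json.dumps(ride)


if __name__ == "__main__":
    rng = random.Random(42)  # fixed seed for reproducible output
    now = datetime.now(timezone.utc)
    for _ in range(3):
        print(make_ride(rng, now))
```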
Run the query
- In the query editor within the Query interface, click New streaming query.
- Enter the query text in the text field:

      -- Taxi rides read from the yellow-taxi stream through the data binding
      $data = SELECT * FROM bindings.`tutorial-streaming`;

      -- Reference table of pickup locations to keep
      $locations =
          SELECT PULocationID
          FROM `tutorial-analytics`.`nyc_taxi_sample/example_locations.csv`
          WITH (
              format=csv_with_names,
              SCHEMA (
                  PULocationID String
              )
          );

      -- Total fare per one-minute window and pickup location
      $time =
          SELECT
              HOP_END() AS time,
              rides.PULocationID AS PULocationID,
              SUM(total_amount) AS total_amount
          FROM $data AS rides
          INNER JOIN $locations AS locations
              ON rides.PULocationID = locations.PULocationID
          GROUP BY
              HOP(CAST(tpep_pickup_datetime AS Timestamp?), "PT1M", "PT1M", "PT1M"),
              rides.PULocationID;

      SELECT * FROM $time;

- Click Run.
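Unlike the batch query, the streaming query never sees all of its input at once, so it has to decide when a one-minute window is complete. The Python sketch below models this with a simplified rule, which we assume approximates the delay argument of `HOP` (`"PT1M"` here):

- A window is emitted once an event arrives whose pickup time is at least one delay past the window end.
- Later events for an already emitted window are dropped.

This is not the Yandex Query implementation. The function name and the events are hypothetical, and windows that are still open when the input ends are not flushed, since a real stream does not end.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Iterable, Iterator

WINDOW = timedelta(minutes=1)  # HOP hop and interval: "PT1M"
DELAY = timedelta(minutes=1)   # HOP delay: "PT1M" (simplified model)

Event = tuple[datetime, str, float]   # (pickup time, PULocationID, total_amount)
Result = tuple[datetime, str, float]  # (window end, PULocationID, total fare)


def window_end(ts: datetime) -> datetime:
    """End of the one-minute window containing ts."""
    return ts.replace(second=0, microsecond=0) + WINDOW


def stream_fares(events: Iterable[Event], locations: set[str]) -> Iterator[Result]:
    """Aggregate fares incrementally and yield each window once it is closed."""
    open_windows: dict[tuple[datetime, str], float] = defaultdict(float)
    latest = None  # largest pickup time seen so far
    for ts, location, amount in events:
        latest = ts if latest is None else max(latest, ts)
        end = window_end(ts)
        # Keep the event only if it matches the reference table and its window is still open.
        if location in locations and end + DELAY > latest:
            open_windows[(end, location)] += amount
        # Close every window whose end plus the delay has been passed.
        for key in sorted(k for k in open_windows if k[0] + DELAY <= latest):
            yield key[0], key[1], open_windows.pop(key)


events = [
    (datetime(2022, 2, 15, 12, 2, 10), "125", 10.0),
    (datetime(2022, 2, 15, 12, 2, 40), "125", 5.0),
    (datetime(2022, 2, 15, 12, 4, 30), "125", 2.0),    # closes the 12:03 window
    (datetime(2022, 2, 15, 12, 2, 55), "125", 100.0),  # too late: dropped
]
for end, location, total in stream_fares(events, {"125"}):
    print(end.isoformat(), location, total)  # 2022-02-15T12:03:00 125 15.0
```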
Check the result
Once launched, the query keeps running and returns the total fare (`total_amount`) of taxi rides for each pickup location from the reference table (`PULocationID`) in each one-minute window, counting only rides received after processing started.
| # | PULocationID | time | total_amount |
|---|---|---|---|
| 1 | 125 | 2022-02-15T12:03:00.000000Z | 1275.4084 |
| 2 | 129 | 2022-02-15T12:03:00.000000Z | 1073.0449 |
| 3 | 126 | 2022-02-15T12:03:00.000000Z | 202.85883 |
| 4 | 121 | 2022-02-15T12:03:00.000000Z | 636.8784 |
| 5 | 124 | 2022-02-15T12:03:00.000000Z | 923.87805 |
| 6 | 127 | 2022-02-15T12:04:00.000000Z | 2105.3125 |
| ... |