Processing CDC Debezium streams
Debezium is a change data capture (CDC) service that captures row-level changes in a database and delivers them as a stream of events. Query can process this stream as it arrives, and you can then use the processing results in several ways:
- Send it to Yandex Monitoring to make charts and use it in alerting.
- Write it to a Data Streams stream and then send it to Yandex Cloud Functions for processing.
- Write it to a Data Streams stream and then transfer it to Yandex Data Transfer to be sent to various storage systems.
In this use case, you will send PostgreSQL change data to a Data Streams stream using Debezium Server and then analyze it with Query.
To implement this use case:
- Create a Data Streams data stream.
- Set the stream connection credentials.
- Set up Debezium Server.
- Connect Query to your data stream.
- Query the data.
Getting started
Sign up for Yandex Cloud and create a billing account:
- Go to the management console and log in to Yandex Cloud or create an account if you do not have one yet.
- On the Yandex Cloud Billing page, make sure you have a billing account linked and it has the ACTIVE or TRIAL_ACTIVE status. If you do not have a billing account, create one.
If you have an active billing account, you can go to the cloud page to create or select a folder for your resources.
Learn more about clouds and folders.
Create a Data Streams data stream
Create a data stream named debezium.
Set the stream connection parameters
- Create a service account and assign it the editor role for your folder.
- Create a static access key.
- On the server where PostgreSQL is set up and running, configure the AWS CLI:
  - Install the AWS CLI and run this command:

    aws configure

  - Enter the following one by one:
    - AWS Access Key ID [None]: the service account key ID.
    - AWS Secret Access Key [None]: the service account secret key.
    - Default region name [None]: the ru-central1 availability zone.
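For reference, below is a sketch of how the static key and the AWS CLI profile fit together. The service account name my-sa is hypothetical, and the bracketed values are placeholders for the key ID and secret key issued with the static access key:

  # Create a static access key for the service account; the command prints
  # the key ID and secret key that aws configure asks for.
  yc iam access-key create --service-account-name my-sa

  # ~/.aws/credentials written by `aws configure`
  [default]
  aws_access_key_id = <service_account_key_ID>
  aws_secret_access_key = <service_account_secret_key>

  # ~/.aws/config written by `aws configure`
  [default]
  region = ru-central1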
Set up Debezium Server
On the server where PostgreSQL is set up and running:
- Install Debezium Server by following this guide.
- Go to the conf folder and create the application.properties file with the following content:

  debezium.sink.type=kinesis
  debezium.sink.kinesis.region=ru-central1
  debezium.sink.kinesis.endpoint=<endpoint>
  debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
  debezium.source.offset.storage.file.filename=data/offsets.dat
  debezium.source.offset.flush.interval.ms=0
  debezium.source.database.hostname=localhost
  debezium.source.database.port=5432
  debezium.source.database.user=<username>
  debezium.source.database.password=<user_password>
  debezium.source.database.dbname=<DB_name>
  debezium.source.database.server.name=debezium
  debezium.source.plugin.name=pgoutput
  debezium.source.transforms=Reroute
  debezium.source.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
  debezium.source.transforms.Reroute.topic.regex=(.*)
  debezium.source.transforms.Reroute.topic.replacement=<data_stream>
  Where:
  - <endpoint>: Endpoint for a Data Streams data stream, e.g., https://yds.serverless.yandexcloud.net/ru-central1/b1g89ae43m6he********/etn01eg4rn1********. You can find the endpoint on the stream page (see Viewing a list of streams).
  - <data_stream>: Data Streams data stream name.
  - <DB_name>: PostgreSQL database name.
  - <username>: Username for connecting to the PostgreSQL database.
  - <user_password>: User password for connecting to the PostgreSQL database.

  The Reroute transform with topic.regex=(.*) sends change events from all tables to the single stream specified in topic.replacement instead of creating a separate topic per table.
- Run Debezium using this command:

  JAVA_OPTS=-Daws.cborEnabled=false ./run.sh

  The aws.cborEnabled=false option disables the AWS SDK CBOR protocol, which Kinesis-compatible endpoints such as Data Streams may not support.
- Make some changes to the PostgreSQL database, e.g., insert rows into a table (see the example after this list).
- If the settings are correct, the Debezium console will show messages like this:

  2022-02-11 07:31:12,850 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 1 records sent during previous 00:19:59.999, last recorded offset: {transaction_id=null, lsn_proc=23576408, lsn_commit=23576120, lsn=23576408, txId=580, ts_usec=1644564672582666}
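To generate change events, you can modify any table that exists in the monitored database. The table below is hypothetical and only illustrates the kind of statements that produce events:

  -- Hypothetical table; any existing table works as well
  CREATE TABLE IF NOT EXISTS orders (
      id SERIAL PRIMARY KEY,
      item TEXT NOT NULL,
      qty INTEGER NOT NULL
  );

  -- Each committed INSERT, UPDATE, or DELETE becomes a separate change event
  INSERT INTO orders (item, qty) VALUES ('pencil', 3);
  UPDATE orders SET qty = 5 WHERE item = 'pencil';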
Connect Query to your data stream
- Create a connection named yds-connection of the Data Streams type.
- On the binding creation page:
  - Enter a name for the binding: debezium.
  - Specify the data stream: debezium.
  - Add a column named data of the JSON type.
- Click Create.
Query the data
Open the query editor in the Query interface and run the query:
$debezium_data =
SELECT
JSON_VALUE(data,"$.payload.source.table") AS table_name,
DateTime::FromMilliseconds(cast(JSON_VALUE(data,"$.payload.source.ts_ms") AS Uint64)) AS `timestamp`
FROM bindings.`debezium`;
SELECT
table_name,
HOP_END()
FROM
$debezium_data
GROUP BY
HOP(`timestamp`, "PT10S", "PT10S", "PT10S"),
table_name
LIMIT 2;
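The JSON_VALUE paths above rely on the standard Debezium change event envelope, where every record carries a payload.source block describing where and when the change happened. An abbreviated example of what one event in the data column might look like (the schema part of the envelope and most source fields are omitted; the table and values are hypothetical):

  {
    "payload": {
      "op": "c",
      "source": {
        "db": "mydb",
        "table": "orders",
        "ts_ms": 1644564672582
      },
      "after": { "id": 1, "item": "pencil", "qty": 3 }
    }
  }

The query extracts the table name and commit timestamp from each event and then groups the events into 10-second hopping windows per table.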
Note
Data from a streaming source arrives as an infinite stream. To stop processing and output the result to the console, the example limits the data with the LIMIT clause, which sets the number of rows in the result.