Processing Debezium CDC streams
Below is the architecture of the Debezium solution:
[Diagram: solution architecture]
Setup
To get a data stream:
- Create a Yandex Data Streams stream.
- Set Yandex Data Streams connection parameters.
- Set up and start the Debezium Server.
- Set up a trigger in Cloud Functions for processing data.
Creating a stream
Create a Yandex Data Streams stream named debezium. For detailed information about creating streams, see the Yandex Data Streams documentation.
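You can also create the stream programmatically through the Kinesis-compatible API that Debezium itself uses. The sketch below rests on several assumptions:

- boto3 is installed for real calls.
- The credentials from the AWS CLI setup below are available.
- Data Streams serves this API at https://yds.serverless.yandexcloud.net.
- Data Streams expects the full stream path /ru-central1/<folder_id>/<database_id>/<stream_name> as StreamName.

Check these against the Yandex Data Streams documentation. The helper names stream_path, create_debezium_stream and make_yds_client are hypothetical.

```python
# Sketch: create the "debezium" stream through the Kinesis-compatible API.
# Assumptions: boto3 is installed for real calls, and Data Streams accepts the
# full path /<region>/<folder_id>/<database_id>/<name> as StreamName.
from typing import Any, Protocol

YDS_ENDPOINT = "https://yds.serverless.yandexcloud.net"  # assumed API endpoint


class KinesisLike(Protocol):
    """The only client method this sketch relies on."""

    def create_stream(self, **kwargs: Any) -> Any: ...


def stream_path(region: str, folder_id: str, database_id: str, name: str) -> str:
    """Build the full stream path used as StreamName (assumed format)."""
    return f"/{region}/{folder_id}/{database_id}/{name}"


def create_debezium_stream(client: KinesisLike, path: str, shards: int = 1) -> None:
    """Create a stream with the given number of shards."""
    if shards < 1:
        raise ValueError("shard count must be at least 1")
    client.create_stream(StreamName=path, ShardCount=shards)


def make_yds_client() -> Any:
    """Return a boto3 Kinesis client pointed at Data Streams."""
    import boto3  # imported lazily so the rest of the sketch runs without boto3

    return boto3.client("kinesis", endpoint_url=YDS_ENDPOINT, region_name="ru-central1")
```

The client is passed in so the stream-creation logic can be exercised with a stub. With boto3 installed, create_debezium_stream(make_yds_client(), stream_path("ru-central1", "<folder_id>", "<database_id>", "debezium")) would make the real call.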
Setting Yandex Data Streams connection parameters
- Log in to the management console. If you do not have an account yet, go to the management console and follow the guide to sign up.
- On the Billing page, make sure you have a billing account linked and that it has the ACTIVE or TRIAL_ACTIVE status. If you do not have a billing account yet, create one.
- If you do not have a folder yet, create one.
- Create a service account and assign it the editor role for your folder.
- Create a static access key.
- Set up the AWS CLI:
  - Install the AWS CLI and run the following command:
      aws configure
  - Enter the following one by one:
    - AWS Access Key ID [None]: service account key ID.
    - AWS Secret Access Key [None]: secret access key of the service account.
    - Default region name [None]: ru-central1 (the region name).
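The aws configure command stores these answers in two INI files under ~/.aws. The sketch below writes equivalent files for the default profile using Python's configparser, which can help when provisioning a server non-interactively.

write_aws_profile is a hypothetical helper. Unlike aws configure, it overwrites both files instead of merging them with existing profiles.

```python
# Sketch: write the same files that `aws configure` creates for the default profile.
import configparser
from pathlib import Path


def write_aws_profile(home: Path, key_id: str, secret: str, region: str = "ru-central1") -> None:
    """Write .aws/credentials and .aws/config under `home` (overwrites both files)."""
    aws_dir = home / ".aws"
    aws_dir.mkdir(parents=True, exist_ok=True)

    # Static access key of the service account; interpolation is off so any
    # character in the secret is written verbatim.
    credentials = configparser.ConfigParser(interpolation=None)
    credentials["default"] = {"aws_access_key_id": key_id, "aws_secret_access_key": secret}
    credentials_file = aws_dir / "credentials"
    with credentials_file.open("w") as f:
        credentials.write(f)
    credentials_file.chmod(0o600)  # keep the secret readable by the owner only

    # Default region for the AWS CLI and for tools that read this file.
    config = configparser.ConfigParser(interpolation=None)
    config["default"] = {"region": region}
    with (aws_dir / "config").open("w") as f:
        config.write(f)
```

For example, write_aws_profile(Path.home(), "<key_id>", "<secret_key>") would produce the same default profile as answering the prompts above.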
Setting up Debezium Server
This use case shows how Debezium works with PostgreSQL. Let's assume that Debezium is installed on the server where PostgreSQL is running.
- Install the Debezium Server by following this guide.
- Go to the conf folder and create the application.properties file with the following content:

    debezium.sink.type=kinesis
    debezium.sink.kinesis.region=ru-central1
    debezium.sink.kinesis.endpoint=<endpoint>
    debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
    debezium.source.offset.storage.file.filename=data/offsets.dat
    debezium.source.offset.flush.interval.ms=0
    debezium.source.database.hostname=localhost
    debezium.source.database.port=5432
    debezium.source.database.user=<username>
    debezium.source.database.password=<user_password>
    debezium.source.database.dbname=<DB_name>
    debezium.source.database.server.name=debezium
    debezium.source.plugin.name=pgoutput
    debezium.source.transforms=Reroute
    debezium.source.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
    debezium.source.transforms.Reroute.topic.regex=(.*)
    debezium.source.transforms.Reroute.topic.replacement=<data_stream>
  Where:
  - <endpoint>: Endpoint for a Data Streams data stream, e.g., https://yds.serverless.yandexcloud.net/ru-central1/b1g89ae43m6he********/etn01eg4rn1********. You can find the endpoint on the stream page (see Viewing a list of streams).
  - <data_stream>: Data Streams data stream name.
  - <DB_name>: PostgreSQL database name.
  - <username>: Username for connecting to the PostgreSQL database.
  - <user_password>: User password for connecting to the PostgreSQL database.
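- Run Debezium using this command:

    JAVA_OPTS=-Daws.cborEnabled=false ./run.sh

  The -Daws.cborEnabled=false option turns off CBOR encoding in the AWS SDK used by the Kinesis sink, so requests to Data Streams are sent as JSON.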
- Make some changes to the PostgreSQL database, such as inserting data into a table.
- If the settings are correct, the Debezium console will show messages like:

    2022-02-11 07:31:12,850 INFO [io.deb.con.com.BaseSourceTask](pool-7-thread-1) 1 records sent during previous 00:19:59.999, last recorded offset: {transaction_id=null, lsn_proc=23576408, lsn_commit=23576120, lsn=23576408, txId=580, ts_usec=1644564672582666}
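If you deploy the same setup to several hosts, generating application.properties from parameters helps avoid copy-paste mistakes. The sketch below renders the properties listed in the Debezium Server setup steps.

Assumptions and limitations:

- render_properties is a hypothetical helper.
- The endpoint check only assumes the https://yds.serverless.yandexcloud.net/<region>/<folder_id>/<database_id> shape shown in the example endpoint.
- Only backslashes are escaped in values. Other .properties edge cases, such as leading whitespace in a value, are not handled.

```python
# Sketch: render the Debezium Server application.properties from parameters.
import re

# Assumed endpoint shape: https://yds.serverless.yandexcloud.net/<region>/<folder_id>/<database_id>
ENDPOINT_RE = re.compile(r"^https://yds\.serverless\.yandexcloud\.net/[^/]+/[^/]+/[^/]+$")


def render_properties(endpoint: str, stream: str, dbname: str, user: str, password: str,
                      host: str = "localhost", port: int = 5432) -> str:
    """Return application.properties text for a PostgreSQL -> Data Streams pipeline."""
    if not ENDPOINT_RE.match(endpoint):
        raise ValueError(f"unexpected Data Streams endpoint: {endpoint}")
    props = {
        "debezium.sink.type": "kinesis",
        "debezium.sink.kinesis.region": "ru-central1",
        "debezium.sink.kinesis.endpoint": endpoint,
        "debezium.source.connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "debezium.source.offset.storage.file.filename": "data/offsets.dat",
        "debezium.source.offset.flush.interval.ms": "0",
        "debezium.source.database.hostname": host,
        "debezium.source.database.port": str(port),
        "debezium.source.database.user": user,
        "debezium.source.database.password": password,
        "debezium.source.database.dbname": dbname,
        "debezium.source.database.server.name": "debezium",
        "debezium.source.plugin.name": "pgoutput",
        "debezium.source.transforms": "Reroute",
        "debezium.source.transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
        "debezium.source.transforms.Reroute.topic.regex": "(.*)",
        # Route the changes of every table to the single Data Streams stream.
        "debezium.source.transforms.Reroute.topic.replacement": stream,
    }
    lines = []
    for key, value in props.items():
        escaped = value.replace("\\", "\\\\")  # backslash is the escape character in .properties
        lines.append(f"{key}={escaped}\n")
    return "".join(lines)
```

Write the result to conf/application.properties. Rejecting a malformed endpoint early surfaces the most common copy error before Debezium starts.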
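To check the result automatically rather than by eye, you can extract the record count and offset from the BaseSourceTask progress messages. The sketch below assumes the message keeps the format of the sample log line in the Debezium Server setup steps, which may differ between Debezium versions. parse_progress is a hypothetical helper.

```python
# Sketch: extract the record count and offset from a Debezium progress log line.
import re
from typing import Dict, Optional, Tuple

PROGRESS_RE = re.compile(
    r"(?P<count>\d+) records sent during previous (?P<period>[\d:.]+), "
    r"last recorded offset: \{(?P<offset>[^}]*)\}"
)


def parse_progress(line: str) -> Optional[Tuple[int, Dict[str, str]]]:
    """Return (records_sent, offset fields), or None if the line is not a progress message."""
    match = PROGRESS_RE.search(line)
    if match is None:
        return None
    offset: Dict[str, str] = {}
    for pair in match.group("offset").split(", "):
        key, _, value = pair.partition("=")
        offset[key] = value
    return int(match.group("count")), offset
```

Applied to the sample line, this returns a count of 1 and offset fields such as lsn and txId. A monitoring script could, for example, alert when the count stays at zero.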
Setting up a trigger in Cloud Functions
Create a Cloud Functions trigger for the debezium Yandex Data Streams stream you created earlier.
For detailed information about creating triggers, see the Cloud Functions documentation.
Whenever Debezium writes a database change to the stream, the trigger invokes your function and passes it the new messages. In the function code, you can process these changes in any way you need.
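As a starting point for the function, the sketch below summarizes each Debezium change event: the operation, the table, and the row before and after the change. It makes these assumptions:

- The trigger passes the stream messages in a messages list.
- Each message is either a JSON string or an already-parsed object in Debezium's JSON change-event format, optionally wrapped as schema/payload.

Verify the actual event format in the Cloud Functions documentation before relying on it. The handler(event, context) entry point follows the Cloud Functions Python convention. extract_change is hypothetical.

```python
# Sketch: a Cloud Functions handler that summarizes Debezium change events.
# Assumption: the trigger passes stream messages in event["messages"], each either
# a JSON string or an already-parsed Debezium change event.
import json
from typing import Any, Dict, Optional

# Debezium operation codes found in the "op" field.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "read (snapshot)", "t": "truncate"}


def extract_change(message: Any) -> Optional[Dict[str, Any]]:
    """Return a compact summary of one change event, or None for empty messages."""
    event = json.loads(message) if isinstance(message, (str, bytes)) else message
    if not isinstance(event, dict):
        return None  # e.g. a null (tombstone) message
    # With schemas enabled, the change is wrapped as {"schema": ..., "payload": ...}.
    payload = event.get("payload", event)
    if not isinstance(payload, dict):
        return None
    source = payload.get("source") or {}
    return {
        "operation": OPERATIONS.get(payload.get("op"), "unknown"),
        "table": source.get("table"),
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


def handler(event, context):
    """Entry point invoked by the Data Streams trigger."""
    changes = [c for c in map(extract_change, event.get("messages", [])) if c is not None]
    for change in changes:
        print(json.dumps(change))  # replace with your own processing
    return {"statusCode": 200, "body": json.dumps({"processed": len(changes)})}
```

Keeping the parsing in extract_change separates it from the invocation details. Only handler would need to change if the trigger's event format turns out to differ from the one assumed here.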