Debezium Change Data Capture (CDC) stream processing
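In this solution, Debezium Server captures changes in a PostgreSQL database and sends them to a Yandex Data Streams stream. A Cloud Functions trigger then passes the records from the stream to a function for processing.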
Setup
To set up change data processing, do the following:
- Create a Yandex Data Streams stream.
- Configure Yandex Data Streams connection settings.
- Set up and launch Debezium Server.
- Set up a Cloud Functions trigger for data processing.
Creating a data stream
Create a Yandex Data Streams stream named debezium. The data stream creation procedure is detailed in the Yandex Data Streams documentation.
Configuring Yandex Data Streams connection settings
- Log in to the management console. If you are not signed up yet, navigate to the management console and follow the on-screen instructions.
- On the Yandex Cloud Billing page, make sure you have a linked billing account and its status is ACTIVE or TRIAL_ACTIVE. If you do not have a billing account yet, create one.
- If you do not have a folder yet, create one.
- Create a service account and assign it the editor role for your folder.
- Create a static access key.
- Configure the AWS CLI:
  - Install the AWS CLI and run this command:

        aws configure

  - Enter the following values, one by one:
    - AWS Access Key ID [None]: Service account key ID.
    - AWS Secret Access Key [None]: Service account secret key.
    - Default region name [None]: ru-central1.
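If you prefer to prepare the AWS CLI settings without the interactive prompts (for example, when provisioning the Debezium host with a script), you can write the same two files that aws configure creates: ~/.aws/credentials for the static access key and ~/.aws/config for the region. The helper below is a minimal sketch; its name, write_aws_profile, is hypothetical, and it overwrites any existing files in the target directory.

```python
import configparser
from pathlib import Path


def write_aws_profile(aws_dir: Path, key_id: str, secret: str,
                      region: str = "ru-central1", profile: str = "default") -> None:
    """Hypothetical helper: write the files that `aws configure` would create.

    Existing credentials/config files in aws_dir are overwritten.
    """
    aws_dir.mkdir(parents=True, exist_ok=True)

    # credentials holds the service account's static access key.
    credentials = configparser.ConfigParser()
    credentials[profile] = {
        "aws_access_key_id": key_id,
        "aws_secret_access_key": secret,
    }
    credentials_path = aws_dir / "credentials"
    with open(credentials_path, "w") as f:
        credentials.write(f)
    # Keep the secret readable only by the owner (effective on POSIX systems).
    credentials_path.chmod(0o600)

    # config holds the region; non-default profiles use "profile <name>" sections.
    config = configparser.ConfigParser()
    section = profile if profile == "default" else f"profile {profile}"
    config[section] = {"region": region}
    with open(aws_dir / "config", "w") as f:
        config.write(f)
```

For example, write_aws_profile(Path.home() / ".aws", "<key_ID>", "<secret_key>") produces the same default profile as answering the prompts above, with the placeholders replaced by your service account's key ID and secret key.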
Setting up Debezium Server
This example shows how Debezium works with PostgreSQL. Debezium Server is installed on the same server that already runs PostgreSQL.
- Install Debezium Server by following the Debezium Server installation guide.
- Go to the conf folder and create a file named application.properties with the following contents:

      debezium.sink.type=kinesis
      debezium.sink.kinesis.region=ru-central1
      debezium.sink.kinesis.endpoint=<endpoint>
      debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
      debezium.source.offset.storage.file.filename=data/offsets.dat
      debezium.source.offset.flush.interval.ms=0
      debezium.source.database.hostname=localhost
      debezium.source.database.port=5432
      debezium.source.database.user=<user_name>
      debezium.source.database.password=<user_password>
      debezium.source.database.dbname=<DB_name>
      debezium.source.database.server.name=debezium
      debezium.source.plugin.name=pgoutput
      debezium.source.transforms=Reroute
      debezium.source.transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
      debezium.source.transforms.Reroute.topic.regex=(.*)
      debezium.source.transforms.Reroute.topic.replacement=<data_stream>

  Where:
  - <endpoint>: Endpoint of the Data Streams stream, e.g., https://yds.serverless.yandexcloud.net/ru-central1/b1g89ae43m6he********/etn01eg4rn1********. You can find the endpoint on the stream page (see Viewing a list of streams).
  - <data_stream>: Data Streams stream name (debezium in this example).
  - <DB_name>: PostgreSQL database name.
  - <user_name>: Username for connecting to the PostgreSQL database.
  - <user_password>: Password for connecting to the PostgreSQL database.

  The Reroute transform matches every topic name with the (.*) regular expression and replaces it with <data_stream>, so changes from all tables are sent to the same stream.
- Run Debezium using this command:

      JAVA_OPTS=-Daws.cborEnabled=false ./run.sh

  The -Daws.cborEnabled=false option disables CBOR encoding of requests in the AWS SDK used by the Kinesis sink.
- Make some changes to the PostgreSQL database, e.g., insert data into a table.
- If the settings are correct, the Debezium console will show messages like this:

      2022-02-11 07:31:12,850 INFO [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) 1 records sent during previous 00:19:59.999, last recorded offset: {transaction_id=null, lsn_proc=23576408, lsn_commit=23576120, lsn=23576408, txId=580, ts_usec=1644564672582666}
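The two checks in the steps above (every placeholder in application.properties replaced, and records actually being sent) can be scripted, for example as a smoke test for the Debezium host. The sketch below uses hypothetical helper names and is not part of Debezium. It assumes two things: that application.properties uses only simple key=value lines (the full Java properties syntax is not handled), and that the log line follows the format of the example message shown above, which may differ between Debezium versions.

```python
import re
from typing import Dict, List, Optional, Tuple

# A value such as <endpoint> or <user_name> left unchanged from the template.
PLACEHOLDER = re.compile(r"<[^<>]+>")
# Patterns based on the example BaseSourceTask log line; other versions may differ.
RECORDS_SENT = re.compile(r"(\d+) records sent during previous")
OFFSET = re.compile(r"last recorded offset: \{(.*)\}")


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # or ! comments.

    Line continuations, escapes, and ':' separators are not supported.
    """
    props: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def unfilled_placeholders(props: Dict[str, str]) -> List[str]:
    """Return the keys whose values still contain a <placeholder>."""
    return [key for key, value in props.items() if PLACEHOLDER.search(value)]


def parse_records_sent(log_line: str) -> Optional[Tuple[int, Dict[str, str]]]:
    """Return (record count, offset fields) from a 'records sent' log line, else None."""
    count = RECORDS_SENT.search(log_line)
    if count is None:
        return None
    offset: Dict[str, str] = {}
    match = OFFSET.search(log_line)
    if match:
        # Offset fields are printed as "key=value, key=value, ...".
        for item in match.group(1).split(", "):
            key, _, value = item.partition("=")
            offset[key] = value
    return int(count.group(1)), offset
```

Reading conf/application.properties into parse_properties and getting an empty list from unfilled_placeholders means no template value was left behind; a non-zero count from parse_records_sent confirms that Debezium is shipping changes to the stream.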
Setting up a Cloud Functions trigger
Create a Cloud Functions trigger for the debezium stream you previously created in Yandex Data Streams.
For details on creating a trigger, see the Cloud Functions documentation.
The trigger invokes your function with the change records that Debezium writes to the stream, so the function receives notifications about database changes. Implement the required business logic in the function code to process these changes.
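The function code depends on your business logic, but it usually starts by decoding each Debezium change event. The sketch below assumes the records are Debezium JSON change events, either wrapped in a {"schema": ..., "payload": ...} envelope or as a bare payload, and relies on the standard Debezium payload fields before, after, source, op, and ts_ms. How the raw record text is extracted from the trigger's invocation event is described in the Cloud Functions documentation and is deliberately not assumed here. The names parse_change_event and process_records, and the "collect inserted rows" logic, are hypothetical examples.

```python
import json
from typing import Dict, Iterable, List, Optional

# Debezium operation codes in the "op" field of the change event payload.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete",
              "r": "snapshot read", "t": "truncate"}


def parse_change_event(raw: str) -> Optional[dict]:
    """Turn one Debezium JSON change event into a compact summary.

    Accepts events with or without the schema envelope.
    Returns None for empty or null (tombstone) records.
    """
    if not raw or not raw.strip():
        return None
    event = json.loads(raw)
    if event is None:
        return None
    # With schemas enabled, the actual change is under "payload".
    payload = event["payload"] if "schema" in event and "payload" in event else event
    if payload is None:
        return None
    source = payload.get("source") or {}
    op = payload.get("op")
    return {
        "operation": OPERATIONS.get(op, op),
        "table": f"{source.get('schema')}.{source.get('table')}",
        "before": payload.get("before"),
        "after": payload.get("after"),
        "ts_ms": payload.get("ts_ms"),
    }


def process_records(raw_records: Iterable[str]) -> Dict[str, List[dict]]:
    """Hypothetical business logic: collect newly inserted rows per table."""
    inserted: Dict[str, List[dict]] = {}
    for raw in raw_records:
        change = parse_change_event(raw)
        if change and change["operation"] == "insert":
            inserted.setdefault(change["table"], []).append(change["after"])
    return inserted
```

Keeping the decoding separate from the business logic makes it easy to test the function locally with sample events before deploying it behind the trigger.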