Delivering data to Yandex Managed Service for Apache Kafka® using Debezium
You can track data changes in Managed Service for PostgreSQL and send them to Managed Service for Apache Kafka® using Change Data Capture (CDC).
In this article, you will learn how to create a virtual machine in Yandex Cloud and set up Debezium on it.
Getting started
- Create a Managed Service for PostgreSQL source cluster with the following settings:
  - Hosts: publicly available
  - Database: `db1`
  - User: `user1`
- Create a Managed Service for Apache Kafka® target cluster in any suitable configuration with publicly available hosts.
- Create a virtual machine with Ubuntu 20.04 and a public IP address.
- If you are using security groups, configure them to allow connections to the clusters both from the internet and from the created VM. Also allow SSH connections to this VM from the internet.
- Connect to the virtual machine over SSH and perform the preliminary setup:
- Install the dependencies:

  ```bash
  sudo apt update && \
  sudo apt install kafkacat openjdk-17-jre postgresql-client --yes
  ```
- Create a folder for Apache Kafka®:

  ```bash
  sudo mkdir -p /opt/kafka/
  ```
- Download and unpack the archive with Apache Kafka® executable files to this folder. For example, to download and unpack Apache Kafka® 3.0, run:

  ```bash
  wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz && \
  sudo tar xf kafka_2.13-3.0.0.tgz --strip 1 --directory /opt/kafka/
  ```

  You can check the current Apache Kafka® version on the project downloads page.
- Install certificates on the VM and check that the clusters are available:

  - Managed Service for Apache Kafka® (use `kafkacat`).
  - Managed Service for PostgreSQL (use `psql`).
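  For a quick connectivity check from the VM, you can use commands like the ones below. This is only a sketch: the broker and PostgreSQL host FQDNs, user names, and passwords are placeholders for your own cluster values.

  ```bash
  # Managed Service for Apache Kafka®: list cluster metadata over SASL_SSL
  kafkacat -L \
      -b <FQDN_of_broker_host>:9091 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username=<Kafka_username> \
      -X sasl.password=<Kafka_user_password> \
      -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt

  # Managed Service for PostgreSQL: open an SSL-verified connection and print the server version
  psql "host=<FQDN_of_PostgreSQL_host> port=6432 sslmode=verify-full sslrootcert=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt dbname=db1 user=user1" \
      -c "SELECT version();"
  ```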
- Create a folder that will store the files required for the operation of the Debezium connector:

  ```bash
  sudo mkdir -p /etc/debezium/plugins/
  ```
- The Debezium connector can connect to Managed Service for Apache Kafka® broker hosts only if an SSL certificate is added to the Java Key Store (JKS). For added storage security, set a password of at least 6 characters in the `-storepass` parameter:

  ```bash
  sudo keytool \
      -importcert \
      -alias YandexCA \
      -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
      -keystore /etc/debezium/keystore.jks \
      -storepass <JKS_password> \
      --noprompt
  ```
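  To verify that the certificate was imported, you can list the keystore contents; the `YandexCA` alias should be present.

  ```bash
  # List the entries stored in the Java Key Store created above
  sudo keytool -list -keystore /etc/debezium/keystore.jks -storepass <JKS_password>
  ```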
Prepare the source cluster
- Assign the `mdb_replication` role to the user `user1`.

  This is necessary to create a publication that Debezium uses to monitor changes in the Managed Service for PostgreSQL cluster.
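  If you prefer the command line to the management console, the role can also be assigned with a `yc` command along the following lines. This is only a sketch: it assumes your yc CLI version supports the `--grants` flag of `yc managed-postgresql user update`, and `<cluster_ID>` is a placeholder.

  ```bash
  # Sketch (assumption): grant the mdb_replication role to user1 via the yc CLI
  yc managed-postgresql user update user1 \
      --cluster-id <cluster_ID> \
      --grants mdb_replication
  ```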
- Connect to the `db1` database as the user `user1`.
- Add test data to the database. In this example, a simple table with readings from car sensors is used.
  Create a table:

  ```sql
  CREATE TABLE public.measurements (
      "device_id" text PRIMARY KEY NOT NULL,
      "datetime" timestamp NOT NULL,
      "latitude" real NOT NULL,
      "longitude" real NOT NULL,
      "altitude" real NOT NULL,
      "speed" real NOT NULL,
      "battery_voltage" real,
      "cabin_temperature" real NOT NULL,
      "fuel_level" real
  );
  ```
  Populate the table with data:

  ```sql
  INSERT INTO public.measurements VALUES
      ('iv9a94th6rzt********', '2020-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL),
      ('rhibbh3y08qm********', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
      ('iv9a94th678t********', '2020-06-07 15:00:10', 55.70985913, 37.62141918, 417.0, 15.7, 10.3, 17, NULL);
  ```
- Create a publication for the table:

  ```sql
  CREATE PUBLICATION mpg_publication FOR TABLE public.measurements;
  ```
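  To make sure the publication is in place and logical decoding is available, you can run a quick check. The commands below are a sketch; the connection string follows the same placeholder format used elsewhere in this article.

  ```bash
  # wal_level must be "logical" for Debezium's pgoutput plugin to work;
  # the publication query should return the public.measurements table
  psql "host=c-<cluster_ID>.rw.mdb.yandexcloud.net port=6432 sslmode=verify-full sslrootcert=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt dbname=db1 user=user1" \
      -c "SHOW wal_level;" \
      -c "SELECT * FROM pg_publication_tables WHERE pubname = 'mpg_publication';"
  ```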
Configure Debezium
- Connect to the virtual machine over SSH.
- Download a suitable Debezium connector and unpack it to the `/etc/debezium/plugins/` folder.

  You can check the current connector version on the project page. The commands below are given for version `1.9.4.Final`:

  ```bash
  VERSION="1.9.4.Final"
  wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/${VERSION}/debezium-connector-postgres-${VERSION}-plugin.tar.gz && \
  sudo tar -xzvf debezium-connector-postgres-${VERSION}-plugin.tar.gz -C /etc/debezium/plugins/
  ```
- Create the file `/etc/debezium/mdb-connector.conf` with Debezium connector settings for connecting to the source cluster:

  ```ini
  name=debezium-mpg
  connector.class=io.debezium.connector.postgresql.PostgresConnector
  plugin.name=pgoutput
  database.hostname=c-<cluster_ID>.rw.mdb.yandexcloud.net
  database.port=6432
  database.user=user1
  database.password=<user1_password>
  database.dbname=db1
  database.server.name=mpg
  table.include.list=public.measurements
  publication.name=mpg_publication
  slot.name=debezium_slot
  heartbeat.interval.ms=15000
  heartbeat.topics.prefix=debezium-heartbeat
  snapshot.mode=always
  ```
  Where:

  - `name`: Logical name of the Debezium connector. Used for the connector's internal needs.
  - `database.hostname`: Special FQDN for connecting to the master host of the source cluster.

    You can get the cluster ID from the list of clusters in the folder.

  - `database.user`: PostgreSQL username.
  - `database.dbname`: PostgreSQL database name.
  - `database.server.name`: Name of the database server that Debezium will use when choosing a topic to send messages to.
  - `table.include.list`: Names of the tables whose changes Debezium should track. Specify full names that include the schema name (`public` by default). Debezium will use values from this field when choosing a topic to send messages to.
  - `publication.name`: Name of the publication created on the source cluster.
  - `slot.name`: Name of the replication slot that Debezium will create when working with the publication.
  - `heartbeat.interval.ms` and `heartbeat.topics.prefix`: Heartbeat settings required by Debezium.
  - `snapshot.mode`: Type of snapshot created at connector startup. For the connector to run properly, set this parameter to `always`.
Prepare the target cluster
- Create a topic to store data from the source cluster:

  - Name: `mpg.public.measurements`.

    Data topic names follow the `<server_name>.<schema_name>.<table_name>` convention.

    According to the Debezium configuration file:

    - The `mpg` server name is specified in the `database.server.name` parameter.
    - The `public` schema name is specified together with the `measurements` table name in the `table.include.list` parameter.

  If you need to track data changes in multiple tables, create a separate topic for each of them.
- Create a service topic to track the connector status:

  - Name: `debezium-heartbeat.mpg`.

    Service topic names follow the `<prefix_for_heartbeat>.<server_name>` convention.

    According to the Debezium configuration file:

    - The `debezium-heartbeat` prefix is specified in the `heartbeat.topics.prefix` parameter.
    - The `mpg` server name is specified in the `database.server.name` parameter.

  - Cleanup policy: `Compact`.

  If you need data from multiple source clusters, create a separate service topic for each of them.
- Create a user named `debezium`.
- Grant the `debezium` user the `ACCESS_ROLE_CONSUMER` and `ACCESS_ROLE_PRODUCER` permissions for the created topics.
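  Once the user and permissions are in place, you can check from the VM that the `debezium` user sees both topics. This is a quick sanity check; the broker FQDN and password are placeholders.

  ```bash
  # List cluster metadata as the debezium user; mpg.public.measurements and
  # debezium-heartbeat.mpg should appear in the topic list
  kafkacat -L \
      -b <FQDN_of_broker_host_1>:9091 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username=debezium \
      -X sasl.password=<debezium_user_password> \
      -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt
  ```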
Start Debezium
- Create the file `/etc/debezium/worker.conf` with Debezium worker settings:

  ```ini
  # AdminAPI connect properties
  bootstrap.servers=<FQDN_of_broker_host_1>:9091,...,<FQDN_of_broker_host_N>:9091
  sasl.mechanism=SCRAM-SHA-512
  security.protocol=SASL_SSL
  ssl.truststore.location=/etc/debezium/keystore.jks
  ssl.truststore.password=<JKS_password>
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<debezium_user_password>";

  # Producer connect properties
  producer.sasl.mechanism=SCRAM-SHA-512
  producer.security.protocol=SASL_SSL
  producer.ssl.truststore.location=/etc/debezium/keystore.jks
  producer.ssl.truststore.password=<JKS_password>
  producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<debezium_user_password>";

  # Worker properties
  plugin.path=/etc/debezium/plugins/
  key.converter=org.apache.kafka.connect.json.JsonConverter
  value.converter=org.apache.kafka.connect.json.JsonConverter
  key.converter.schemas.enable=true
  value.converter.schemas.enable=true
  offset.storage.file.filename=/etc/debezium/worker.offset
  ```
- In a separate terminal, start the connector:

  ```bash
  sudo /opt/kafka/bin/connect-standalone.sh \
      /etc/debezium/worker.conf \
      /etc/debezium/mdb-connector.conf
  ```
Check the health of Debezium
- In a separate terminal, run the `kafkacat` utility in consumer mode:

  ```bash
  kafkacat \
      -C \
      -b <FQDN_of_broker_host_1>:9091,...,<FQDN_of_broker_host_N>:9091 \
      -t mpg.public.measurements \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username=debezium \
      -X sasl.password=<password> \
      -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
      -Z \
      -K:
  ```
  The data format schema of the `db1.public.measurements` table and information about the previously added rows will be printed.

  Example of a message fragment:

  ```json
  {
      "schema": { ... },
      "payload": {
          "before": null,
          "after": {
              "device_id": "iv9a94th6rzt********",
              "datetime": 1591378020000000,
              "latitude": 55.70329,
              "longitude": 37.65472,
              "altitude": 427.5,
              "speed": 0.0,
              "battery_voltage": 23.5,
              "cabin_temperature": 17.0,
              "fuel_level": null
          },
          "source": {
              "version": "1.8.1.Final",
              "connector": "postgresql",
              "name": "mpg",
              "ts_ms": 1628245046882,
              "snapshot": "true",
              "db": "db1",
              "sequence": "[null,\"4328525512\"]",
              "schema": "public",
              "table": "measurements",
              "txId": 8861,
              "lsn": 4328525328,
              "xmin": null
          },
          "op": "r",
          "ts_ms": 1628245046893,
          "transaction": null
      }
  }
  ```
- Connect to the source cluster.

  When connecting, the `ERROR Postgres roles LOGIN and REPLICATION are not assigned to user` error may occur. You can ignore it, as it does not affect Debezium operation.
- Add another row to the `measurements` table:

  ```sql
  INSERT INTO public.measurements VALUES
      ('iv7b74th678t********', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
  ```
- Make sure the terminal running `kafkacat` displays details about the added row.
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
- Delete the virtual machine.

  If you reserved a public static IP address for the virtual machine, release and delete it.
- Delete the Managed Service for Apache Kafka® and Managed Service for PostgreSQL clusters.