Delivering data to Yandex Managed Service for Apache Kafka® using Debezium
You can track data changes in Managed Service for PostgreSQL and send them to Managed Service for Apache Kafka® using Change Data Capture (CDC).
In this article, you will learn how to create a virtual machine in Yandex Cloud and set up Debezium.
Required paid resources
The support cost includes:
- Managed Service for PostgreSQL cluster fee: Using computing resources allocated to hosts and disk space (see Managed Service for PostgreSQL pricing).
- Managed Service for Apache Kafka® cluster fee: Using computing resources allocated to hosts (including ZooKeeper hosts) and disk space (see Apache Kafka® pricing).
- VM fee: Using computing resources, operating system, and storage (see Compute Cloud pricing).
- Fee for using public IP addresses for the VM and hosts of the two clusters (see Virtual Private Cloud pricing).
Getting started
- Create a source cluster with the following settings:
  - Hosts: Publicly available
  - Database: `db1`
  - User: `user1`
- Create a Managed Service for Apache Kafka® target cluster in any suitable configuration with publicly available hosts.
- Create a virtual machine with Ubuntu 20.04 and a public IP address.
- If you are using security groups, configure them to enable connecting to the clusters both from the internet and from the created VM. In addition, enable connecting to this VM over SSH from the internet.
- Connect to the virtual machine over SSH and perform preliminary setup:
  - Install the dependencies:

    ```bash
    sudo apt update && \
    sudo apt install kafkacat openjdk-17-jre postgresql-client --yes
    ```

    Check that you can use `kafkacat` to connect to the Managed Service for Apache Kafka® target cluster over SSL.
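    For example, a minimal connectivity check with `kafkacat` in metadata-listing mode might look like this (a sketch: the broker FQDN, user, and password are placeholders, and the CA certificate is assumed to be already installed as described below):

    ```bash
    # List cluster metadata over SASL_SSL to verify the connection.
    kafkacat -L \
        -b <broker_host_FQDN>:9091 \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=<user_name> \
        -X sasl.password=<password> \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt
    ```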
  - Create a folder for Apache Kafka®:

    ```bash
    sudo mkdir -p /opt/kafka/
    ```
  - Download and unpack the archive with Apache Kafka® executable files into this folder. For example, to download and unpack Apache Kafka® 3.0, run the command:

    ```bash
    wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz && \
    sudo tar xf kafka_2.13-3.0.0.tgz --strip 1 --directory /opt/kafka/
    ```

    You can check the current Apache Kafka® version on the page with project downloads.
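    To confirm the unpacking worked, you can, for example, print the version of the installed distribution:

    ```bash
    /opt/kafka/bin/kafka-topics.sh --version
    ```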
  - Install certificates on the VM and check the availability of the clusters:

    - Managed Service for Apache Kafka® (use `kafkacat`).
    - Managed Service for PostgreSQL (use `psql`).
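    For example, an availability check for the Managed Service for PostgreSQL cluster might look like this (a sketch: substitute your own cluster ID):

    ```bash
    # Connect to the master host over SSL; port 6432 is standard for Managed Service for PostgreSQL.
    psql "host=c-<cluster_ID>.rw.mdb.yandexcloud.net \
          port=6432 \
          sslmode=verify-full \
          dbname=db1 \
          user=user1 \
          target_session_attrs=read-write"
    ```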
  - Create a folder that will store the files required for the operation of the Debezium connector:

    ```bash
    sudo mkdir -p /etc/debezium/plugins/
    ```
  - The Debezium connector can connect to Managed Service for Apache Kafka® broker hosts if an SSL certificate is added to Java secure storage (Java Key Store). For added storage security, add a password, at least 6 characters long, to the `-storepass` parameter:

    ```bash
    sudo keytool \
        -importcert \
        -alias YandexCA \
        -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
        -keystore /etc/debezium/keystore.jks \
        -storepass <JKS_password> \
        --noprompt
    ```
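    To verify the import, you can list the contents of the created store:

    ```bash
    sudo keytool -list \
        -keystore /etc/debezium/keystore.jks \
        -storepass <JKS_password>
    ```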
Prepare the source cluster
- Assign the `mdb_replication` role to `user1`.

  This is necessary to create a publication for Debezium to monitor changes in a Managed Service for PostgreSQL cluster.
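  One way to assign the role is with the yc CLI (a sketch: it assumes the CLI is installed and configured and that your cluster is named `mypg`; check `yc managed-postgresql user update --help` for the exact flags of your CLI version):

  ```bash
  # Assign the mdb_replication role to user1.
  yc managed-postgresql user update user1 \
      --cluster-name mypg \
      --grants mdb_replication
  ```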
- Connect to the `db1` database under `user1`.
- Add test data to the database. In this example, a simple table with information from car sensors is used.

  Create a table:

  ```sql
  CREATE TABLE public.measurements (
      "device_id" text PRIMARY KEY NOT NULL,
      "datetime" timestamp NOT NULL,
      "latitude" real NOT NULL,
      "longitude" real NOT NULL,
      "altitude" real NOT NULL,
      "speed" real NOT NULL,
      "battery_voltage" real,
      "cabin_temperature" real NOT NULL,
      "fuel_level" real
  );
  ```

  Populate the table with data:

  ```sql
  INSERT INTO public.measurements VALUES
      ('iv9a94th6rzt********', '2020-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL),
      ('rhibbh3y08qm********', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
      ('iv9a94th678t********', '2020-06-07 15:00:10', 55.70985913, 37.62141918, 417.0, 15.7, 10.3, 17, NULL);
  ```
- Create a publication for the table:

  ```sql
  CREATE PUBLICATION mpg_publication FOR TABLE public.measurements;
  ```
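  You can verify the publication from the same session:

  ```sql
  -- Check that the publication exists and covers the right table.
  SELECT * FROM pg_publication WHERE pubname = 'mpg_publication';
  SELECT * FROM pg_publication_tables WHERE pubname = 'mpg_publication';
  ```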
Configure Debezium
- Connect to the virtual machine over SSH.
- Download an up-to-date Debezium connector and unpack it to the `/etc/debezium/plugins/` directory.

  You can check the current connector version on the project page. The commands for version `1.9.4.Final` are below.

  ```bash
  VERSION="1.9.4.Final"
  wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/${VERSION}/debezium-connector-postgres-${VERSION}-plugin.tar.gz && \
  sudo tar -xzvf debezium-connector-postgres-${VERSION}-plugin.tar.gz -C /etc/debezium/plugins/
  ```
- Create a file named `/etc/debezium/mdb-connector.conf` with the Debezium connector settings for connecting to the source cluster:

  ```ini
  name=debezium-mpg
  connector.class=io.debezium.connector.postgresql.PostgresConnector
  plugin.name=pgoutput
  database.hostname=c-<cluster_ID>.rw.mdb.yandexcloud.net
  database.port=6432
  database.user=user1
  database.password=<user1_password>
  database.dbname=db1
  database.server.name=mpg
  table.include.list=public.measurements
  publication.name=mpg_publication
  slot.name=debezium_slot
  heartbeat.interval.ms=15000
  heartbeat.topics.prefix=debezium-heartbeat
  snapshot.mode=always
  ```
  Where:

  - `name`: Logical name of the Debezium connector. Used for the connector's internal needs.
  - `database.hostname`: Special FQDN for connection to the source cluster's master host. You can get the cluster ID with the list of clusters in the folder (see the lookup example after this list).
  - `database.user`: PostgreSQL user name.
  - `database.dbname`: PostgreSQL database name.
  - `database.server.name`: Name of the database server that Debezium will use when choosing a topic for sending messages.
  - `table.include.list`: Names of tables for Debezium to track changes in. Specify full names that include the schema name (default: `public`). Debezium will use values from this field when selecting a topic for sending messages.
  - `publication.name`: Name of the publication created on the source cluster.
  - `slot.name`: Name of the replication slot Debezium will create for the publication.
  - `heartbeat.interval.ms` and `heartbeat.topics.prefix`: Heartbeat settings required for Debezium.
  - `snapshot.mode`: Type of snapshot created at connector startup. For the connector to run properly, set this parameter to `always`.
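  For example, you can look up the cluster ID with the yc CLI (assuming it is installed and configured):

  ```bash
  # The ID column of the output is the <cluster_ID> for database.hostname.
  yc managed-postgresql cluster list
  ```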
Prepare the target cluster
- Create a topic to store data from the source cluster:

  - Name: `mpg.public.measurements`.

    Data topic names follow the `<server_name>.<schema_name>.<table_name>` convention. According to the Debezium configuration file:

    - The `mpg` server name is specified in the `database.server.name` parameter.
    - The `public` schema name is specified together with the `measurements` table name in the `table.include.list` parameter.

  If you need to track data changes in multiple tables, create a separate topic for each of them.

- Create a service topic to track the connector status:

  - Name: `debezium-heartbeat.mpg`.

    Service topic names follow the `<prefix_for_heartbeat>.<server_name>` convention. According to the Debezium configuration file:

    - The `debezium-heartbeat` prefix is specified in the `heartbeat.topics.prefix` parameter.
    - The `mpg` server name is specified in the `database.server.name` parameter.

  - Cleanup policy: `Compact`.

  If you need data from multiple source clusters, create a separate service topic for each of them.
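  For example, both topics could be created with the yc CLI (a sketch: the cluster name `mykf` and the partition and replication settings are illustrative, and flag names may differ between CLI versions; check `yc managed-kafka topic create --help`):

  ```bash
  # Data topic.
  yc managed-kafka topic create mpg.public.measurements \
      --cluster-name mykf \
      --partitions 1 \
      --replication-factor 1

  # Service topic with the Compact cleanup policy.
  yc managed-kafka topic create debezium-heartbeat.mpg \
      --cluster-name mykf \
      --partitions 1 \
      --replication-factor 1 \
      --cleanup-policy compact
  ```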
- Create a user named `debezium`.
- Grant `debezium` the `ACCESS_ROLE_CONSUMER` and `ACCESS_ROLE_PRODUCER` permissions for the topics you created.
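  Both steps can be combined in a single yc CLI call (a sketch: the `--permission` syntax is an assumption and may differ between CLI versions; check `yc managed-kafka user create --help`):

  ```bash
  # Create the debezium user with consumer and producer access to both topics.
  yc managed-kafka user create debezium \
      --cluster-name mykf \
      --password <debezium_user_password> \
      --permission topic=mpg.public.measurements,role=consumer \
      --permission topic=mpg.public.measurements,role=producer \
      --permission topic=debezium-heartbeat.mpg,role=consumer \
      --permission topic=debezium-heartbeat.mpg,role=producer
  ```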
Start Debezium
- Create a file with Debezium worker settings, `/etc/debezium/worker.conf`:

  ```ini
  # AdminAPI connect properties
  bootstrap.servers=<broker_host_1_FQDN>:9091,...,<broker_host_N_FQDN>:9091
  sasl.mechanism=SCRAM-SHA-512
  security.protocol=SASL_SSL
  ssl.truststore.location=/etc/debezium/keystore.jks
  ssl.truststore.password=<JKS_password>
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<debezium_user_password>";

  # Producer connect properties
  producer.sasl.mechanism=SCRAM-SHA-512
  producer.security.protocol=SASL_SSL
  producer.ssl.truststore.location=/etc/debezium/keystore.jks
  producer.ssl.truststore.password=<JKS_password>
  producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<debezium_user_password>";

  # Worker properties
  plugin.path=/etc/debezium/plugins/
  key.converter=org.apache.kafka.connect.json.JsonConverter
  value.converter=org.apache.kafka.connect.json.JsonConverter
  key.converter.schemas.enable=true
  value.converter.schemas.enable=true
  offset.storage.file.filename=/etc/debezium/worker.offset
  ```
- In a separate terminal, start the connector:

  ```bash
  sudo /opt/kafka/bin/connect-standalone.sh \
      /etc/debezium/worker.conf \
      /etc/debezium/mdb-connector.conf
  ```
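  The command above keeps the worker in the foreground. If you want the connector to start automatically and survive reboots, one option is a minimal systemd unit (a sketch that follows this article's file layout; the unit name is arbitrary):

  ```ini
  # /etc/systemd/system/debezium.service
  [Unit]
  Description=Debezium Kafka Connect standalone worker
  After=network.target

  [Service]
  Type=simple
  ExecStart=/opt/kafka/bin/connect-standalone.sh /etc/debezium/worker.conf /etc/debezium/mdb-connector.conf
  Restart=on-failure

  [Install]
  WantedBy=multi-user.target
  ```

  After creating the file, run `sudo systemctl daemon-reload && sudo systemctl enable --now debezium`.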
Check the health of Debezium
- In a separate terminal, run the `kafkacat` utility in consumer mode:

  ```bash
  kafkacat \
      -C \
      -b <broker_host_1_FQDN>:9091,...,<broker_host_N_FQDN>:9091 \
      -t mpg.public.measurements \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username=debezium \
      -X sasl.password=<password> \
      -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
      -Z \
      -K:
  ```
  The output will return the data format schema of the `db1.public.measurements` table and information about the previously added rows.

  Example of a message fragment:

  ```json
  {
      "schema": { ... },
      "payload": {
          "before": null,
          "after": {
              "device_id": "iv9a94th6rzt********",
              "datetime": 1591378020000000,
              "latitude": 55.70329,
              "longitude": 37.65472,
              "altitude": 427.5,
              "speed": 0.0,
              "battery_voltage": 23.5,
              "cabin_temperature": 17.0,
              "fuel_level": null
          },
          "source": {
              "version": "1.8.1.Final",
              "connector": "postgresql",
              "name": "mpg",
              "ts_ms": 1628245046882,
              "snapshot": "true",
              "db": "db1",
              "sequence": "[null,\"4328525512\"]",
              "schema": "public",
              "table": "measurements",
              "txId": 8861,
              "lsn": 4328525328,
              "xmin": null
          },
          "op": "r",
          "ts_ms": 1628245046893,
          "transaction": null
      }
  }
  ```
- Connect to the source cluster.

  When connecting, you may get this error: `ERROR Postgres roles LOGIN and REPLICATION are not assigned to user`. You can ignore it, as it does not affect Debezium performance.
- Add another row to the `measurements` table:

  ```sql
  INSERT INTO public.measurements VALUES
      ('iv7b74th678t********', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
  ```
- Make sure the terminal running `kafkacat` displays details about the added row.
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
- If you reserved a public static IP address for the virtual machine, release and delete it.
- Delete the Managed Service for Apache Kafka® and Managed Service for PostgreSQL clusters.