Delivering data from Yandex Managed Service for MySQL® to Yandex Managed Service for Apache Kafka® using Debezium
You can track data changes in Managed Service for MySQL® and send them to Managed Service for Apache Kafka® using Change Data Capture (CDC).
In this article, you will learn how to create a virtual machine in Yandex Cloud and set up Debezium.
Getting started
- Create a Managed Service for MySQL® source cluster with the following settings:
  - Hosts: Publicly available
  - Database: db1
  - User: user1
- Create a Managed Service for Apache Kafka® target cluster in any suitable configuration with publicly available hosts.
- Create a virtual machine with Ubuntu 20.04 and a public IP address.
- If you are using security groups, configure them to allow connections to the clusters both from the internet and from the created VM. In addition, allow SSH connections to this VM from the internet.
- Connect to the virtual machine over SSH and perform preliminary setup:
  - Install the dependencies:
    sudo apt update && \
    sudo apt install kafkacat openjdk-17-jre mysql-client --yes
    Check that you can use kafkacat to connect to the Managed Service for Apache Kafka® target cluster over SSL.
  - Create a folder for Apache Kafka®:
    sudo mkdir -p /opt/kafka/
  - Download and unpack the archive with Apache Kafka® executable files in this folder. For example, to download and unpack Apache Kafka® 3.0, run the command:
    wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz && \
    sudo tar xf kafka_2.13-3.0.0.tgz --strip-components=1 --directory /opt/kafka/
    You can check the current Apache Kafka® version on the page with project downloads.
  - Install certificates on the VM and check the availability of clusters:
    - Managed Service for Apache Kafka® (use kafkacat)
    - Managed Service for MySQL® (use mysql)
  - Create a folder that will store the files required for the operation of the Debezium connector:
    sudo mkdir -p /etc/debezium/plugins/
  - The Debezium connector can connect to Managed Service for Apache Kafka® broker hosts if the SSL certificate is added to the Java Key Store (JKS). For added storage security, specify a password of at least 6 characters in the -storepass parameter:
    sudo keytool \
      -importcert \
      -alias YandexCA -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
      -keystore /etc/debezium/keystore.jks \
      -storepass <JKS_password> \
      -noprompt
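The 6-character minimum for the keystore password can be checked before running keytool. The sketch below is only an illustration: the storepass_ok function name and the sample password are assumptions, not part of the tutorial.

```shell
# Hypothetical helper: succeeds only if the password ($1) has at least 6 characters.
storepass_ok() {
  [ "${#1}" -ge 6 ]
}

# Example: validate a candidate password before passing it to keytool -storepass.
# "changeit" is a sample value, not a recommendation.
if storepass_ok "changeit"; then
  echo "password length OK"
else
  echo "password must be at least 6 characters" >&2
fi
```

A check like this fails early with a clear message instead of relying on the keytool error output.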
Preparing the source cluster
- Assign the REPLICATION CLIENT and REPLICATION SLAVE global privileges to user1.
- Connect to the db1 database as user1.
- Add test data to the database. In this example, a simple table with information from car sensors is used.
  - Create a table:
    CREATE TABLE measurements (
        `device_id` VARCHAR(32) PRIMARY KEY NOT NULL,
        `datetime` TIMESTAMP NOT NULL,
        `latitude` REAL NOT NULL,
        `longitude` REAL NOT NULL,
        `altitude` REAL NOT NULL,
        `speed` REAL NOT NULL,
        `battery_voltage` REAL,
        `cabin_temperature` REAL NOT NULL,
        `fuel_level` REAL
    );
  - Populate the table with data:
    INSERT INTO measurements VALUES
        ('iv9a94th6rzt********', '2020-06-05 17:27:00', 55.70329032, 37.65472196, 427.5, 0, 23.5, 17, NULL),
        ('rhibbh3y08qm********', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
        ('iv9a94th678t********', '2020-06-07 15:00:10', 55.70985913, 37.62141918, 417.0, 15.7, 10.3, 17, NULL);
Configure Debezium
- Connect to the virtual machine over SSH.
- Download an up-to-date Debezium connector and unpack it to the /etc/debezium/plugins/ directory.
  You can check the current connector version on the project page. The commands for version 1.9.4.Final are below.
  VERSION="1.9.4.Final"
  wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/${VERSION}/debezium-connector-mysql-${VERSION}-plugin.tar.gz && \
  sudo tar -xzvf debezium-connector-mysql-${VERSION}-plugin.tar.gz -C /etc/debezium/plugins/
- Create a file named /etc/debezium/mdb-connector.conf with the Debezium connector settings for connecting to the source cluster:
  name=debezium-mmy
  connector.class=io.debezium.connector.mysql.MySqlConnector
  database.hostname=c-<cluster_ID>.rw.mdb.yandexcloud.net
  database.port=3306
  database.user=user1
  database.password=<user1_password>
  database.dbname=db1
  database.server.name=mmy
  database.ssl.mode=required_identity
  table.include.list=db1.measurements
  heartbeat.interval.ms=15000
  heartbeat.topics.prefix=__debezium-heartbeat
  snapshot.mode=never
  include.schema.changes=false
  database.history.kafka.topic=dbhistory.mmy
  database.history.kafka.bootstrap.servers=<broker_host_1_FQDN>:9091,...,<broker_host_N_FQDN>:9091
  # Producer settings
  database.history.producer.security.protocol=SASL_SSL
  database.history.producer.ssl.truststore.location=/etc/debezium/keystore.jks
  database.history.producer.ssl.truststore.password=<JKS_password>
  database.history.producer.sasl.mechanism=SCRAM-SHA-512
  database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<debezium_user_password>";
  # Consumer settings
  database.history.consumer.security.protocol=SASL_SSL
  database.history.consumer.ssl.truststore.location=/etc/debezium/keystore.jks
  database.history.consumer.ssl.truststore.password=<JKS_password>
  database.history.consumer.sasl.mechanism=SCRAM-SHA-512
  database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<debezium_user_password>";
  Where:
  - name: Logical name of the Debezium connector. Used for the connector's internal needs.
  - database.hostname: Special FQDN for connecting to the source cluster's master host.
    You can get the cluster ID with the list of clusters in the folder.
  - database.user: MySQL® user name.
  - database.dbname: MySQL® database name.
  - database.server.name: Name of the database server that Debezium will use when choosing a topic for sending messages.
  - table.include.list: Names of tables for Debezium to track changes in. Specify full names that include the database name (db1). Debezium will use values from this field when selecting a topic for sending messages.
  - heartbeat.interval.ms and heartbeat.topics.prefix: Heartbeat settings required for Debezium.
  - database.history.kafka.topic: Name of the service topic the connector uses to send notifications about changes to the data schema in the source cluster.
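The connector file contains several placeholders in angle brackets (cluster ID, passwords, broker FQDNs) that must be replaced before the connector starts. As a rough sketch, a small shell function can list any that remain. The find_placeholders name and the demo file contents are assumptions made for illustration.

```shell
# Hypothetical helper: prints every line of the file ($1) that still contains
# an <angle_bracket> placeholder. Returns 0 if none remain, 1 otherwise.
find_placeholders() {
  if grep -n -E '<[A-Za-z0-9_]+>' "$1"; then
    return 1
  fi
  return 0
}

# Demo on a temporary file that mimics two lines of the connector settings.
demo_conf="$(mktemp)"
printf '%s\n' 'database.user=user1' 'database.password=<user1_password>' > "$demo_conf"
find_placeholders "$demo_conf" || echo "replace the placeholders listed above"
rm -f "$demo_conf"
```

On the VM, the same function could be pointed at /etc/debezium/mdb-connector.conf before starting Debezium.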
Prepare the target cluster
- Create a topic to store data from the source cluster:
  - Name: mmy.db1.measurements.
    Data topic names follow the <server_name>.<DB_name>.<table_name> convention.
    According to the Debezium configuration file:
    - The mmy server name is specified in the database.server.name parameter.
    - The db1 database name is specified together with the measurements table name in the table.include.list parameter.
  If you need to track data changes in multiple tables, create a separate topic for each one of them.
- Create a service topic to track the connector status:
  - Name: __debezium-heartbeat.mmy.
    Service topic names follow the <prefix_for_heartbeat>.<server_name> convention.
    According to the Debezium configuration file:
    - The __debezium-heartbeat prefix is specified in the heartbeat.topics.prefix parameter.
    - The mmy server name is specified in the database.server.name parameter.
  - Cleanup policy: Compact.
  If you need data from multiple source clusters, create a separate service topic for each of them.
- Create a service topic to track changes to the data format schema:
  - Name: dbhistory.mmy
  - Cleanup policy: Delete
  - Number of partitions: 1
- Create a user named debezium.
- Grant debezium the ACCESS_ROLE_CONSUMER and ACCESS_ROLE_PRODUCER permissions for the topics you created.
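The three topic names above follow directly from the connector settings, so they can be derived rather than typed by hand. The sketch below assumes a simple key=value properties file. The get_prop and expected_topics helpers are hypothetical names introduced for this illustration.

```shell
# Hypothetical helper: prints the value of key $2 from the properties file $1.
get_prop() {
  awk -F= -v key="$2" '$1 == key { sub(/^[^=]*=/, ""); print; exit }' "$1"
}

# Prints the topics this tutorial asks you to create, based on the connector file $1.
expected_topics() {
  server="$(get_prop "$1" database.server.name)"
  prefix="$(get_prop "$1" heartbeat.topics.prefix)"
  history="$(get_prop "$1" database.history.kafka.topic)"
  tables="$(get_prop "$1" table.include.list)"
  # One data topic per tracked table: <server_name>.<DB_name>.<table_name>
  echo "$tables" | tr ',' '\n' | while read -r table; do
    if [ -n "$table" ]; then
      echo "$server.$table"
    fi
  done
  # Heartbeat topic: <prefix_for_heartbeat>.<server_name>
  echo "$prefix.$server"
  # Schema history topic, taken as is
  echo "$history"
}

# Demo with the values used in this tutorial.
demo_conf="$(mktemp)"
printf '%s\n' \
  'database.server.name=mmy' \
  'table.include.list=db1.measurements' \
  'heartbeat.topics.prefix=__debezium-heartbeat' \
  'database.history.kafka.topic=dbhistory.mmy' > "$demo_conf"
expected_topics "$demo_conf"
rm -f "$demo_conf"
```

With several tables in table.include.list, the function prints one data topic per table, which matches the advice to create a separate topic for each tracked table.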
Start Debezium
- Create a file with Debezium worker settings:
  /etc/debezium/worker.conf
  # AdminAPI connect properties
  bootstrap.servers=<broker_host_1_FQDN>:9091,...,<broker_host_N_FQDN>:9091
  sasl.mechanism=SCRAM-SHA-512
  security.protocol=SASL_SSL
  ssl.truststore.location=/etc/debezium/keystore.jks
  ssl.truststore.password=<JKS_password>
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<debezium_user_password>";
  # Producer connect properties
  producer.sasl.mechanism=SCRAM-SHA-512
  producer.security.protocol=SASL_SSL
  producer.ssl.truststore.location=/etc/debezium/keystore.jks
  producer.ssl.truststore.password=<JKS_password>
  producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="debezium" password="<debezium_user_password>";
  # Worker properties
  plugin.path=/etc/debezium/plugins/
  key.converter=org.apache.kafka.connect.json.JsonConverter
  value.converter=org.apache.kafka.connect.json.JsonConverter
  key.converter.schemas.enable=true
  value.converter.schemas.enable=true
  offset.storage.file.filename=/etc/debezium/worker.offset
- In a separate terminal, start the connector:
  sudo /opt/kafka/bin/connect-standalone.sh \
    /etc/debezium/worker.conf \
    /etc/debezium/mdb-connector.conf
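Because connect-standalone.sh takes both settings files as arguments, a quick check that they exist and are readable can save a failed start. This is a minimal sketch. The require_files helper is a hypothetical name, and the commented usage assumes the file paths created earlier in this tutorial.

```shell
# Hypothetical pre-flight check: every path passed as an argument must be readable.
# Returns 0 if all files are readable, 1 if at least one is not.
require_files() {
  missing=0
  for f in "$@"; do
    if [ ! -r "$f" ]; then
      echo "missing or unreadable: $f" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage on the VM (not executed here):
# require_files /etc/debezium/worker.conf /etc/debezium/mdb-connector.conf && \
#   sudo /opt/kafka/bin/connect-standalone.sh \
#     /etc/debezium/worker.conf \
#     /etc/debezium/mdb-connector.conf
```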
Check the health of Debezium
- In a separate terminal, run the kafkacat utility in consumer mode:
  kafkacat \
    -C \
    -b <broker_host_1_FQDN>:9091,...,<broker_host_N_FQDN>:9091 \
    -t mmy.db1.measurements \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=debezium \
    -X sasl.password=<password> \
    -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
    -Z \
    -K:
  The output will return the data format schema of the db1.measurements table and information about the previously added rows.
  Example of the message fragment:
  {
    "schema": {
      ...
    },
    "payload": {
      "before": null,
      "after": {
        "device_id": "iv9a94th6rzt********",
        "datetime": 1591378020000000,
        "latitude": 55.70329,
        "longitude": 37.65472,
        "altitude": 427.5,
        "speed": 0.0,
        "battery_voltage": 23.5,
        "cabin_temperature": 17.0,
        "fuel_level": null
      },
      "source": {
        "version": "1.8.1.Final",
        "connector": "mysql",
        "name": "mmy",
        "ts_ms": 1628245046882,
        "snapshot": "true",
        "db": "db1",
        "sequence": "[null,\"4328525512\"]",
        "table": "measurements",
        "txId": 8861,
        "lsn": 4328525328,
        "xmin": null
      },
      "op": "r",
      "ts_ms": 1628245046893,
      "transaction": null
    }
  }
- Connect to the source cluster and add another row to the measurements table:
  INSERT INTO measurements VALUES
      ('iv7b74th678t********', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
- Make sure the terminal running kafkacat displays details about the added row.
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
- If you reserved a public static IP address for the virtual machine, release and delete it.
- Delete the clusters: