Configuring Kafka Connect for Managed Service for Apache Kafka® clusters
Note
Managed Service for Apache Kafka® has built-in support for certain connectors and allows you to manage them. For a list of available connectors, see Connectors. If you need other connectors or want to manage Kafka Connect manually, refer to this tutorial.
Kafka Connect is a tool for streaming data between Apache Kafka® and other data storages.
Data in Kafka Connect is handled using processes called workers. You can deploy the tool either in distributed mode with multiple workers or in standalone mode with a single worker.
Data is moved using connectors that are run in separate worker threads.
To learn more about Kafka Connect, see the Apache Kafka® documentation.
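For reference, the two deployment modes are started with different scripts that ship in the `bin/` directory of the Apache Kafka® distribution; the property file names below are placeholders:

```bash
# Standalone mode: a single worker configured entirely from local files.
./connect-standalone.sh worker.properties connector.properties

# Distributed mode: workers share one worker config; connectors are then
# managed through the Kafka Connect REST API rather than local files.
./connect-distributed.sh worker.properties
```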
Next, we will configure Kafka Connect to interact with a Managed Service for Apache Kafka® cluster. The tool will be deployed on a Yandex Cloud VM as a separate installation. SSL encryption will be used to protect the connection.
We will also set up a simple FileStreamSource connector that reads data from a file and sends it to a topic in the cluster.
Note
You can use any other Kafka Connect connector to interact with Managed Service for Apache Kafka® clusters.
To configure Kafka Connect to work with a Managed Service for Apache Kafka® cluster:

- Configure the VM.
- Prepare the test data.
- Configure Kafka Connect.
- Run Kafka Connect and test it.

If you no longer need the resources you created, delete them.
Getting started
To create the infrastructure manually:

- Create a Managed Service for Apache Kafka® cluster with any suitable configuration.
- Create a topic named `messages` for exchanging messages between Kafka Connect and the Managed Service for Apache Kafka® cluster.
- Create a user named `user` and grant them the following permissions for the `messages` topic (for a CLI sketch, see the example after this list):
  - `ACCESS_ROLE_CONSUMER`
  - `ACCESS_ROLE_PRODUCER`
- In the network hosting the Managed Service for Apache Kafka® cluster, create a virtual machine with Ubuntu 20.04 and a public IP address.
- If you are using security groups, configure them to allow all required traffic between the Managed Service for Apache Kafka® cluster and the VM.
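For illustration, the topic and user can also be created with the `yc` CLI. This is a sketch under assumptions: the cluster name is a placeholder, and the exact `--permission` flag syntax should be verified with `yc managed-kafka user create --help`:

```bash
# Create the topic (assumes the yc CLI is installed and configured).
yc managed-kafka topic create messages \
  --cluster-name <cluster_name> \
  --partitions 1 \
  --replication-factor 1

# Create the user and grant topic permissions; the --permission syntax
# below is an assumption -- check the CLI help for the exact form.
yc managed-kafka user create user \
  --cluster-name <cluster_name> \
  --password <user_password> \
  --permission topic=messages,role=producer \
  --permission topic=messages,role=consumer
```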
To create the infrastructure with Terraform instead:

- If you do not have Terraform yet, install it.
- Get the authentication credentials. You can add them to environment variables or specify them later in the provider configuration file.
- Configure and initialize a provider. There is no need to create a provider configuration file manually: you can download it.
- Place the configuration file in a separate working directory and specify the parameter values. If you did not add the authentication credentials to environment variables, specify them in the configuration file.
- Download the kafka-connect.tf configuration file to the same working directory. This file describes:
  - The network.
  - A subnet.
  - The default security group and rules required to connect to the cluster and the VM from the internet.
  - A virtual machine with Ubuntu 20.04.
  - A properly configured Managed Service for Apache Kafka® cluster.
- In the file, specify the password for the user named `user` that you are going to use to access the Managed Service for Apache Kafka® cluster, as well as the username and the public part of the SSH key for the virtual machine. If the virtual machine runs Ubuntu 20.04 from the recommended image list, the username you specify here is ignored; in that case, use the username `ubuntu` to connect.
- Make sure the Terraform configuration files are correct using this command:

  ```bash
  terraform validate
  ```

  If there are any errors in the configuration files, Terraform will point them out.
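  If the configuration is valid, Terraform prints a confirmation similar to:

  ```text
  Success! The configuration is valid.
  ```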
- Create the required infrastructure:

  - Run the command to view the planned changes:

    ```bash
    terraform plan
    ```

    If the resource configuration descriptions are correct, the terminal will display a list of the resources to create and their parameters. This is a test step; no resources are actually created.

  - If you are happy with the planned changes, apply them:

    - Run the command:

      ```bash
      terraform apply
      ```

    - Confirm the creation of resources.
    - Wait for the operation to complete.

  All the required resources will be created in the specified folder. You can check resource availability and their settings in the management console.
Configure the VM
- Install JDK and the kcat utility (packaged as kafkacat in Ubuntu 20.04):

  ```bash
  sudo apt update && \
  sudo apt install default-jdk --yes && \
  sudo apt install kafkacat
  ```
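  To verify the installation, you can print the installed versions:

  ```bash
  java -version && kafkacat -V
  ```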
- Download and unpack the archive containing Apache Kafka® (the target directory must exist, so it is created first):

  ```bash
  wget https://downloads.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz && \
  sudo mkdir --parents /opt/kafka/ && \
  sudo tar -xvf kafka_2.12-3.1.0.tgz --strip 1 --directory /opt/kafka/
  ```

  This example uses Apache Kafka® version 3.1.0.
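- If the cluster's SSL certificate is not already on the VM, download it first. The URL below is the one published in the Yandex Cloud documentation; treat it as an assumption and adjust it if it has changed:

  ```bash
  sudo mkdir --parents /usr/local/share/ca-certificates/Yandex/ && \
  sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
       --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt
  ```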
- Add the SSL certificate to the Java trusted certificate store (Java Key Store) so that the Apache Kafka® driver can use this certificate for secure connections to the cluster hosts. Set a password of at least 6 characters using the `-storepass` parameter for additional storage protection:

  ```bash
  sudo keytool -importcert \
    -alias YandexCA \
    -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
    -keystore ssl \
    -storepass <certificate_store_password> \
    --noprompt
  ```
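  To confirm the import, you can list the store's contents; the output should include a trustedCertEntry with the YandexCA alias:

  ```bash
  sudo keytool -list -keystore ssl -storepass <certificate_store_password>
  ```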
- Create a folder with worker settings and copy the store there:

  ```bash
  sudo mkdir --parents /etc/kafka-connect-worker && \
  sudo cp ssl /etc/kafka-connect-worker/client.truststore.jks
  ```
Prepare the test data
Create a file named `/var/log/sample.json` with test data. This file contains data from car sensors in JSON format:

```json
{"device_id":"iv9a94th6rzt********","datetime":"2020-06-05 17:27:00","latitude":55.70329032,"longitude":37.65472196,"altitude":427.5,"speed":0,"battery_voltage":23.5,"cabin_temperature":17,"fuel_level":null}
{"device_id":"rhibbh3y08qm********","datetime":"2020-06-06 09:49:54","latitude":55.71294467,"longitude":37.66542005,"altitude":429.13,"speed":55.5,"battery_voltage":null,"cabin_temperature":18,"fuel_level":32}
{"device_id":"iv9a94th6rzt********","datetime":"2020-06-07 15:00:10","latitude":55.70985913,"longitude":37.62141918,"altitude":417,"speed":15.7,"battery_voltage":10.3,"cabin_temperature":17,"fuel_level":null}
```
Configure Kafka Connect
- Create a file named `/etc/kafka-connect-worker/worker.properties` with worker settings:

  ```ini
  # AdminAPI connect properties
  bootstrap.servers=<broker_host_FQDN>:9091
  sasl.mechanism=SCRAM-SHA-512
  security.protocol=SASL_SSL
  ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks
  ssl.truststore.password=<certificate_store_password>
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<user_password>";

  # Producer connect properties
  producer.sasl.mechanism=SCRAM-SHA-512
  producer.security.protocol=SASL_SSL
  producer.ssl.truststore.location=/etc/kafka-connect-worker/client.truststore.jks
  producer.ssl.truststore.password=<certificate_store_password>
  producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="<user_password>";

  # Worker properties
  plugin.path=/etc/kafka-connect-worker/plugins
  key.converter=org.apache.kafka.connect.json.JsonConverter
  value.converter=org.apache.kafka.connect.json.JsonConverter
  key.converter.schemas.enable=true
  value.converter.schemas.enable=true
  offset.storage.file.filename=/etc/kafka-connect-worker/worker.offset
  ```

  Kafka Connect will connect to the Managed Service for Apache Kafka® cluster as the user named `user` created earlier.

  You can get the FQDNs of broker hosts from the list of hosts in the cluster (see the example below).
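  For example, with the yc CLI configured, the broker host FQDNs can be listed like this (the cluster name is a placeholder):

  ```bash
  yc managed-kafka cluster list-hosts <cluster_name>
  ```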
- Create a file named `/etc/kafka-connect-worker/file-connector.properties` with connector settings:

  ```ini
  name=local-file-source
  connector.class=FileStreamSource
  tasks.max=1
  file=/var/log/sample.json
  topic=messages
  ```

  Where:

  - `file`: Name of the file the connector will read data from.
  - `topic`: Name of the Managed Service for Apache Kafka® cluster topic the connector will feed data to.
Run Kafka Connect and test it
- To send test data to the cluster, run the worker on the VM (Apache Kafka® was unpacked to `/opt/kafka/` earlier):

  ```bash
  cd /opt/kafka/bin/ && \
  sudo ./connect-standalone.sh \
    /etc/kafka-connect-worker/worker.properties \
    /etc/kafka-connect-worker/file-connector.properties
  ```
- Connect to the cluster using kcat and retrieve data from the cluster topic:

  ```bash
  kafkacat -C \
    -b <broker_host_FQDN>:9091 \
    -t messages \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username=user \
    -X sasl.password="<user_password>" \
    -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
    -Z -K:
  ```

  You can get the FQDNs of broker hosts from the list of hosts in the cluster.

  In the command output, you will see the contents of the `/var/log/sample.json` test file created in the previous step.
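  Because `value.converter.schemas.enable=true` is set in the worker config, each record arrives wrapped in a JSON envelope, so a line of output will look roughly like this (abbreviated sketch):

  ```json
  {"schema":{"type":"string","optional":false},"payload":"{\"device_id\":\"iv9a94th6rzt********\", ...}"}
  ```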
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
- Delete the VM.
- If you reserved a public static IP address for the VM, delete it.
- Delete the Managed Service for Apache Kafka® cluster.
To delete the infrastructure created with Terraform:

- In the terminal window, go to the directory containing the infrastructure plan.
- Delete the `kafka-connect.tf` configuration file.
- Make sure the Terraform configuration files are correct using this command:

  ```bash
  terraform validate
  ```

  If there are any errors in the configuration files, Terraform will point them out.

- Confirm updating the resources:

  - Run the command to view the planned changes:

    ```bash
    terraform plan
    ```

    If the resource configuration descriptions are correct, the terminal will display a list of the resources to modify and their parameters. This is a test step; no resources are updated.

  - If you are happy with the planned changes, apply them:

    - Run the command:

      ```bash
      terraform apply
      ```

    - Confirm the update of resources.
    - Wait for the operation to complete.

All the resources described in the configuration file will be deleted.
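Alternatively, Terraform's built-in `terraform destroy` command deletes every resource described in the configuration, with the same plan-and-confirm flow:

```bash
terraform destroy
```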