Using Confluent Schema Registry with Managed Service for Apache Kafka®
In Managed Service for Apache Kafka®, you can use the integrated Managed Schema Registry. For more information, see Working with the managed schema registry. If you need Confluent Schema Registry instead, follow this tutorial.
Note
We tested this tutorial with Confluent Schema Registry 6.2 and a VM running Ubuntu 20.04 LTS. We do not guarantee support for newer versions.
To use Confluent Schema Registry with Managed Service for Apache Kafka®:
- Create a topic for notifications about data format schema changes.
- Install and configure Confluent Schema Registry on your VM.
- Create producer and consumer scripts.
- Make sure Confluent Schema Registry works correctly.
If you no longer need the resources you created, delete them.
Required paid resources
The support cost for this solution includes:
- Managed Service for Apache Kafka® cluster fee, which covers the use of computing resources allocated to hosts (including ZooKeeper hosts) and disk space (see Apache Kafka® pricing).
- Fee for public IP addresses if public access is enabled for cluster hosts (see Virtual Private Cloud pricing).
- VM fee, which covers the use of computing resources, storage, and public IP address (see Compute Cloud pricing).
Getting started
- Create a Managed Service for Apache Kafka® cluster of any suitable configuration.
  - Create a topic named messages for exchanging messages between the producer and the consumer.
  - Create a user named user and grant them permissions for the messages topic:
    - ACCESS_ROLE_CONSUMER
    - ACCESS_ROLE_PRODUCER
- In the network hosting the Managed Service for Apache Kafka® cluster, create a VM running Ubuntu 20.04 LTS from Cloud Marketplace with a public IP address.
- If using security groups, configure them to allow all required traffic between your Managed Service for Apache Kafka® cluster and VM.
- In the VM security group, create an inbound rule that allows connections via port 8081, which the producer and consumer use to access the schema registry:
  - Port range: 8081.
  - Protocol: TCP.
  - Destination name: CIDR.
  - CIDR blocks: 0.0.0.0/0 or address ranges of the subnets used by the producer and the consumer.
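Once Confluent Schema Registry has been installed and started (see the sections that follow), the inbound rule can be checked from a producer or consumer host. The sketch below is a minimal illustration using only the Python standard library; the helper name is_port_open is hypothetical and not part of the tutorial, and the host you pass in is your own VM address.

```python
import socket


def is_port_open(host: str, port: int = 8081, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

For example, is_port_open("<VM_FQDN_or_IP_address>") is expected to return False if the security group blocks port 8081 or the registry is not running yet.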
Create a topic for notifications about data format schema changes
- Create a service topic named _schemas with the following settings:
  - Number of partitions: 1.
  - Cleanup policy: Compact.

  Warning
  These values for Number of partitions and Cleanup policy are required for Confluent Schema Registry to run correctly.
- Create a user named registry and grant them permissions for the _schemas topic:
  - ACCESS_ROLE_CONSUMER
  - ACCESS_ROLE_PRODUCER

  Confluent Schema Registry will use this account to work with _schemas.
Install and configure Confluent Schema Registry on your VM
- Add the Confluent Schema Registry repository:

      wget -qO - https://packages.confluent.io/deb/6.2/archive.key | sudo apt-key add - && \
      sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/6.2 stable main"

- Install the packages:

      sudo apt-get update && \
      sudo apt-get install \
         confluent-schema-registry \
         openjdk-11-jre-headless \
         python3-pip --yes

- Create a secure store for the certificate:

      sudo keytool \
         -keystore /etc/schema-registry/client.truststore.jks \
         -alias CARoot \
         -import -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
         -storepass <secure_certificate_storage_password> \
         --noprompt

- Create a file named /etc/schema-registry/jaas.conf with settings for connecting to the cluster:

      KafkaClient {
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="registry"
        password="<registry_user_password>";
      };

- Edit the /etc/schema-registry/schema-registry.properties file containing Confluent Schema Registry settings:
  - Comment out the following line:

        kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092

  - Uncomment the line with the listeners parameter. It sets the network address and port that Confluent Schema Registry listens on. By default, this is port 8081 on all network interfaces:

        listeners=http://0.0.0.0:8081

  - Add the following lines at the end of the file:

        kafkastore.bootstrap.servers=SASL_SSL://<broker_host_1_FQDN:9091>,<broker_host_2_FQDN:9091>,...,<broker_host_N_FQDN:9091>
        kafkastore.ssl.truststore.location=/etc/schema-registry/client.truststore.jks
        kafkastore.ssl.truststore.password=<secure_certificate_storage_password>
        kafkastore.sasl.mechanism=SCRAM-SHA-512
        kafkastore.security.protocol=SASL_SSL

    You can find the broker hosts in the list of cluster hosts.
- Edit the /lib/systemd/system/confluent-schema-registry.service file, which describes the systemd unit:
  - Go to the [Service] section.
  - Add the Environment parameter with Java settings:

        ...
        [Service]
        Type=simple
        User=cp-schema-registry
        Group=confluent
        Environment="LOG_DIR=/var/log/confluent/schema-registry"
        Environment="_JAVA_OPTIONS='-Djava.security.auth.login.config=/etc/schema-registry/jaas.conf'"
        ...
- Reload the systemd unit definitions:

      sudo systemctl daemon-reload

- Start the Confluent Schema Registry service:

      sudo systemctl start confluent-schema-registry.service

- Set Confluent Schema Registry to start automatically after a system reboot:

      sudo systemctl enable confluent-schema-registry.service
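If the service fails to start, a missing setting in schema-registry.properties is a likely cause. The sketch below is a simplified check, not a full Java properties parser: it only handles plain key=value lines and # comments. The helper names parse_properties and missing_keys are hypothetical; the list of keys is taken from the settings described in this section.

```python
# Settings this tutorial expects in schema-registry.properties.
REQUIRED_KEYS = (
    "listeners",
    "kafkastore.bootstrap.servers",
    "kafkastore.ssl.truststore.location",
    "kafkastore.ssl.truststore.password",
    "kafkastore.sasl.mechanism",
    "kafkastore.security.protocol",
)


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_keys(text: str) -> list:
    """Return the required keys that are absent or empty, in a fixed order."""
    props = parse_properties(text)
    return [key for key in REQUIRED_KEYS if not props.get(key)]
```

For example, calling missing_keys() on the contents of /etc/schema-registry/schema-registry.properties should return an empty list once all the settings above are in place.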
Create producer and consumer scripts
These scripts send and receive messages in the messages topic as key:value pairs. In this example, the data format schemas are described in Avro.
Note
Python scripts are provided for demonstration only. You can prepare and send data format schemas and the data itself by creating a similar script in another language.
- Install the required Python packages:

      sudo pip3 install avro confluent_kafka

- Create a Python script for the consumer.

  Here is how the script works:
  - Connect to the messages topic and Confluent Schema Registry.
  - Continuously read messages arriving in the messages topic.
  - When receiving a message, request the required schemas from Confluent Schema Registry to parse the message.
  - Parse the message binary data based on the key and value schemas and display the result.

  consumer.py

      #!/usr/bin/python3

      from confluent_kafka.avro import AvroConsumer
      from confluent_kafka.avro.serializer import SerializerError

      c = AvroConsumer(
          {
              "bootstrap.servers": ','.join([
                  "<broker_host_1_FQDN>:9091",
                  ...
                  "<broker_host_N_FQDN>:9091",
              ]),
              "group.id": "avro-consumer",
              "security.protocol": "SASL_SSL",
              "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt",
              "sasl.mechanism": "SCRAM-SHA-512",
              "sasl.username": "user",
              "sasl.password": "<user_password>",
              "schema.registry.url": "http://<Confluent_Schema_Registry_server_FQDN_or_IP_address>:8081",
          }
      )

      c.subscribe(["messages"])

      while True:
          try:
              # Wait up to 10 seconds for the next message.
              msg = c.poll(10)

          except SerializerError as e:
              # poll() raised, so no message object is available here.
              print("Message deserialization failed: {}".format(e))
              break

          if msg is None:
              continue

          if msg.error():
              print("AvroConsumer error: {}".format(msg.error()))
              continue

          print(msg.value())

      c.close()
- Create a Python script for the producer.

  Here is how the script works:
  - Connect to the schema registry and send the key and value data format schemas.
  - Generate the key and value based on the schemas you sent.
  - Send a message containing a key:value pair to the messages topic. The serializer automatically embeds the IDs of the registered schemas in the message key and value.

  producer.py

      #!/usr/bin/python3

      from confluent_kafka import avro
      from confluent_kafka.avro import AvroProducer

      value_schema_str = """
      {
          "namespace": "my.test",
          "name": "value",
          "type": "record",
          "fields": [
              {
                  "name": "name",
                  "type": "string"
              }
          ]
      }
      """

      key_schema_str = """
      {
          "namespace": "my.test",
          "name": "key",
          "type": "record",
          "fields": [
              {
                  "name": "name",
                  "type": "string"
              }
          ]
      }
      """

      value_schema = avro.loads(value_schema_str)
      key_schema = avro.loads(key_schema_str)
      value = {"name": "Value"}
      key = {"name": "Key"}


      def delivery_report(err, msg):
          """Called once for each message produced to indicate delivery result.
          Triggered by poll() or flush()."""
          if err is not None:
              print("Message delivery failed: {}".format(err))
          else:
              print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))


      avroProducer = AvroProducer(
          {
              "bootstrap.servers": ','.join([
                  "<broker_host_1_FQDN>:9091",
                  ...
                  "<broker_host_N_FQDN>:9091",
              ]),
              "security.protocol": "SASL_SSL",
              "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt",
              "sasl.mechanism": "SCRAM-SHA-512",
              "sasl.username": "user",
              "sasl.password": "<user_password>",
              "on_delivery": delivery_report,
              "schema.registry.url": "http://<Schema_Registry_server_FQDN_or_IP_address>:8081",
          },
          default_key_schema=key_schema,
          default_value_schema=value_schema,
      )

      avroProducer.produce(topic="messages", key=key, value=value)
      avroProducer.flush()
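To illustrate how a consumer knows which schema to request, the sketch below splits a raw message key or value into its schema ID and Avro payload. It assumes the Confluent wire format: one zero magic byte, then a 4-byte big-endian schema ID, then the Avro-encoded data. The helper name split_confluent_frame is hypothetical; the scripts above do not need it because AvroConsumer handles this step internally.

```python
import struct

MAGIC_BYTE = 0  # first byte of every message framed in the Confluent wire format


def split_confluent_frame(data: bytes):
    """Return (schema_id, avro_payload) for a framed message key or value."""
    if len(data) < 5:
        raise ValueError("message is too short to carry a schema ID header")
    # ">bI" = big-endian signed byte followed by an unsigned 32-bit integer.
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: {}".format(magic))
    return schema_id, data[5:]
```

The schema ID returned here is the identifier the registry assigned when the producer registered the schema, so it may differ between installations.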
Make sure Confluent Schema Registry works correctly
- Start the consumer:

      python3 ./consumer.py

- In a separate terminal, start the producer:

      python3 ./producer.py

- Make sure the data sent by the producer is received and correctly interpreted by the consumer:

      {'name': 'Value'}
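As an additional check, you can ask the registry which subjects it has stored. The sketch below calls the GET /subjects endpoint of the Schema Registry REST API using the Python standard library. It assumes the producer registered its schemas under the default subject names, that is, the topic name plus -key and -value; the helper names and the opener parameter (added so the function can be exercised without a network) are hypothetical.

```python
import json
import urllib.request


def expected_subjects(topic: str) -> set:
    """Subject names assumed for a topic: <topic>-key and <topic>-value."""
    return {topic + "-key", topic + "-value"}


def list_subjects(registry_url: str, opener=urllib.request.urlopen) -> list:
    """Return the subject names reported by GET /subjects."""
    url = registry_url.rstrip("/") + "/subjects"
    with opener(url, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


def missing_subjects(registry_url: str, topic: str,
                     opener=urllib.request.urlopen) -> list:
    """Return expected subjects that the registry does not list yet."""
    found = set(list_subjects(registry_url, opener))
    return sorted(expected_subjects(topic) - found)
```

For example, after the producer has run, missing_subjects("http://<Schema_Registry_server_FQDN_or_IP_address>:8081", "messages") should return an empty list if both schemas were registered under those names.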
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
- Delete the Managed Service for Apache Kafka® cluster.
- Delete the VM.
- If you reserved public static IP addresses, release and delete them.