Using Confluent Schema Registry with Managed Service for Apache Kafka®
In Managed Service for Apache Kafka®, you can use the built-in data format schema registry, Managed Schema Registry. For more information, see Working with the managed schema registry. If you need Confluent Schema Registry instead, follow this guide.
Note
The guide has been tested on Confluent Schema Registry 6.2 and a VM running Ubuntu 20.04 LTS. We cannot guarantee correct operation with newer versions.
To use Confluent Schema Registry together with Managed Service for Apache Kafka®:
- Create a topic for notifications about changes in data format schemas.
- Install and configure Confluent Schema Registry on a VM.
- Create producer and consumer scripts.
- Make sure that Confluent Schema Registry is working correctly.
If you no longer need the resources you created, delete them.
Getting started
- Create a Managed Service for Apache Kafka® cluster with any suitable configuration.
- Create a topic named `messages` for exchanging messages between the producer and the consumer.
- Create a user named `user` and grant them the following permissions for the `messages` topic:
  - `ACCESS_ROLE_CONSUMER`
  - `ACCESS_ROLE_PRODUCER`
- In the network hosting the Managed Service for Apache Kafka® cluster, create a VM with Ubuntu 20.04 LTS from Cloud Marketplace and with a public IP address.
- If you are using security groups, configure them to allow all required traffic between the Managed Service for Apache Kafka® cluster and the VM.
- In the VM security group, add a rule for incoming traffic that allows connections via port `8081`, which the producer and consumer use to access the schema registry (a connectivity check is sketched after this list):
  - Port range: `8081`
  - Protocol: `TCP`
  - Source: `CIDR`
  - CIDR blocks: `0.0.0.0/0` or the address ranges of the subnets where the producer and consumer run
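Once Confluent Schema Registry is installed and started later in this guide, you can check from the producer or consumer host that the VM accepts connections on this port. A minimal check, assuming netcat (`nc`) is available and `<VM_public_IP_or_FQDN>` is a placeholder for your VM address:

    # Check TCP connectivity from the producer/consumer host to the schema registry port.
    nc -vz <VM_public_IP_or_FQDN> 8081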
Create a topic for notifications about changes in data format schemas
- Create a service topic named `_schemas` with the following settings (a CLI check of these settings is sketched after this list):
  - Number of partitions: `1`
  - Cleanup policy: `Compact`

  Warning

  These Number of partitions and Cleanup policy values are required for Confluent Schema Registry to work.
- Create a user named `registry` and grant them the following permissions for the `_schemas` topic:
  - `ACCESS_ROLE_CONSUMER`
  - `ACCESS_ROLE_PRODUCER`

  Confluent Schema Registry will interact with the `_schemas` service topic on behalf of this user.
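To double-check that the `_schemas` topic was created with the required settings, you can query it, for example, with the Yandex Cloud CLI. This is only a sketch: the command and flag names are assumptions to verify with `yc managed-kafka topic --help`:

    # Show the _schemas topic settings (partitions and cleanup policy).
    yc managed-kafka topic get _schemas --cluster-name <cluster_name>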
Install and configure Confluent Schema Registry on a VM
- Add the Confluent Schema Registry repository:

      wget -qO - https://packages.confluent.io/deb/6.2/archive.key | sudo apt-key add - && \
      sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/6.2 stable main"
- Install the packages:

      sudo apt-get update && \
      sudo apt-get install \
          confluent-schema-registry \
          openjdk-11-jre-headless \
          python3-pip --yes
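- If the Yandex Cloud CA certificate is not on the VM yet, download it to the path expected by the next step. This is a sketch assuming the standard Yandex Cloud certificate URL; adjust the path and URL if your environment differs:

      # Download the CA certificate used for SSL connections to the cluster.
      sudo mkdir --parents /usr/local/share/ca-certificates/Yandex && \
      sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \
          --output-document /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt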
- Create secure storage for the certificate:

      sudo keytool \
          -keystore /etc/schema-registry/client.truststore.jks \
          -alias CARoot \
          -import -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
          -storepass <secure_certificate_store_password> \
          --noprompt
- Create the `/etc/schema-registry/jaas.conf` file with settings for connecting to the cluster:

      KafkaClient {
          org.apache.kafka.common.security.scram.ScramLoginModule required
          username="registry"
          password="<registry_user_password>";
      };
- Edit the `/etc/schema-registry/schema-registry.properties` file that configures Confluent Schema Registry:

  - Comment out the line:

        kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092

  - Uncomment the line with the `listeners` parameter. It configures the network address and port that Confluent Schema Registry listens on; by default, this is port `8081` on all network interfaces:

        listeners=http://0.0.0.0:8081

  - Add the following lines at the end of the file:

        kafkastore.bootstrap.servers=SASL_SSL://<FQDN_of_broker_host_1:9091>,<FQDN_of_broker_host_2:9091>,...,<FQDN_of_broker_host_N:9091>
        kafkastore.ssl.truststore.location=/etc/schema-registry/client.truststore.jks
        kafkastore.ssl.truststore.password=<secure_certificate_store_password>
        kafkastore.sasl.mechanism=SCRAM-SHA-512
        kafkastore.security.protocol=SASL_SSL

    You can find the broker host FQDNs in the list of cluster hosts.
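    For example, you can list them with the Yandex Cloud CLI. This is a sketch; it assumes the CLI is installed and authorized, and the command name should be verified with `yc managed-kafka cluster --help`:

        # List the cluster hosts; the broker host FQDNs are in the output.
        yc managed-kafka cluster list-hosts <cluster_name>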
- Edit the `/lib/systemd/system/confluent-schema-registry.service` file with the description of the systemd module:

  - Go to the `[Service]` section.
  - Add the `Environment` parameter with Java settings:

        ...
        [Service]
        Type=simple
        User=cp-schema-registry
        Group=confluent
        Environment="LOG_DIR=/var/log/confluent/schema-registry"
        Environment="_JAVA_OPTIONS='-Djava.security.auth.login.config=/etc/schema-registry/jaas.conf'"
        ...
- Update the details about the systemd modules:

      sudo systemctl daemon-reload
- Start the Confluent Schema Registry service:

      sudo systemctl start confluent-schema-registry.service
- Enable automatic start of Confluent Schema Registry after OS restart:

      sudo systemctl enable confluent-schema-registry.service
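To make sure that the service is running and its REST API responds, you can check it locally on the VM. On a fresh installation, the list of registered subjects should be empty:

    # Check the service status and query the Confluent Schema Registry REST API.
    sudo systemctl status confluent-schema-registry.service
    curl http://localhost:8081/subjects
    # Expected output on a fresh installation: []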
Create producer and consumer scripts
These scripts send and receive messages in the `messages` topic as `key:value` pairs. In this example, the data format schemas are described in Avro.
Note
Python scripts are provided for demonstration. You can prepare and send data format schemas and the data itself by creating a similar script in another language.
- Install the necessary Python packages:

      sudo pip3 install avro confluent_kafka
- Create a Python script for the consumer.

  The script works as follows:

  - Connect to the `messages` topic and Confluent Schema Registry.
  - In a continuous cycle, read messages sent to the `messages` topic.
  - When a message is received, request the schemas needed to parse it from Confluent Schema Registry.
  - Parse the binary data from the message according to the key and value schemas and print the result on the screen.

  consumer.py

      #!/usr/bin/python3

      from confluent_kafka.avro import AvroConsumer
      from confluent_kafka.avro.serializer import SerializerError

      c = AvroConsumer(
          {
              "bootstrap.servers": ','.join([
                  "<FQDN_of_broker_host_1>:9091",
                  ...
                  "<FQDN_of_broker_host_N>:9091",
              ]),
              "group.id": "avro-consumer",
              "security.protocol": "SASL_SSL",
              "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt",
              "sasl.mechanism": "SCRAM-SHA-512",
              "sasl.username": "user",
              "sasl.password": "<password_of_the_user_named_user>",
              "schema.registry.url": "http://<FQDN_or_IP_of_Confluent_Schema_Registry_server>:8081",
          }
      )

      c.subscribe(["messages"])

      while True:
          try:
              msg = c.poll(10)
          except SerializerError as e:
              print("Message deserialization failed for {}: {}".format(msg, e))
              break

          if msg is None:
              continue

          if msg.error():
              print("AvroConsumer error: {}".format(msg.error()))
              continue

          print(msg.value())

      c.close()
- Create a Python script for the producer.

  The script works as follows:

  - Connect to the schema registry and provide it with the data format schemas for the key and value.
  - Generate the key and value based on the provided schemas.
  - Send a message consisting of a `key:value` pair to the `messages` topic. The schema versions are added to the message automatically.

  producer.py

      #!/usr/bin/python3

      from confluent_kafka import avro
      from confluent_kafka.avro import AvroProducer

      value_schema_str = """
      {
          "namespace": "my.test",
          "name": "value",
          "type": "record",
          "fields": [
              {
                  "name": "name",
                  "type": "string"
              }
          ]
      }
      """

      key_schema_str = """
      {
          "namespace": "my.test",
          "name": "key",
          "type": "record",
          "fields": [
              {
                  "name": "name",
                  "type": "string"
              }
          ]
      }
      """

      value_schema = avro.loads(value_schema_str)
      key_schema = avro.loads(key_schema_str)
      value = {"name": "Value"}
      key = {"name": "Key"}

      def delivery_report(err, msg):
          """Called once for each message produced to indicate delivery result.
          Triggered by poll() or flush()."""
          if err is not None:
              print("Message delivery failed: {}".format(err))
          else:
              print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))

      avroProducer = AvroProducer(
          {
              "bootstrap.servers": ','.join([
                  "<FQDN_of_broker_host_1>:9091",
                  ...
                  "<FQDN_of_broker_host_N>:9091",
              ]),
              "security.protocol": "SASL_SSL",
              "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt",
              "sasl.mechanism": "SCRAM-SHA-512",
              "sasl.username": "user",
              "sasl.password": "<password_of_the_user_named_user>",
              "on_delivery": delivery_report,
              "schema.registry.url": "http://<FQDN_or_IP_of_Schema_Registry_server>:8081",
          },
          default_key_schema=key_schema,
          default_value_schema=value_schema,
      )

      avroProducer.produce(topic="messages", key=key, value=value)
      avroProducer.flush()
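Before running the scripts in the next section, you can quickly verify that the installed packages import correctly:

    # Verify that the client library is importable and print its version.
    python3 -c 'import avro, confluent_kafka; print(confluent_kafka.version())'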
Make sure that Confluent Schema Registry is working correctly
- Start the consumer:

      python3 ./consumer.py
- In a separate terminal, start the producer:

      python3 ./producer.py
- Make sure that the data sent by the producer is received and correctly interpreted by the consumer:

      {'name': 'Value'}
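You can also check which subjects the producer registered in the schema registry. With the default subject naming strategy, one subject each for the key and value of the `messages` topic is expected; the output may differ if other schemas have already been registered:

    # List the subjects registered in Confluent Schema Registry.
    curl http://<FQDN_or_IP_of_Schema_Registry_server>:8081/subjects
    # Expected output: ["messages-key","messages-value"]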
Delete the resources you created
Delete the resources you no longer need to avoid paying for them:
- Delete the Managed Service for Apache Kafka® cluster.
- Delete the virtual machine.
- If you reserved public static IP addresses, release and delete them.