Использование Confluent Schema Registry с Managed Service for Apache Kafka®
В Managed Service for Apache Kafka® вы можете использовать интегрированный реестр схем формата данных Managed Schema Registry. Подробнее см. в разделе Работа с управляемым реестром схем формата данных. Если вам необходим реестр Confluent Schema Registry
Примечание
Руководство проверено на версии Confluent Schema Registry 6.2 и виртуальной машине с Ubuntu 20.04 LTS. Работоспособность при использовании новых версий не гарантируется.
Чтобы использовать Confluent Schema Registry совместно с Managed Service for Apache Kafka®:
- Создайте топик для уведомлений об изменении схем форматов данных.
- Установите и настройте Confluent Schema Registry на виртуальной машине.
- Создайте скрипты производителя и потребителя.
- Проверьте правильность работы Confluent Schema Registry.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации.
- Создайте топик с именем
messages
для обмена сообщениями между производителем и потребителем. - Создайте пользователя с именем
user
и выдайте ему права на топикmessages
:ACCESS_ROLE_CONSUMER
,ACCESS_ROLE_PRODUCER
.
- Создайте топик с именем
-
В той же сети, что и кластер Managed Service for Apache Kafka®, создайте виртуальную машину с Ubuntu 20.04 LTS из Cloud Marketplace и публичным IP-адресом.
-
Если вы используете группы безопасности, настройте их так, чтобы был разрешен весь необходимый трафик между кластером Managed Service for Apache Kafka® и виртуальной машиной.
-
В группах безопасности виртуальной машины создайте правило для входящего трафика, разрешающее подключение через порт
8081
— через него производитель и потребитель будут обращаться к реестру схем:- Диапазон портов:
8081
. - Протокол:
TCP
. - Назначение:
CIDR
. - CIDR блоки:
0.0.0.0/0
или диапазоны адресов подсетей, в которых работают производитель и потребитель.
- Диапазон портов:
Создайте топик для уведомлений об изменении схем форматов данных
-
Создайте служебный топик с именем
_schemas
со следующими настройками:- Количество разделов —
1
. - Политика очистки лога —
Compact
.
Важно
Указанные значения настроек Количество разделов и Политика очистки лога необходимы для работы Confluent Schema Registry.
- Количество разделов —
-
Создайте пользователя с именем
registry
и выдайте ему права на топик_schemas
:ACCESS_ROLE_CONSUMER
,ACCESS_ROLE_PRODUCER
.
От имени этого пользователя Confluent Schema Registry будет работать со служебным топиком
_schemas
.
Установите и настройте Confluent Schema Registry на виртуальной машине
-
Подключите репозиторий Confluent Schema Registry:
wget -qO - https://packages.confluent.io/deb/6.2/archive.key | sudo apt-key add - && \ sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/6.2 stable main"
-
Установите пакеты:
sudo apt-get update && \ sudo apt-get install \ confluent-schema-registry \ openjdk-11-jre-headless \ python3-pip --yes
-
Создайте для сертификата защищенное хранилище:
sudo keytool \ -keystore /etc/schema-registry/client.truststore.jks \ -alias CARoot \ -import -file /usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \ -storepass <пароль_защищенного_хранилища_сертификатов> \ --noprompt
-
Создайте файл
/etc/schema-registry/jaas.conf
с настройками для подключения к кластеру:KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="registry" password="<пароль_пользователя_registry>"; };
-
Измените файл
/etc/schema-registry/schema-registry.properties
, отвечающий за настройки Confluent Schema Registry:-
Закомментируйте строку:
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
-
Раскомментируйте строку с параметром
listeners
. Она отвечает за сетевой адрес и порт, которые будет слушать Confluent Schema Registry. По умолчанию используется порт8081
на всех сетевых интерфейсах:listeners=http://0.0.0.0:8081
-
Добавьте в конец файла строки:
kafkastore.bootstrap.servers=SASL_SSL://<FQDN_хоста-брокера_1:9091>,<FQDN_хоста-брокера_2:9091>,...,<FQDN_хоста-брокера_N:9091> kafkastore.ssl.truststore.location=/etc/schema-registry/client.truststore.jks kafkastore.ssl.truststore.password=<пароль_защищенного_хранилища_сертификатов> kafkastore.sasl.mechanism=SCRAM-SHA-512 kafkastore.security.protocol=SASL_SSL
Список хостов-брокеров можно получить со списком хостов кластера.
-
-
Измените файл с описанием модуля systemd
/lib/systemd/system/confluent-schema-registry.service
.-
Перейдите в блок
[Service]
. -
Добавьте параметр
Environment
с настройками Java:... [Service] Type=simple User=cp-schema-registry Group=confluent Environment="LOG_DIR=/var/log/confluent/schema-registry" Environment="_JAVA_OPTIONS='-Djava.security.auth.login.config=/etc/schema-registry/jaas.conf'" ...
-
-
Обновите сведения о модулях systemd:
sudo systemctl daemon-reload
-
Запустите сервис Confluent Schema Registry:
sudo systemctl start confluent-schema-registry.service
-
Включите автоматический запуск Confluent Schema Registry после перезагрузки ОС:
sudo systemctl enable confluent-schema-registry.service
Создайте скрипты производителя и потребителя
Приведенные скрипты отправляют и принимают сообщения в топике messages
в виде пары ключ:значение
. В качестве примера схемы форматов данных описаны в формате Avro
Примечание
Скрипты на Python приведены в демонстрационных целях. Вы можете подготовить и передать схемы форматов данных и сами данные, создав аналогичный скрипт на другом языке программирования.
-
Установите необходимые пакеты Python:
sudo pip3 install avro confluent_kafka
-
Создайте Python-скрипт потребителя.
Алгоритм работы скрипта:
- Подключиться к топику
messages
и реестру схем Confluent Schema Registry. - В непрерывном цикле считывать поступающие в топик
messages
сообщения. - При получении сообщения запросить в реестре схем Confluent Schema Registry нужные схемы для разбора сообщения.
- Разобрать бинарные данные из сообщения в соответствии со схемами для ключа и значения и вывести результат на экран.
consumer.py
#!/usr/bin/python3 from confluent_kafka.avro import AvroConsumer from confluent_kafka.avro.serializer import SerializerError c = AvroConsumer( { "bootstrap.servers": ','.join([ "<FQDN_хоста-брокера_1>:9091", ... "<FQDN_хоста-брокера_N>:9091", ]), "group.id": "avro-consumer", "security.protocol": "SASL_SSL", "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt", "sasl.mechanism": "SCRAM-SHA-512", "sasl.username": "user", "sasl.password": "<пароль_пользователя_user>", "schema.registry.url": "http://<FQDN_или_IP-адреc_сервера_Confluent_Schema_Registry>:8081", } ) c.subscribe(["messages"]) while True: try: msg = c.poll(10) except SerializerError as e: print("Message deserialization failed for {}: {}".format(msg, e)) break if msg is None: continue if msg.error(): print("AvroConsumer error: {}".format(msg.error())) continue print(msg.value()) c.close()
- Подключиться к топику
-
Создайте Python-скрипт производителя.
Алгоритм работы скрипта:
- Подключиться к реестру схем и передать ему схемы форматов данных для ключа и значения.
- Сформировать на основе переданных схем ключ и значение.
- Отправить в топик
messages
сообщение, состоящее из парыключ:значение
. Номера версий схем будут добавлены в сообщение автоматически.
producer.py
#!/usr/bin/python3 from confluent_kafka import avro from confluent_kafka.avro import AvroProducer value_schema_str = """ { "namespace": "my.test", "name": "value", "type": "record", "fields": [ { "name": "name", "type": "string" } ] } """ key_schema_str = """ { "namespace": "my.test", "name": "key", "type": "record", "fields": [ { "name": "name", "type": "string" } ] } """ value_schema = avro.loads(value_schema_str) key_schema = avro.loads(key_schema_str) value = {"name": "Value"} key = {"name": "Key"} def delivery_report(err, msg): """Called once for each message produced to indicate delivery result. Triggered by poll() or flush().""" if err is not None: print("Message delivery failed: {}".format(err)) else: print("Message delivered to {} [{}]".format(msg.topic(), msg.partition())) avroProducer = AvroProducer( { "bootstrap.servers": ','.join([ "<FQDN_хоста-брокера_1>:9091", ... "<FQDN_хоста-брокера_N>:9091", ]), "security.protocol": "SASL_SSL", "ssl.ca.location": "/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt", "sasl.mechanism": "SCRAM-SHA-512", "sasl.username": "user", "sasl.password": "<пароль_пользователя_user>", "on_delivery": delivery_report, "schema.registry.url": "http://<FQDN_или_IP-адрес_сервера_Schema_Registry>:8081", }, default_key_schema=key_schema, default_value_schema=value_schema, ) avroProducer.produce(topic="messages", key=key, value=value) avroProducer.flush()
Проверьте правильность работы Confluent Schema Registry
-
Запустите потребитель:
python3 ./consumer.py
-
В отдельном терминале запустите производитель:
python3 ./producer.py
-
Убедитесь, что данные, отправленные производителем, получены и правильно интерпретированы потребителем:
{'name': 'Value'}
Удалите созданные ресурсы
Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:
- Удалите кластер Managed Service for Apache Kafka®.
- Удалите виртуальную машину.
- Если вы зарезервировали публичные статические IP-адреса, освободите и удалите их