Использование Managed Schema Registry с Yandex Managed Service for Apache Kafka®
Чтобы использовать Managed Schema Registry совместно с Managed Service for Apache Kafka®:
- Создайте скрипты производителя и потребителя на локальной машине.
- Проверьте правильность работы Managed Schema Registry.
- Удалите созданные ресурсы.
В этом руководстве описана регистрация одной схемы данных. Подробнее о том, как зарегистрировать несколько схем данных, см. в документации Confluent Schema Registry
Перед началом работы
-
Создайте кластер Managed Service for Apache Kafka® любой подходящей конфигурации. При создании кластера включите опции Реестр схем данных и Публичный доступ.
- Создайте топик с именем
messages
для обмена сообщениями между производителем и потребителем. - Создайте пользователя с именем
user
и выдайте ему права на топикmessages
:ACCESS_ROLE_CONSUMER
,ACCESS_ROLE_PRODUCER
.
- Создайте топик с именем
-
В той же сети, что и кластер Managed Service for Apache Kafka®, создайте виртуальную машину с Ubuntu 20.04 и публичным IP-адресом.
-
Если вы используете группы безопасности, настройте их так, чтобы был разрешен весь необходимый трафик между кластером Managed Service for Apache Kafka® и виртуальной машиной.
Создайте скрипты производителя и потребителя
Приведенные скрипты отправляют и принимают сообщения в топике messages
в виде пары ключ:значение
. В качестве примера схемы форматов данных описаны в формате Avro
Примечание
Скрипты на Python приведены в демонстрационных целях. Вы можете подготовить и передать схемы форматов данных и сами данные, создав аналогичный скрипт на другом языке программирования.
-
Подключитесь к виртуальной машине по SSH.
-
Установите необходимые пакеты Python:
sudo apt-get update && \ sudo pip3 install avro confluent_kafka
-
Чтобы использовать шифрованное соединение, установите SSL-сертификат:
sudo mkdir -p /usr/share/ca-certificates && \ sudo wget "https://storage.yandexcloud.net/cloud-certs/CA.pem" \ -O /usr/share/ca-certificates/YandexInternalRootCA.crt && \ sudo chmod 655 /usr/share/ca-certificates/YandexInternalRootCA.crt
-
Создайте Python-скрипт потребителя.
Алгоритм работы скрипта:
- Подключиться к топику
messages
и реестру схем Confluent Schema Registry. - В непрерывном цикле считывать поступающие в топик
messages
сообщения. - При получении сообщения запросить в реестре схем Confluent Schema Registry нужные схемы для разбора сообщения.
- Разобрать бинарные данные из сообщения в соответствии со схемами для ключа и значения и вывести результат на экран.
consumer.py
#!/usr/bin/python3 from confluent_kafka.avro import AvroConsumer from confluent_kafka.avro.serializer import SerializerError c = AvroConsumer( { "bootstrap.servers": ','.join([ "<FQDN_хоста-брокера_1>:9091", ... "<FQDN_хоста-брокера_N>:9091", ]), "group.id": "avro-consumer", "security.protocol": "SASL_SSL", "ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt", "sasl.mechanism": "SCRAM-SHA-512", "sasl.username": "user", "sasl.password": "<пароль_пользователя_user>", "schema.registry.url": "https://<FQDN_или_IP-адрес_сервера_Managed_Schema_Registry>:443", "schema.registry.basic.auth.credentials.source": "SASL_INHERIT", "schema.registry.ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt", "auto.offset.reset": "earliest" } ) c.subscribe(["messages"]) while True: try: msg = c.poll(10) except SerializerError as e: print("Message deserialization failed for {}: {}".format(msg, e)) break if msg is None: continue if msg.error(): print("AvroConsumer error: {}".format(msg.error())) continue print(msg.value()) c.close()
- Подключиться к топику
-
Создайте Python-скрипт производителя.
Алгоритм работы скрипта:
- Подключиться к реестру схем и передать ему схемы форматов данных для ключа и значения.
- Сформировать на основе переданных схем ключ и значение.
- Отправить в топик
messages
сообщение, состоящее из парыключ:значение
. Номера версий схем будут добавлены в сообщение автоматически.
producer.py
#!/usr/bin/python3 from confluent_kafka import avro from confluent_kafka.avro import AvroProducer value_schema_str = """ { "namespace": "my.test", "name": "value", "type": "record", "fields": [ { "name": "name", "type": "string" } ] } """ key_schema_str = """ { "namespace": "my.test", "name": "key", "type": "record", "fields": [ { "name": "name", "type": "string" } ] } """ value_schema = avro.loads(value_schema_str) key_schema = avro.loads(key_schema_str) value = {"name": "Value"} key = {"name": "Key"} def delivery_report(err, msg): """Called once for each message produced to indicate delivery result. Triggered by poll() or flush().""" if err is not None: print("Message delivery failed: {}".format(err)) else: print("Message delivered to {} [{}]".format(msg.topic(), msg.partition())) avroProducer = AvroProducer( { "bootstrap.servers": ','.join([ "<FQDN_хоста-брокера_1>:9091", ... "<FQDN_хоста-брокера_N>:9091", ]), "security.protocol": 'SASL_SSL', "ssl.ca.location": '/usr/share/ca-certificates/YandexInternalRootCA.crt', "sasl.mechanism": 'SCRAM-SHA-512', "sasl.username": 'user', "sasl.password": '<пароль_пользователя_user>', "on_delivery": delivery_report, "schema.registry.basic.auth.credentials.source": 'SASL_INHERIT', "schema.registry.url": 'https://<FQDN_или_IP-адрес_сервера_Managed_Schema_Registry>:443', "schema.registry.ssl.ca.location": "/usr/share/ca-certificates/YandexInternalRootCA.crt" }, default_key_schema=key_schema, default_value_schema=value_schema ) avroProducer.produce(topic="messages", key=key, value=value) avroProducer.flush()
Проверьте правильность работы Managed Schema Registry
-
Запустите потребитель:
python3 ./consumer.py
-
В отдельном терминале запустите производитель:
python3 ./producer.py
-
Убедитесь, что данные, отправленные производителем, получены и правильно интерпретированы потребителем:
{'name': 'Value'}
Удалите созданные ресурсы
Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:
- Удалите кластер Managed Service for Apache Kafka®.
- Удалите виртуальную машину.
- Если вы зарезервировали публичные статические IP-адреса, освободите и удалите их