Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®
- Перед началом работы
- Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse®
- Создайте в кластере Managed Service for ClickHouse® таблицы на движке Kafka
- Отправьте тестовые данные в топики Managed Service for Apache Kafka®
- Проверьте наличие тестовых данных в таблицах кластера Managed Service for ClickHouse®
- Удалите созданные ресурсы
В кластер Managed Service for ClickHouse® можно в реальном времени поставлять данные из топиков Apache Kafka®. Эти данные будут автоматически вставлены в таблицы ClickHouse® на движке Kafka
Чтобы настроить поставку данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®:
- Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse®.
- Создайте в кластере Managed Service for ClickHouse® таблицы на движке Kafka.
- Отправьте тестовые данные в топики Managed Service for Apache Kafka®.
- Проверьте наличие тестовых данных в таблицах кластера Managed Service for ClickHouse®.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
Подготовьте инфраструктуру
-
Создайте необходимое количество кластеров Managed Service for Apache Kafka® любой подходящей вам конфигурации. Для подключения к кластерам с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к кластерам при их создании.
-
Создайте кластер Managed Service for ClickHouse® с одним шардом и базой данных
db1
. Для подключения к кластеру с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к кластеру при его создании.Примечание
Интеграцию с Apache Kafka® можно настроить уже на этапе создания кластера. В этом практическом руководстве интеграция будет настроена позже.
-
Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета:
-
Создайте необходимое количество топиков в кластерах Managed Service for Apache Kafka®. Имена топиков не должны повторяться.
-
Чтобы производитель и потребитель могли работать с топиками, создайте в каждом кластере Managed Service for Apache Kafka® по два пользователя:
- пользователь с ролью
ACCESS_ROLE_PRODUCER
для производителя; - пользователь с ролью
ACCESS_ROLE_CONSUMER
для потребителя.
Имена пользователей в разных кластерах могут быть одинаковыми.
- пользователь с ролью
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации data-from-kafka-to-clickhouse.tf
.В этом файле описаны:
-
Сеть.
-
Подсеть.
-
Группа безопасности по умолчанию и правила, необходимые для подключения к кластерам из интернета.
-
Кластер Managed Service for Apache Kafka®.
-
Топик и два пользователя Managed Service for Apache Kafka®, от имени которых к топику будут подключаться производитель и потребитель, соответственно.
Для создания нескольких топиков или кластеров продублируйте блоки с их описанием и укажите новые уникальные имена. Имена пользователей в разных кластерах могут быть одинаковыми.
-
Кластер Managed Service for ClickHouse® с одним шардом и базой данных
db1
.
-
-
Укажите в файле
data-from-kafka-to-clickhouse.tf
:- версию Managed Service for Apache Kafka®;
- имена и пароли пользователей с ролями
ACCESS_ROLE_PRODUCER
иACCESS_ROLE_CONSUMER
кластеров Managed Service for Apache Kafka®; - имена топиков кластеров Managed Service for Apache Kafka®;
- имя пользователя и пароль, которые будут использоваться для доступа к кластеру Managed Service for ClickHouse®.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
Выполните дополнительные настройки
-
Установите утилиты:
-
kafkacat
— для чтения и записи данных в топики Apache Kafka®.sudo apt update && sudo apt install --yes kafkacat
Убедитесь, что можете с ее помощью подключиться к кластерам Managed Service for Apache Kafka® через SSL.
-
clickhouse-client
— для подключения к базе данных в кластере Managed Service for ClickHouse®.-
Подключите DEB-репозиторий
ClickHouse®:sudo apt update && sudo apt install --yes apt-transport-https ca-certificates dirmngr && \ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4 && \ echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \ /etc/apt/sources.list.d/clickhouse.list
-
Установите зависимости:
sudo apt update && sudo apt install --yes clickhouse-client
-
Загрузите файл конфигурации для
clickhouse-client
:mkdir -p ~/.clickhouse-client && \ wget "https://storage.yandexcloud.net/doc-files/clickhouse-client.conf.example" \ --output-document ~/.clickhouse-client/config.xml
Убедитесь, что можете с ее помощью подключиться к кластеру Managed Service for ClickHouse® через SSL.
-
-
jq
— для потоковой обработки JSON-файлов.sudo apt update && sudo apt-get install --yes jq
-
Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse®
В зависимости от количества кластеров Managed Service for Apache Kafka®:
- Если кластер Apache Kafka® один, укажите данные для аутентификации в секции Настройки СУБД → Kafka. В этом случае кластер Managed Service for ClickHouse® будет использовать эти данные для аутентификации при обращении к любому топику.
- Если кластеров Apache Kafka® несколько, укажите данные для аутентификации каждого топика Managed Service for Apache Kafka® в настройках кластера Managed Service for ClickHouse® в секции Настройки СУБД → Kafka topics.
Данные для аутентификации:
- Name — имя топика (для нескольких кластеров Apache Kafka®).
- Sasl mechanism —
SCRAM-SHA-512
. - Sasl password — пароль пользователя для потребителя.
- Sasl username — имя пользователя для потребителя.
- Security protocol —
SASL_SSL
.
-
В зависимости от количества кластеров Managed Service for Apache Kafka®:
-
Если кластер Apache Kafka® один, раскомментируйте в файле
data-from-kafka-to-clickhouse.tf
блокclickhouse.config.kafka
:config { kafka { security_protocol = "SECURITY_PROTOCOL_SASL_SSL" sasl_mechanism = "SASL_MECHANISM_SCRAM_SHA_512" sasl_username = "<имя_пользователя_для_потребителя>" sasl_password = "<пароль_пользователя_для_потребителя>" } }
-
Если кластеров Apache Kafka® несколько, раскомментируйте блок
clickhouse.config.kafka_topic
, указав данные для аутентификации каждого топика Managed Service for Apache Kafka®:config { kafka_topic { name = "<имя_топика>" settings { security_protocol = "SECURITY_PROTOCOL_SASL_SSL" sasl_mechanism = "SASL_MECHANISM_SCRAM_SHA_512" sasl_username = "<имя_пользователя_для_потребителя>" sasl_password = "<пароль_пользователя_для_потребителя>" } } }
Если в кластерах несколько топиков, продублируйте блок
kafka_topic
необходимое количество раз, указав соответствующие имена топиков.
-
-
Проверьте корректность настроек.
-
В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.
-
Выполните команду:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
Создайте в кластере Managed Service for ClickHouse® таблицы на движке Kafka
Пусть в топики Apache Kafka® поступают некоторые данные от сенсоров автомобиля в формате JSON. Эти данные будут передаваться в виде сообщений Apache Kafka®, каждое из которых будет содержать строчку вида:
{"device_id":"iv9a94th6rzt********","datetime":"2020-06-05 17:27:00","latitude":"55.70329032","longitude":"37.65472196","altitude":"427.5","speed":"0","battery_voltage":"23.5","cabin_temperature":"17","fuel_level":null}
Кластер Managed Service for ClickHouse® будет использовать при вставке в таблицы на движке Kafka
формат данных JSONEachRow
Для каждого из топиков Apache Kafka® создайте в кластере Managed Service for ClickHouse® отдельную таблицу, куда будут заноситься поступающие данные:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse® с помощьюclickhouse-client
. -
Выполните запрос:
CREATE TABLE IF NOT EXISTS db1.<имя_таблицы_для_топика> ( device_id String, datetime DateTime, latitude Float32, longitude Float32, altitude Float32, speed Float32, battery_voltage Nullable(Float32), cabin_temperature Float32, fuel_level Nullable(Float32) ) ENGINE = Kafka() SETTINGS kafka_broker_list = '<FQDN_хоста-брокера>:9091', kafka_topic_list = '<имя_топика>', kafka_group_name = 'sample_group', kafka_format = 'JSONEachRow';
Созданные таблицы будут автоматически наполняться сообщениями, считываемыми из топиков Managed Service for Apache Kafka®. При чтении данных Managed Service for ClickHouse® использует указанные ранее настройки для пользователей с ролью ACCESS_ROLE_CONSUMER
.
Подробнее о создании таблиц на движке Kafka
см. в документации ClickHouse®
Отправьте тестовые данные в топики Managed Service for Apache Kafka®
-
Создайте файл
sample.json
с тестовыми данными:{ "device_id": "iv9a94th6rzt********", "datetime": "2020-06-05 17:27:00", "latitude": 55.70329032, "longitude": 37.65472196, "altitude": 427.5, "speed": 0, "battery_voltage": 23.5, "cabin_temperature": 17, "fuel_level": null } { "device_id": "rhibbh3y08qm********", "datetime": "2020-06-06 09:49:54", "latitude": 55.71294467, "longitude": 37.66542005, "altitude": 429.13, "speed": 55.5, "battery_voltage": null, "cabin_temperature": 18, "fuel_level": 32 } { "device_id": "iv9a94th6rzt********", "datetime": "2020-06-07 15:00:10", "latitude": 55.70985913, "longitude": 37.62141918, "altitude": 417.0, "speed": 15.7, "battery_voltage": 10.3, "cabin_temperature": 17, "fuel_level": null }
-
Отправьте данные из файла
sample.json
в каждый топик Managed Service for Apache Kafka® с помощьюjq
иkafkacat
:jq -rc . sample.json | kafkacat -P \ -b <FQDN_хоста-брокера>:9091 \ -t <имя_топика> \ -k key \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username="<имя_пользователя_для_производителя>" \ -X sasl.password="<пароль_пользователя_для_производителя>" \ -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/RootCA.crt -Z
Данные отправляются от имени пользователей с ролью ACCESS_ROLE_PRODUCER
. Подробнее о настройке SSL-сертификата и работе с kafkacat
см. в разделе Подключение к кластеру Apache Kafka® из приложений.
Проверьте наличие тестовых данных в таблицах кластера Managed Service for ClickHouse®
Для доступа к данным используйте материализованное представление. Когда к таблице на движке Kafka
присоединяется материализованное представление, оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Apache Kafka® и преобразовывать их в необходимый формат с помощью SELECT
.
Примечание
Сообщение из топика может быть прочитано ClickHouse® только один раз, поэтому не рекомендуется считывать данные из таблицы напрямую.
Чтобы создать такое представление:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse® с помощьюclickhouse-client
. -
Для каждой таблицы на движке
Kafka
выполните запросы:CREATE TABLE db1.temp_<имя_таблицы_для_топика> ( device_id String, datetime DateTime, latitude Float32, longitude Float32, altitude Float32, speed Float32, battery_voltage Nullable(Float32), cabin_temperature Float32, fuel_level Nullable(Float32) ) ENGINE = MergeTree() ORDER BY device_id;
CREATE MATERIALIZED VIEW db1.<имя_представления> TO db1.temp_<имя_таблицы_для_топика> AS SELECT * FROM db1.<имя_таблицы_для_топика>;
Чтобы получить все данные из нужного материализованного представления:
-
Подключитесь к базе данных
db1
кластера Managed Service for ClickHouse® с помощьюclickhouse-client
. -
Выполните запрос:
SELECT * FROM db1.<имя_представления>;
Запрос вернет таблицу с данными, отправленными в соответствующий топик Managed Service for Apache Kafka®.
Подробнее о работе с данными, поставляемыми из Apache Kafka®, см. в документации ClickHouse®
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
-
Удалите кластеры:
-
Если вы зарезервировали для кластеров публичные статические IP-адреса, освободите и удалите их.
Чтобы удалить инфраструктуру, созданную с помощью Terraform:
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
data-from-kafka-to-clickhouse.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле
data-from-kafka-to-clickhouse.tf
, будут удалены. -
ClickHouse® является зарегистрированным товарным знаком ClickHouse, Inc