Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Истории успеха
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Доступны в регионе
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • ИИ для бизнеса
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Партнёрская программа
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Истории успеха
  • Документация
  • Блог
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»
Практические руководства
    • Все руководства
    • Самостоятельное развертывание веб-интерфейса Apache Kafka®
    • Обновление кластера Managed Service for Apache Kafka® с ZooKeeper на KRaft
    • Миграция БД из стороннего кластера Apache Kafka® в Managed Service for Apache Kafka®
    • Перенос данных между кластерами Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for YDB в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Yandex MPP Analytics for PostgreSQL с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Yandex StoreDoc с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for MySQL® с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for OpenSearch с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for PostgreSQL с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Managed Service for Apache Kafka® в Data Streams с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for YDB с помощью Data Transfer
    • Поставка данных из Data Streams в Managed Service for Apache Kafka® с помощью Data Transfer
    • Захват изменений YDB и поставка в YDS
    • Настройка Kafka Connect для работы с кластером Managed Service for Apache Kafka®
    • Синхронизация топиков Apache Kafka® в Object Storage без использования интернета
    • Отслеживание потери сообщений в топике Apache Kafka®
    • Автоматизация задач Query с помощью Managed Service for Apache Airflow™
    • Отправка запросов к API Yandex Cloud через Yandex Cloud Python SDK
    • Настройка SMTP-сервера для отправки уведомлений по электронной почте
    • Добавление данных в БД ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® средствами ClickHouse®
    • Миграция данных в Managed Service for ClickHouse® при помощи Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for ClickHouse® с помощью Data Transfer
    • Асинхронная репликация данных из PostgreSQL в ClickHouse®
    • Обмен данными между Managed Service for ClickHouse® и Yandex Data Processing
    • Настройка Managed Service for ClickHouse® для Graphite
    • Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®
    • Получение данных из Managed Service for Apache Kafka® в ksqlDB
    • Получение данных из RabbitMQ в Managed Service for ClickHouse®
    • Сохранение потока данных Data Streams в Managed Service for ClickHouse®
    • Асинхронная репликация данных из Яндекс Метрика в ClickHouse® с помощью Data Transfer
    • Использование гибридного хранилища в Managed Service for ClickHouse®
    • Шардирование таблиц Managed Service for ClickHouse®
    • Загрузка данных из Яндекс Директ в витрину Managed Service for ClickHouse® с использованием Cloud Functions, Object Storage и Data Transfer
    • Загрузка данных из Object Storage в Managed Service for ClickHouse® с помощью Data Transfer
    • Миграция данных со сменой хранилища из Managed Service for OpenSearch в Managed Service for ClickHouse® с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Managed Service for ClickHouse® с помощью Data Transfer
    • Интеграция Yandex Managed Service for ClickHouse® с Microsoft SQL Server через ClickHouse® JDBC Bridge
    • Миграция базы данных из Google BigQuery в Managed Service for ClickHouse®
    • Интеграция Yandex Managed Service for ClickHouse® с Oracle через ClickHouse® JDBC Bridge
    • Настройка Cloud DNS для доступа к кластеру Managed Service for ClickHouse® из других облачных сетей
    • Миграция кластера Yandex Data Processing с HDFS в другую зону доступности
    • Импорт данных из Managed Service for MySQL® в Yandex Data Processing с помощью Sqoop
    • Импорт данных из Managed Service for PostgreSQL в Yandex Data Processing с помощью Sqoop
    • Монтирование бакетов Object Storage к файловой системе хостов Yandex Data Processing
    • Работа с топиками Apache Kafka® с помощью Yandex Data Processing
    • Автоматизация работы с Yandex Data Processing с помощью Managed Service for Apache Airflow™
    • Совместная работа с таблицами Yandex Data Processing с использованием Apache Hive™ Metastore
    • Перенос метаданных между кластерами Yandex Data Processing с помощью Apache Hive™ Metastore
    • Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
    • Миграция коллекций из стороннего кластера MongoDB в Yandex StoreDoc
    • Миграция данных в Yandex StoreDoc
    • Миграция кластера Yandex StoreDoc с версии 4.4 на 6.0
    • Шардирование коллекций Yandex StoreDoc
    • Анализ производительности и оптимизация Yandex StoreDoc
    • Анализ производительности и оптимизация Managed Service for MySQL®
    • Синхронизация данных из стороннего кластера MySQL® в Managed Service for MySQL® с помощью Data Transfer
    • Миграция БД из Managed Service for MySQL® в сторонний кластер MySQL®
    • Миграция БД из Managed Service for MySQL® в Object Storage с помощью Data Transfer
    • Перенос данных из Object Storage в Managed Service for MySQL® с использованием Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for MySQL® в Managed Service for Apache Kafka® с помощью Debezium
    • Миграция БД из Managed Service for MySQL® в Managed Service for YDB с помощью Data Transfer
    • Захват изменений MySQL® и поставка в YDS
    • Миграция данных из Managed Service for MySQL® в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из AWS RDS for PostgreSQL в Managed Service for PostgreSQL с помощью Data Transfer
    • Миграция данных из Managed Service for MySQL® в Yandex MPP Analytics for PostgreSQL с помощью Data Transfer
    • Настройка политики индексов в Managed Service for OpenSearch
    • Миграция данных в Managed Service for OpenSearch из стороннего кластера OpenSearch с помощью Data Transfer
    • Загрузка данных из Managed Service for OpenSearch в Object Storage с помощью Data Transfer
    • Миграция данных из Managed Service for OpenSearch в Managed Service for YDB с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Yandex MPP Analytics for PostgreSQL с помощью Yandex Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Аутентификация в OpenSearch Dashboards кластера Managed Service for OpenSearch с помощью Keycloak
    • Использование плагина yandex-lemmer в Managed Service for OpenSearch
    • Создание кластера PostgreSQL для «1С:Предприятия»
    • Логическая репликация PostgreSQL
    • Миграция БД из стороннего кластера PostgreSQL в Managed Service for PostgreSQL
    • Миграция БД из Managed Service for PostgreSQL
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Data Transfer
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for Apache Kafka® с помощью Debezium
    • Поставка данных из Managed Service for PostgreSQL в Managed Service for YDB с помощью Data Transfer
    • Миграция БД из Managed Service for PostgreSQL в Object Storage
    • Перенос данных из Object Storage в Managed Service for PostgreSQL с использованием Data Transfer
    • Захват изменений PostgreSQL и поставка в YDS
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for MySQL® с помощью Data Transfer
    • Миграция данных из Managed Service for PostgreSQL в Managed Service for OpenSearch с помощью Data Transfer
    • Решение проблем с сортировкой строк в PostgreSQL после обновления glibc
    • Миграция БД из Greenplum® в ClickHouse®
    • Миграция БД из Greenplum® в PostgreSQL
    • Выгрузка данных Greenplum® в холодное хранилище Object Storage
    • Загрузка данных из Object Storage в Yandex MPP Analytics for PostgreSQL с помощью Data Transfer
    • Копирование данных из Managed Service for OpenSearch в Yandex MPP Analytics for PostgreSQL с помощью Yandex Data Transfer
    • Создание внешней таблицы на базе таблицы из бакета Object Storage с помощью конфигурационного файла
    • Получение данных из внешних источников с помощью именованных запросов в Greenplum®
    • Миграция БД из стороннего кластера Valkey™ в Yandex Managed Service for Valkey™
    • Использование кластера Yandex Managed Service for Valkey™ в качестве хранилища сессий PHP
    • Загрузка данных из Object Storage в Managed Service for YDB с помощью Data Transfer
    • Загрузка данных из Managed Service for YDB в Object Storage с помощью Data Transfer
    • Обработка аудитных логов Audit Trails
    • Обработка логов Cloud Logging
    • Обработка потока изменений Debezium
    • Анализ данных с помощью Jupyter
    • Обработка файлов детализации в сервисе Yandex Cloud Billing
    • Ввод данных в системы хранения
    • Умная обработка логов
    • Передача данных в микросервисных архитектурах
    • Миграция данных в Object Storage с помощью Data Transfer
    • Миграция данных из стороннего кластера Greenplum® или PostgreSQL в Yandex MPP Analytics for PostgreSQL с помощью Data Transfer
    • Миграция кластера Yandex StoreDoc
    • Миграция кластера MySQL®
    • Миграция на сторонний кластер MySQL®
    • Миграция кластера PostgreSQL
    • Создание реестра схем для поставки данных в формате Debezium CDC из Apache Kafka®
    • Автоматизация работы с помощью Yandex Managed Service for Apache Airflow™
    • Работа с таблицей в Object Storage из PySpark-задания
    • Интеграция Yandex Managed Service for Apache Spark™ и Apache Hive™ Metastore
    • Запуск PySpark-задания с помощью Yandex Managed Service for Apache Airflow™
    • Использование Yandex Object Storage в Yandex Managed Service for Apache Spark™

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте инфраструктуру
  • Выполните дополнительные настройки
  • Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse®
  • Создайте в кластере Managed Service for ClickHouse® таблицы на движке Kafka
  • Отправьте тестовые данные в топики Managed Service for Apache Kafka®
  • Проверьте наличие тестовых данных в таблицах кластера Managed Service for ClickHouse®
  • Удалите созданные ресурсы
  1. Построение Data Platform
  2. Получение данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®

Получение данных из Yandex Managed Service for Apache Kafka® в Yandex Managed Service for ClickHouse®

Статья создана
Yandex Cloud
Улучшена
Dmitry A.
Обновлена 18 ноября 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
    • Подготовьте инфраструктуру
    • Выполните дополнительные настройки
  • Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse®
  • Создайте в кластере Managed Service for ClickHouse® таблицы на движке Kafka
  • Отправьте тестовые данные в топики Managed Service for Apache Kafka®
  • Проверьте наличие тестовых данных в таблицах кластера Managed Service for ClickHouse®
  • Удалите созданные ресурсы

Примечание

В регионе Казахстан доступна только зона доступности kz1-a.

В кластер Managed Service for ClickHouse® можно в реальном времени поставлять данные из топиков Apache Kafka®. Эти данные будут автоматически вставлены в таблицы ClickHouse® на движке Kafka.

Чтобы настроить поставку данных из Managed Service for Apache Kafka® в Managed Service for ClickHouse®:

  1. Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse®.
  2. Создайте в кластере Managed Service for ClickHouse® таблицы на движке Kafka.
  3. Отправьте тестовые данные в топики Managed Service for Apache Kafka®.
  4. Проверьте наличие тестовых данных в таблицах кластера Managed Service for ClickHouse®.

Если созданные ресурсы вам больше не нужны, удалите их.

Важно

В руководстве используется однохостовый кластер ClickHouse®. Если в вашем кластере больше одного хоста ClickHouse®, в SQL-запросах, представленных ниже, используйте распределенный запрос с подстановкой имени кластера: CREATE ... ON CLUSTER '{cluster}'. Также в запросах, где указан табличный движок MergeTree, используйте вместо него ReplicatedMergeTree.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластеры Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за кластер Managed Service for ClickHouse®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Managed Service for ClickHouse®).
  • Плата за использование публичных IP-адресов, если для хостов кластеров включен публичный доступ (см. тарифы Virtual Private Cloud).

Перед началом работыПеред началом работы

Подготовьте инфраструктуруПодготовьте инфраструктуру

Вручную
Terraform
  1. Создайте необходимое количество кластеров Managed Service for Apache Kafka® любой подходящей вам конфигурации. Для подключения к кластерам с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к кластерам при их создании.

  2. Создайте кластер Managed Service for ClickHouse® с одним шардом и базой данных db1. Для подключения к кластеру с локальной машины пользователя, а не из облачной сети Yandex Cloud, включите публичный доступ к кластеру при его создании.

    Примечание

    Интеграцию с Apache Kafka® можно настроить уже на этапе создания кластера. В этом практическом руководстве интеграция будет настроена позже.

  3. Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета:

    • Инструкция для Managed Service for Apache Kafka®.
    • Инструкция для Managed Service for ClickHouse®.
  4. Создайте необходимое количество топиков в кластерах Managed Service for Apache Kafka®. Имена топиков не должны повторяться.

  5. Чтобы производитель и потребитель могли работать с топиками, создайте в каждом кластере Managed Service for Apache Kafka® по два пользователя:

    • пользователь с ролью ACCESS_ROLE_PRODUCER для производителя;
    • пользователь с ролью ACCESS_ROLE_CONSUMER для потребителя.

    Имена пользователей в разных кластерах могут быть одинаковыми.

  1. Если у вас еще нет Terraform, установите его.

  2. Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.

  3. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

  4. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

  5. Скачайте в ту же рабочую директорию файл конфигурации data-from-kafka-to-clickhouse.tf.

    В этом файле описаны:

    • Сеть.

    • Подсеть.

    • Группа безопасности по умолчанию и правила, необходимые для подключения к кластерам из интернета.

    • Кластер Managed Service for Apache Kafka®.

    • Топик и два пользователя Managed Service for Apache Kafka®, от имени которых к топику будут подключаться производитель и потребитель, соответственно.

      Для создания нескольких топиков или кластеров продублируйте блоки с их описанием и укажите новые уникальные имена. Имена пользователей в разных кластерах могут быть одинаковыми.

    • Кластер Managed Service for ClickHouse® с одним шардом и базой данных db1.

  6. Укажите в файле data-from-kafka-to-clickhouse.tf:

    • версию Managed Service for Apache Kafka®;
    • имена и пароли пользователей с ролями ACCESS_ROLE_PRODUCER и ACCESS_ROLE_CONSUMER кластеров Managed Service for Apache Kafka®;
    • имена топиков кластеров Managed Service for Apache Kafka®;
    • имя пользователя и пароль, которые будут использоваться для доступа к кластеру Managed Service for ClickHouse®.
  7. Проверьте корректность файлов конфигурации Terraform с помощью команды:

    terraform validate
    

    Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  8. Создайте необходимую инфраструктуру:

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

    В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

Выполните дополнительные настройкиВыполните дополнительные настройки

  1. Установите утилиты:

    • kafkacat — для чтения и записи данных в топики Apache Kafka®.

      sudo apt update && sudo apt install --yes kafkacat
      

      Убедитесь, что можете с ее помощью подключиться к кластерам Managed Service for Apache Kafka® через SSL.

    • clickhouse-client — для подключения к базе данных в кластере Managed Service for ClickHouse®.

      1. Подключите DEB-репозиторий ClickHouse®:

        sudo apt update && sudo apt install --yes apt-transport-https ca-certificates dirmngr && \
        sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4 && \
        echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \
        /etc/apt/sources.list.d/clickhouse.list
        
      2. Установите зависимости:

        sudo apt update && sudo apt install --yes clickhouse-client
        
      3. Загрузите файл конфигурации для clickhouse-client:

        mkdir -p ~/.clickhouse-client && \
        wget "https://storage.yandexcloud.net/doc-files/clickhouse-client.conf.example" \
          --output-document ~/.clickhouse-client/config.xml
        

      Убедитесь, что можете с ее помощью подключиться к кластеру Managed Service for ClickHouse® через SSL.

    • jq — для потоковой обработки JSON-файлов.

      sudo apt update && sudo apt-get install --yes jq
      

Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse®Настройте интеграцию с Apache Kafka® для кластера Managed Service for ClickHouse®

Вручную
Terraform

В зависимости от количества кластеров Managed Service for Apache Kafka®:

  • Если кластер Apache Kafka® один, укажите данные для аутентификации в настройках ClickHouse® в секции Настройки СУБД → Kafka. В этом случае кластер Managed Service for ClickHouse® будет использовать эти данные для аутентификации при обращении к любому топику.

    Данные для аутентификации:

    • Sasl mechanism — SCRAM-SHA-512.
    • Sasl password — пароль пользователя для потребителя.
    • Sasl username — имя пользователя для потребителя.
    • Security protocol — SASL_SSL.
  • Если кластеров Apache Kafka® несколько, создайте нужное количество именованных коллекций с данными для аутентификации каждого топика Managed Service for Apache Kafka®:

    1. Подключитесь к базе данных db1 кластера Managed Service for ClickHouse® с помощью clickhouse-client.

    2. Выполните следующий запрос нужное количество раз, указав данные для аутентификации каждого топика:

      CREATE NAMED COLLECTION <имя_коллекции> AS
          kafka_broker_list = '<FQDN_хоста-брокера>:9091',
          kafka_topic_list = '<имя_топика>',
          kafka_group_name = 'sample_group',
          kafka_format = 'JSONEachRow'
          kafka_security_protocol = 'SASL_SSL',
          kafka_sasl_mechanism = 'SCRAM-SHA-512',
          kafka_sasl_username = '<имя_пользователя_для_потребителя>',
          kafka_sasl_password = '<пароль_пользователя_для_потребителя>';
      
  1. В зависимости от количества кластеров Managed Service for Apache Kafka®:

    • Если кластер Apache Kafka® один, раскомментируйте в файле data-from-kafka-to-clickhouse.tf блок clickhouse.config.kafka:

      config {
          kafka {
              security_protocol = "SECURITY_PROTOCOL_SASL_SSL"
              sasl_mechanism    = "SASL_MECHANISM_SCRAM_SHA_512"
              sasl_username     = "<имя_пользователя_для_потребителя>"
              sasl_password     = "<пароль_пользователя_для_потребителя>"
          }
      }
      
    • Если кластеров Apache Kafka® несколько, раскомментируйте блок clickhouse.config.kafka_topic, указав данные для аутентификации каждого топика Managed Service for Apache Kafka®:

      config {
          kafka_topic {
              name = "<имя_топика>"
              settings {
              security_protocol = "SECURITY_PROTOCOL_SASL_SSL"
              sasl_mechanism    = "SASL_MECHANISM_SCRAM_SHA_512"
              sasl_username     = "<имя_пользователя_для_потребителя>"
              sasl_password     = "<пароль_пользователя_для_потребителя>"
              }
          }
      }
      

      Если в кластерах несколько топиков, продублируйте блок kafka_topic необходимое количество раз, указав соответствующие имена топиков.

  2. Проверьте корректность настроек.

    1. В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.

    2. Выполните команду:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  3. Подтвердите изменение ресурсов.

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

Создайте в кластере Managed Service for ClickHouse® таблицы на движке KafkaСоздайте в кластере Managed Service for ClickHouse® таблицы на движке Kafka

Пусть в топики Apache Kafka® поступают некоторые данные от сенсоров автомобиля в формате JSON. Эти данные будут передаваться в виде сообщений Apache Kafka®, каждое из которых будет содержать строчку вида:

{"device_id":"iv9a94th6rzt********","datetime":"2020-06-05 17:27:00","latitude":"55.70329032","longitude":"37.65472196","altitude":"427.5","speed":"0","battery_voltage":"23.5","cabin_temperature":"17","fuel_level":null}

Кластер Managed Service for ClickHouse® будет использовать при вставке в таблицы на движке Kafka формат данных JSONEachRow, который преобразует строки из сообщения Apache Kafka® в нужные значения столбцов.

Для каждого из топиков Apache Kafka® создайте в кластере Managed Service for ClickHouse® отдельную таблицу, куда будут заноситься поступающие данные:

  1. Подключитесь к базе данных db1 кластера Managed Service for ClickHouse® с помощью clickhouse-client.

  2. Выполните запрос:

    Один кластер Apache Kafka®
    Несколько кластеров Apache Kafka®
    CREATE TABLE IF NOT EXISTS db1.<имя_таблицы_для_топика>
    (
        device_id String,
        datetime DateTime,
        latitude Float32,
        longitude Float32,
        altitude Float32,
        speed Float32,
        battery_voltage Nullable(Float32),
        cabin_temperature Float32,
        fuel_level Nullable(Float32)
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = '<FQDN_хоста-брокера>:9091',
        kafka_topic_list = '<имя_топика>',
        kafka_group_name = 'sample_group',
        kafka_format = 'JSONEachRow';
    
    CREATE TABLE IF NOT EXISTS db1.<имя_таблицы_для_топика>
    (
        device_id String,
        datetime DateTime,
        latitude Float32,
        longitude Float32,
        altitude Float32,
        speed Float32,
        battery_voltage Nullable(Float32),
        cabin_temperature Float32,
        fuel_level Nullable(Float32)
    ) ENGINE = Kafka(<имя_коллекции_с_данными_аутентификации>);
    

Созданные таблицы будут автоматически наполняться сообщениями, считываемыми из топиков Managed Service for Apache Kafka®. При чтении данных Managed Service for ClickHouse® использует указанные ранее настройки для пользователей с ролью ACCESS_ROLE_CONSUMER.

Подробнее о создании таблиц на движке Kafka см. в документации ClickHouse®.

Отправьте тестовые данные в топики Managed Service for Apache Kafka®Отправьте тестовые данные в топики Managed Service for Apache Kafka®

  1. Создайте файл sample.json с тестовыми данными:

    {
        "device_id": "iv9a94th6rzt********",
        "datetime": "2020-06-05 17:27:00",
        "latitude": 55.70329032,
        "longitude": 37.65472196,
        "altitude": 427.5,
        "speed": 0,
        "battery_voltage": 23.5,
        "cabin_temperature": 17,
        "fuel_level": null
    }
    
    {
        "device_id": "rhibbh3y08qm********",
        "datetime": "2020-06-06 09:49:54",
        "latitude": 55.71294467,
        "longitude": 37.66542005,
        "altitude": 429.13,
        "speed": 55.5,
        "battery_voltage": null,
        "cabin_temperature": 18,
        "fuel_level": 32
    }
    
    {
        "device_id": "iv9a94th6rzt********",
        "datetime": "2020-06-07 15:00:10",
        "latitude": 55.70985913,
        "longitude": 37.62141918,
        "altitude": 417.0,
        "speed": 15.7,
        "battery_voltage": 10.3,
        "cabin_temperature": 17,
        "fuel_level": null
    }
    
  2. Отправьте данные из файла sample.json в каждый топик Managed Service for Apache Kafka® с помощью jq и kafkacat:

    jq -rc . sample.json | kafkacat -P \
       -b <FQDN_хоста-брокера>:9091 \
       -t <имя_топика> \
       -k key \
       -X security.protocol=SASL_SSL \
       -X sasl.mechanisms=SCRAM-SHA-512 \
       -X sasl.username="<имя_пользователя_для_производителя>" \
       -X sasl.password="<пароль_пользователя_для_производителя>" \
       -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/RootCA.crt -Z
    

Данные отправляются от имени пользователей с ролью ACCESS_ROLE_PRODUCER. Подробнее о настройке SSL-сертификата и работе с kafkacat см. в разделе Подключение к кластеру Apache Kafka® из приложений.

Проверьте наличие тестовых данных в таблицах кластера Managed Service for ClickHouse®Проверьте наличие тестовых данных в таблицах кластера Managed Service for ClickHouse®

Для доступа к данным используйте материализованное представление. Когда к таблице на движке Kafka присоединяется материализованное представление, оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Apache Kafka® и преобразовывать их в необходимый формат с помощью SELECT.

Примечание

Сообщение из топика может быть прочитано ClickHouse® только один раз, поэтому не рекомендуется считывать данные из таблицы напрямую.

Чтобы создать такое представление:

  1. Подключитесь к базе данных db1 кластера Managed Service for ClickHouse® с помощью clickhouse-client.

  2. Для каждой таблицы на движке Kafka выполните запросы:

    CREATE TABLE db1.temp_<имя_таблицы_для_топика>
    (
        device_id String,
        datetime DateTime,
        latitude Float32,
        longitude Float32,
        altitude Float32,
        speed Float32,
        battery_voltage Nullable(Float32),
        cabin_temperature Float32,
        fuel_level Nullable(Float32)
    ) ENGINE = MergeTree()
    ORDER BY device_id;
    
    CREATE MATERIALIZED VIEW db1.<имя_представления> TO db1.temp_<имя_таблицы_для_топика>
        AS SELECT * FROM db1.<имя_таблицы_для_топика>;
    

Чтобы получить все данные из нужного материализованного представления:

  1. Подключитесь к базе данных db1 кластера Managed Service for ClickHouse® с помощью clickhouse-client.

  2. Выполните запрос:

    SELECT * FROM db1.<имя_представления>;
    

Запрос вернет таблицу с данными, отправленными в соответствующий топик Managed Service for Apache Kafka®.

Подробнее о работе с данными, поставляемыми из Apache Kafka®, см. в документации ClickHouse®.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

Вручную
Terraform
  • Удалите кластеры:

    • Yandex Managed Service for ClickHouse®;
    • Yandex Managed Service for Apache Kafka®.
  • Если вы зарезервировали для кластеров публичные статические IP-адреса, освободите и удалите их.

  1. В терминале перейдите в директорию с планом инфраструктуры.

    Важно

    Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

  2. Удалите ресурсы:

    1. Выполните команду:

      terraform destroy
      
    2. Подтвердите удаление ресурсов и дождитесь завершения операции.

    Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

ClickHouse® является зарегистрированным товарным знаком ClickHouse, Inc.

Была ли статья полезна?

Предыдущая
Настройка Managed Service for ClickHouse® для Graphite
Следующая
Получение данных из Managed Service for Apache Kafka® в ksqlDB
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»