Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Доступны в регионе
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Партнёрская программа
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»
Yandex Data Streams
    • Все руководства
    • Ввод данных в системы хранения
    • Умная обработка логов
    • Передача данных в микросервисных архитектурах
    • Сохранение данных в ClickHouse®
    • Репликация логов в Object Storage с помощью Fluent Bit
    • Репликация логов в Object Storage с помощью Data Streams
    • Миграция данных в Yandex Object Storage с помощью Yandex Data Transfer
    • Поставка данных из Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Поставка данных из очереди Data Streams в Managed Service for YDB
    • Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Захват изменений YDB и поставка в YDS
    • Захват изменений PostgreSQL и поставка в YDS
    • Захват изменений MySQL® и поставка в YDS
    • Передача событий Yandex Cloud Postbox в Yandex Data Streams и их анализ с помощью Yandex DataLens
    • Создание интерактивного serverless-приложения с использованием WebSocket
    • Обработка аудитных логов Audit Trails
    • Обработка потока изменений Debezium
    • Загрузка аудитных логов в MaxPatrol SIEM
    • Поиск событий Yandex Cloud в Yandex Query
  • Управление доступом
  • Правила тарификации
  • Вопросы и ответы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Создайте поток данных приемника Data Streams
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Поставка данных из Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer

Поставка данных из Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer

Статья создана
Yandex Cloud
Обновлена 25 марта 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Создайте поток данных приемника Data Streams
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы

Важно

Этот документ не применим для пользователей Yandex Cloud в регионе Казахстан. См. полный перечень поддерживаемых эндпоинтов в Data Transfer.

В поток данных Data Streams можно в реальном времени поставлять данные из топиков Apache Kafka®.

Чтобы запустить поставку данных:

  1. Создайте поток данных приемника Data Streams.
  2. Подготовьте и активируйте трансфер.
  3. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).

  • Плата за использование публичных IP-адресов, если для хостов кластера включен публичный доступ (см. тарифы Virtual Private Cloud).

  • Плата за базу данных Managed Service for YDB. Тарификация зависит от режима использования:

    • Для бессерверного режима — оплачиваются операции с данными и объем хранимых данных.
    • Для режима с выделенными инстансами — оплачивается использование вычислительных ресурсов, выделенных БД, и дискового пространства.

    Подробнее о тарифах Managed Service for YDB.

  • Плата за сервис Data Streams. Она зависит от режима тарификации:

    • По выделенным ресурсам — оплачивается количество единиц записываемых данных и ресурсы, выделенные для обслуживания потоков данных.
    • По фактическому использованию:
      • Если БД работает в бессерверном режиме, поток данных тарифицируется по правилам тарификации для бессерверного режима YDB.

      • Если БД работает в режиме с выделенными инстансами, поток данных отдельно не тарифицируется (оплачивается только БД, см. правила тарификации).

    Подробнее о тарифах Data Streams.

Перед началом работыПеред началом работы

  1. Подготовьте инфраструктуру поставки данных:

    Вручную
    Terraform
    1. Создайте кластер-источник Managed Service for Apache Kafka® любой подходящей конфигурации.
    2. Создайте базу данных Managed Service for YDB любой подходящей конфигурации.
    3. Создайте в кластере-источнике топик с именем sensors.
    4. Создайте в кластере-источнике пользователя с правами доступа ACCESS_ROLE_PRODUCER, ACCESS_ROLE_CONSUMER к созданному топику.
    1. Если у вас еще нет Terraform, установите его.

    2. Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.

    3. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

    4. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

    5. Скачайте в ту же рабочую директорию файл конфигурации data-transfer-mkf-ydb.tf.

      В этом файле описаны:

      • сеть;
      • подсеть;
      • группа безопасности и правило, необходимое для подключения к кластеру Managed Service for Apache Kafka®;
      • кластер-источник Managed Service for Apache Kafka®;
      • топик Apache Kafka®;
      • пользователь Apache Kafka®;
      • база данных Managed Service for YDB;
      • трансфер.
    6. Укажите в файле data-transfer-mkf-ydb.tf переменные:

      • source_kf_version — версия Apache Kafka® в кластере-источнике;
      • source_user_name — имя пользователя для подключения к топику Apache Kafka®;
      • source_user_password — пароль пользователя;
      • target_db_name — имя базы данных Managed Service for YDB;
      • transfer_enabled — значение 0, чтобы не создавать трансфер до создания эндпоинтов вручную.
    7. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    8. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

    В созданный топик Apache Kafka® sensors кластера-источника будут поступать тестовые данные от сенсоров автомобиля в формате JSON:

    {
        "device_id":"iv9a94th6rzt********",
        "datetime":"2020-06-05 17:27:00",
        "latitude":"55.70329032",
        "longitude":"37.65472196",
        "altitude":"427.5",
        "speed":"0",
        "battery_voltage":"23.5",
        "cabin_temperature":"17",
        "fuel_level":null
    }
    
  2. Установите утилиты:

    • kafkacat — для чтения и записи данных в топики Apache Kafka®.

      sudo apt update && sudo apt install --yes kafkacat
      

      Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

    • jq — для потоковой обработки JSON-файлов.

      sudo apt update && sudo apt-get install --yes jq
      

Создайте поток данных приемника Data StreamsСоздайте поток данных приемника Data Streams

Создайте поток данных приемника Data Streams для базы данных Managed Service for YDB.

Подготовьте и активируйте трансферПодготовьте и активируйте трансфер

  1. Создайте эндпоинт для источника Apache Kafka®:

    Параметры эндпоинта:

    • Тип подключения — Кластер Managed Service for Apache Kafka.

      Выберите кластер-источник из списка и укажите настройки подключения к нему.

    • Расширенные настройки → Правила конвертации.

      • Правила конвертации – json.

      • Схема данных – Вы можете задать схему двумя способами:

        • Список полей.

          Задайте список полей топика вручную:

          Имя Тип Ключ
          device_id STRING Да
          datetime STRING
          latitude DOUBLE
          longitude DOUBLE
          altitude DOUBLE
          speed DOUBLE
          battery_voltage DOUBLE
          cabin_temperature UINT16
          fuel_level UINT16
        • JSON-спецификация.

          Создайте и загрузите файл схемы данных в формате JSON json_schema.json:

          json_schema.json
          [
              {
                  "name": "device_id",
                  "type": "string",
                  "key": true
              },
              {
                  "name": "datetime",
                  "type": "string"
              },
              {
                  "name": "latitude",
                  "type": "double"
              },
              {
                  "name": "longitude",
                  "type": "double"
              },
              {
                  "name": "altitude",
                  "type": "double"
              },
              {
                  "name": "speed",
                  "type": "double"
              },
              {
                  "name": "battery_voltage",
                  "type": "double"
              },
              {
                  "name": "cabin_temperature",
                  "type": "uint16"
              },
              {
                  "name": "fuel_level",
                  "type": "uint16"
              }
          ]
          
  2. Создайте эндпоинт для приемника Yandex Data Streams:

    Настройки подключения:

    • База данных — выберите базу данных Managed Service for YDB из списка.
    • Поток — укажите имя потока Data Streams.
    • Идентификатор сервисного аккаунта — выберите или создайте сервисный аккаунт с ролью yds.editor.
  3. Создайте трансфер:

    Вручную
    Terraform
    1. Создайте трансфер типа Репликация, использующий созданные эндпоинты.

      Если в процессе трансфера вы хотите преобразовать данные, укажите в настройках трансфера нужные трансформеры:

      • Переименование таблиц
      • Фильтр колонок
      • Маскирование данных
      • Разделение на подтаблицы
      • Замена первичных ключей
      • Преобразование значений колонок в строки
      • Шардирование
      • Фильтр строк для APPEND-ONLY источников
    2. Активируйте его.

    1. Укажите в файле data-transfer-mkf-ydb.tf переменные:

      • source_endpoint_id — значение идентификатора эндпоинта для источника;
      • target_endpoint_id — значение идентификатора эндпоинта для приемника;
      • transfer_enabled – значение 1 для создания трансфера.
    2. Если в процессе трансфера вы хотите преобразовать данные, добавьте в ресурс yandex_datatransfer_transfer блок transformation со списком нужных трансформеров:

      resource "yandex_datatransfer_transfer" "mkf-ydb-transfer" {
        ...
        transformation {
          transformers{
            <трансформер_1>
          }
          transformers{
            <трансформер_2>
          }
          ...
          transformers{
            <трансформер_N>
          }
        }
      }
      

      Доступны следующие типы трансформеров:

      • Переименование таблиц
      • Фильтр колонок
      • Маскирование данных
      • Разделение на подтаблицы
      • Замена первичных ключей
      • Преобразование значений колонок в строки
      • Шардирование
      • Фильтр строк для APPEND-ONLY источников

      Подробнее о настройке трансформеров см. в документации провайдера Terraform.

    3. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    4. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      Трансфер активируется автоматически после создания.

Проверьте работоспособность трансфераПроверьте работоспособность трансфера

  1. Дождитесь перехода трансфера в статус Реплицируется.

  2. Убедитесь, что в поток данных Data Streams переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:

    1. Создайте файл sample.json с тестовыми данными:

      {
          "device_id": "iv9a94th6rzt********",
          "datetime": "2020-06-05 17:27:00",
          "latitude": 55.70329032,
          "longitude": 37.65472196,
          "altitude": 427.5,
          "speed": 0,
          "battery_voltage": 23.5,
          "cabin_temperature": 17,
          "fuel_level": null
      }
      
      {
          "device_id": "rhibbh3y08qm********",
          "datetime": "2020-06-06 09:49:54",
          "latitude": 55.71294467,
          "longitude": 37.66542005,
          "altitude": 429.13,
          "speed": 55.5,
          "battery_voltage": null,
          "cabin_temperature": 18,
          "fuel_level": 32
      }
      
      {
          "device_id": "iv9a94th6r********",
          "datetime": "2020-06-07 15:00:10",
          "latitude": 55.70985913,
          "longitude": 37.62141918,
          "altitude": 417.0,
          "speed": 15.7,
          "battery_voltage": 10.3,
          "cabin_temperature": 17,
          "fuel_level": null
      }
      
    2. Отправьте данные из файла sample.json в топик sensors Managed Service for Apache Kafka® с помощью утилит jq и kafkacat:

      jq -rc . sample.json | kafkacat -P \
         -b <FQDN_хоста-брокера>:9091 \
         -t sensors \
         -k key \
         -X security.protocol=SASL_SSL \
         -X sasl.mechanisms=SCRAM-SHA-512 \
         -X sasl.username="<имя_пользователя_в_кластере-источнике>" \
         -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \
         -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
      

      Данные отправляются от имени созданного пользователя. Подробнее о настройке SSL-сертификата и работе с kafkacat см. в разделе Подключение к кластеру Apache Kafka® из приложений.

    Проверьте, что в поток данных Data Streams перенеслись данные из источника:

    Консоль управления
    AWS CLI
    1. В консоли управления выберите сервис Data Streams.
    2. Выберите поток-приемник из списка и перейдите в раздел  Просмотр данных.
    3. Убедитесь, что в сегменте shard-000000 появились сообщения, содержащие строки таблицы из источника. Чтобы рассмотреть сообщения подробнее, нажмите на значок .
    1. Установите AWS CLI.

    2. Настройте окружение для Data Streams.

    3. Прочитайте данные из потока с помощью:

      • AWS CLI.
      • AWS SDK.

Удалите созданные ресурсыУдалите созданные ресурсы

Примечание

Перед тем как удалить созданные ресурсы, деактивируйте трансфер.

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  1. Удалите трансфер.
  2. Удалите эндпоинты для источника и приемника.
  3. Если при создании эндпоинта для приемника вы создавали сервисный аккаунт, удалите его.

Остальные ресурсы удалите в зависимости от способа их создания:

Вручную
Terraform
  1. Удалите кластер Managed Service for Apache Kafka®.
  2. Удалите базу данных Managed Service for YDB.
  1. В терминале перейдите в директорию с планом инфраструктуры.

    Важно

    Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

  2. Удалите ресурсы:

    1. Выполните команду:

      terraform destroy
      
    2. Подтвердите удаление ресурсов и дождитесь завершения операции.

    Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

Была ли статья полезна?

Предыдущая
Миграция данных в Yandex Object Storage с помощью Yandex Data Transfer
Следующая
Поставка данных из очереди Data Streams в Managed Service for YDB
Проект Яндекса
© 2025 ТОО «Облачные Сервисы Казахстан»