Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for PostgreSQL
  • Начало работы
    • Все руководства
    • Создание кластера PostgreSQL для 1С
    • Создание кластера Linux-серверов «1С:Предприятия» с кластером Managed Service for PostgreSQL
    • Выгрузка базы данных в Yandex Data Processing
    • Поиск проблем с производительностью кластера
    • Анализ производительности и оптимизация
    • Настройка подключения из контейнера Serverless Containers
    • Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Поставка данных в Yandex Managed Service for YDB с помощью Yandex Data Transfer
    • Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Debezium
    • Захват изменений PostgreSQL и поставка в YDS
    • Поставка данных из Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Перенос данных из Yandex Object Storage с использованием Yandex Data Transfer
    • Настройка отказоустойчивой архитектуры в Yandex Cloud
    • Мониторинг состояния географически распределенных устройств
    • Запись логов балансировщика в PostgreSQL
    • Создание сервера MLFlow для логирования экспериментов и артефактов
    • Работа с данными с помощью Query
    • Федеративные запросы к данным с помощью Query
    • Решение проблем с сортировкой строк после обновления glibc
    • Запись данных с устройства в базу данных
  • Управление доступом
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • История изменений
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте тестовые данные
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Поставка данных из Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer

Поставка данных из Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer

Статья создана
Yandex Cloud
Обновлена 17 марта 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте тестовые данные
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы

Вы можете настроить перенос данных из топика Managed Service for Apache Kafka® в Managed Service for PostgreSQL с помощью сервиса Yandex Data Transfer. Для этого:

  1. Подготовьте тестовые данные.
  2. Подготовьте и активируйте трансфер.
  3. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за кластер Managed Service for PostgreSQL: использование вычислительных ресурсов, выделенных хостам, и дискового пространства (см. тарифы Managed Service for PostgreSQL).
  • Плата за использование публичных IP-адресов для хостов кластера (см. тарифы Virtual Private Cloud).

Перед началом работыПеред началом работы

  1. Подготовьте инфраструктуру:

    Вручную
    Terraform
    1. Создайте кластер-источник Managed Service for Apache Kafka® в любой зоне доступности, любой подходящей конфигурации и с публичным доступом.

    2. Создайте в кластере-источнике топик с именем sensors.

    3. Создайте в кластере-источнике пользователя с именем mkf-user и правами доступа к созданному топику ACCESS_ROLE_PRODUCER и ACCESS_ROLE_CONSUMER.

    4. В той же зоне доступности создайте кластер-приемник Managed Service for PostgreSQL любой подходящей конфигурации с именем пользователя-администратора pg-user и хостами в публичном доступе.

    5. Убедитесь, что группы безопасности кластеров настроены правильно и допускают подключение к ним:

      • Managed Service for Apache Kafka®.
      • Managed Service for PostgreSQL.
    1. Если у вас еще нет Terraform, установите его.

    2. Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.

    3. Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его.

    4. Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.

    5. Скачайте в ту же рабочую директорию файл конфигурации kafka-postgresql.tf.

      В этом файле описаны:

      • сети и подсети для размещения кластеров;
      • группы безопасности для подключения к кластерам;
      • кластер-источник Managed Service for Apache Kafka®;
      • кластер-приемник Managed Service for PostgreSQL;
      • эндпоинт для приемника;
      • трансфер.
    6. Укажите в файле kafka-postgresql.tf:

      • Версии Apache Kafka® и PostgreSQL.
      • Пароли пользователей Apache Kafka® и PostgreSQL.
    7. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    8. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

      В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления.

  2. Установите утилиты:

    • kafkacat — для чтения и записи данных в топики Apache Kafka®.

      sudo apt update && sudo apt install --yes kafkacat
      

      Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

    • jq — для потоковой обработки JSON-файлов.

      sudo apt update && sudo apt-get install --yes jq
      
      

Подготовьте тестовые данныеПодготовьте тестовые данные

Пусть в качестве сообщения в топик Apache Kafka® sensors кластера-источника поступают данные от сенсоров автомобиля в формате JSON.

Создайте локально файл sample.json с тестовыми данными:

sample.json
{
    "device_id": "iv9a94th6rzt********",
    "datetime": "2020-06-05 17:27:00",
    "latitude": 55.70329032,
    "longitude": 37.65472196,
    "altitude": 427.5,
    "speed": 0,
    "battery_voltage": 23.5,
    "cabin_temperature": 17,
    "fuel_level": null
}
{
    "device_id": "rhibbh3y08qm********",
    "datetime": "2020-06-06 09:49:54",
    "latitude": 55.71294467,
    "longitude": 37.66542005,
    "altitude": 429.13,
    "speed": 55.5,
    "battery_voltage": null,
    "cabin_temperature": 18,
    "fuel_level": 32
}
{
    "device_id": "iv9a94th6rzt********",
    "datetime": "2020-06-07 15:00:10",
    "latitude": 55.70985913,
    "longitude": 37.62141918,
    "altitude": 417.0,
    "speed": 15.7,
    "battery_voltage": 10.3,
    "cabin_temperature": 17,
    "fuel_level": null
}

Подготовьте и активируйте трансферПодготовьте и активируйте трансфер

  1. Создайте эндпоинт-источник типа Apache Kafka® и задайте для него:

    • Полное имя топика — sensors.
    • Правила конвертации типа json. В поле Схема данных выберите JSON-спецификация и в открывшуюся форму скопируйте следующую спецификацию полей:
    sensors-specification
    [
        {
            "name": "device_id",
            "type": "utf8",
            "key": true
        },
        {
            "name": "datetime",
            "type": "utf8"
        },
        {
            "name": "latitude",
            "type": "double"
        },
        {
            "name": "longitude",
            "type": "double"
        },
        {
            "name": "altitude",
            "type": "double"
        },
        {
            "name": "speed",
            "type": "double"
        },
        {
            "name": "battery_voltage",
            "type": "double"
        },
        {
            "name": "cabin_temperature",
            "type": "uint16"
        },
        {
            "name": "fuel_level",
            "type": "uint16"
        }
    ]
    
  2. Создайте эндпоинт-приемник и трансфер:

    Вручную
    Terraform
    1. Создайте эндпоинт-приемник типа PostgreSQL и укажите в нем параметры подключения к кластеру:

      • Тип инсталляции — Кластер Managed Service for PostgreSQL.
      • Кластер Managed Service for PostgreSQL — <имя_кластера-приемника_PostgreSQL> из выпадающего списка.
      • База данных — db1.
      • Пользователь — pg-user.
      • Пароль — <пароль_пользователя>.
    2. Создайте трансфер типа Репликация, использующий созданные эндпоинты.

    3. Активируйте трансфер и дождитесь его перехода в статус Реплицируется.

    1. Укажите в файле kafka-postgresql.tf переменные:

      • kf_source_endpoint_id — значение идентификатора эндпоинта для источника;
      • transfer_enabled — значение 1 для создания эндпоинта-приемника и трансфера.
    2. Проверьте корректность файлов конфигурации Terraform с помощью команды:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

    3. Создайте необходимую инфраструктуру:

      1. Выполните команду для просмотра планируемых изменений:

        terraform plan
        

        Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

      2. Если вас устраивают планируемые изменения, внесите их:

        1. Выполните команду:

          terraform apply
          
        2. Подтвердите изменение ресурсов.

        3. Дождитесь завершения операции.

    4. Трансфер активируется автоматически. Дождитесь его перехода в статус Реплицируется.

Проверьте работоспособность трансфераПроверьте работоспособность трансфера

Убедитесь, что в базу данных Managed Service for PostgreSQL переносятся данные из топика кластера-источника Managed Service for Apache Kafka®:

  1. Отправьте данные из файла sample.json в топик sensors Managed Service for Apache Kafka® с помощью утилит jq и kafkacat:

    jq -rc . sample.json | kafkacat -P \
        -b <FQDN_хоста-брокера>:9091 \
        -t sensors \
        -k key \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username="mkf-user" \
        -X sasl.password="<пароль_пользователя_в_кластере-источнике>" \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt -Z
    

    Подробнее о настройке SSL-сертификата и работе с kafkacat см. в разделе Подключение к кластеру Apache Kafka® из приложений.

  2. Проверьте, что в базу данных Managed Service for PostgreSQL перенеслись данные из кластера-источника Managed Service for Apache Kafka®:

    1. Подключитесь к базе данных Managed Service for PostgreSQL.

    2. Проверьте, что таблица sensors содержит отправленные данные:

      SELECT * FROM sensors;
      

Удалите созданные ресурсыУдалите созданные ресурсы

Примечание

Перед тем как удалить созданные ресурсы, деактивируйте трансфер.

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  1. Удалите трансфер.

  2. Удалите эндпоинт-источник.

  3. Остальные ресурсы удалите в зависимости от способа их создания:

    Вручную
    Terraform
    • Эндпоинт-приемник.
    • Managed Service for Apache Kafka®.
    • Managed Service for PostgreSQL.
    1. В терминале перейдите в директорию с планом инфраструктуры.

      Важно

      Убедитесь, что в директории нет Terraform-манифестов с ресурсами, которые вы хотите сохранить. Terraform удаляет все ресурсы, которые были созданы с помощью манифестов в текущей директории.

    2. Удалите ресурсы:

      1. Выполните команду:

        terraform destroy
        
      2. Подтвердите удаление ресурсов и дождитесь завершения операции.

      Все ресурсы, которые были описаны в Terraform-манифестах, будут удалены.

Была ли статья полезна?

Предыдущая
Захват изменений PostgreSQL и поставка в YDS
Следующая
Перенос данных из Yandex Object Storage с использованием Yandex Data Transfer
Проект Яндекса
© 2025 ООО «Яндекс.Облако»