Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Managed Service for PostgreSQL
  • Начало работы
    • Все руководства
    • Создание кластера PostgreSQL для 1С
    • Создание кластера Linux-серверов «1С:Предприятия» с кластером Managed Service for PostgreSQL
    • Выгрузка базы данных в Yandex Data Processing
    • Поиск проблем с производительностью кластера
    • Анализ производительности и оптимизация
    • Настройка подключения из контейнера Serverless Containers
    • Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Поставка данных в Yandex Managed Service for YDB с помощью Yandex Data Transfer
    • Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Debezium
    • Захват изменений PostgreSQL и поставка в YDS
    • Поставка данных из Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer
    • Перенос данных из Yandex Object Storage с использованием Yandex Data Transfer
    • Настройка отказоустойчивой архитектуры в Yandex Cloud
    • Мониторинг состояния географически распределенных устройств
    • Запись логов балансировщика в PostgreSQL
    • Создание сервера MLFlow для логирования экспериментов и артефактов
    • Работа с данными с помощью Query
    • Федеративные запросы к данным с помощью Query
    • Решение проблем с сортировкой строк после обновления glibc
    • Запись данных с устройства в базу данных
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • История изменений
  • Обучающие курсы

В этой статье:

  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте кластер-источник
  • Подготовьте кластер-приемник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы
  • Дополнительные материалы
  1. Практические руководства
  2. Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer

Поставка данных в Yandex Managed Service for Apache Kafka® с помощью Yandex Data Transfer

Статья создана
Yandex Cloud
Улучшена
Dmitry A.
Обновлена 21 апреля 2025 г.
  • Необходимые платные ресурсы
  • Перед началом работы
  • Подготовьте кластер-источник
  • Подготовьте кластер-приемник
  • Подготовьте и активируйте трансфер
  • Проверьте работоспособность трансфера
  • Удалите созданные ресурсы
  • Дополнительные материалы

Вы можете отслеживать изменения данных в кластере-источнике Managed Service for PostgreSQL и отправлять их в кластер-приемник Managed Service for Apache Kafka® с помощью технологии Change Data Capture (CDC).

Чтобы настроить CDC с использованием сервиса Data Transfer:

  1. Подготовьте кластер-источник.
  2. Подготовьте кластер-приемник.
  3. Подготовьте и активируйте трансфер.
  4. Проверьте работоспособность трансфера.

Если созданные ресурсы вам больше не нужны, удалите их.

Необходимые платные ресурсыНеобходимые платные ресурсы

В стоимость поддержки описываемого решения входят:

  • Плата за кластер Managed Service for PostgreSQL: использование вычислительных ресурсов, выделенных хостам, и дискового пространства (см. тарифы Managed Service for PostgreSQL).
  • Плата за кластер Managed Service for Apache Kafka®: использование вычислительных ресурсов, выделенных хостам (в том числе хостам ZooKeeper), и дискового пространства (см. тарифы Apache Kafka®).
  • Плата за использование публичных IP-адресов для хостов кластеров (см. тарифы Virtual Private Cloud).
  • Плата за трансфер: использование вычислительных ресурсов и количество переданных строк данных (см. тарифы Data Transfer).

Перед началом работыПеред началом работы

  1. Создайте кластер-источник Managed Service for PostgreSQL любой подходящей конфигурации со следующими настройками:

    • с базой данных db1;
    • с пользователем pg-user;
    • с хостами в публичном доступе.
  2. Создайте кластер-приемник Managed Service for Apache Kafka® любой подходящей конфигурации с хостами в публичном доступе.

  3. Если вы используете группы безопасности, настройте их так, чтобы к кластерам можно было подключаться из интернета:

    • Инструкция для Managed Service for PostgreSQL.
    • Инструкция для Managed Service for Apache Kafka®.
  4. Установите на локальный компьютер утилиту kcat (kafkacat) и клиент командной строки PostgreSQL. Например, в Ubuntu 20.04 выполните команду:

    sudo apt update && sudo apt install kafkacat postgresql-client --yes
    

    Убедитесь, что можете с ее помощью подключиться к кластеру-источнику Managed Service for Apache Kafka® через SSL.

Подготовьте кластер-источникПодготовьте кластер-источник

  1. Чтобы сервис Data Transfer мог получать от кластера Managed Service for PostgreSQL уведомления об изменениях в данных, в кластере-источнике необходимо создать публикацию (publication). Чтобы пользователь pg-user мог создать публикацию, назначьте ему роль mdb_replication.

  2. Подключитесь к базе данных db1 от имени пользователя pg-user.

  3. Наполните базу тестовыми данными. В качестве примера используется простая таблица, содержащая информацию с некоторых датчиков автомобиля.

    Создайте таблицу:

    CREATE TABLE public.measurements (
        "device_id" text PRIMARY KEY NOT NULL,
        "datetime" timestamp NOT NULL,
        "latitude" real NOT NULL,
        "longitude" real NOT NULL,
        "altitude" real NOT NULL,
        "speed" real NOT NULL,
        "battery_voltage" real,
        "cabin_temperature" real NOT NULL,
        "fuel_level" real
    );
    

    Наполните таблицу данными:

    INSERT INTO public.measurements VALUES
        ('iv9a94th6rzt********', '2020-06-05 17:27:00', 55.70329032, 37.65472196,  427.5,    0, 23.5, 17, NULL),
        ('rhibbh3y08qm********', '2020-06-06 09:49:54', 55.71294467, 37.66542005, 429.13, 55.5, NULL, 18, 32),
        ('iv9a94th678t********', '2020-06-07 15:00:10', 55.70985913, 37.62141918,  417.0, 15.7, 10.3, 17, NULL);
    

Подготовьте кластер-приемникПодготовьте кластер-приемник

Настройки различаются в зависимости от используемого способа управления топиками. При этом имена топиков для данных конструируются по тому же принципу, что и в Debezium — <префикс_топика>.<имя_схемы>.<имя_таблицы>. В этом руководстве в качестве примера будет использоваться префикс cdc.

Интерфейсы Yandex Cloud
Admin API

Если управление топиками осуществляется с помощью стандартных интерфейсов Yandex Cloud (Консоль управления, CLI, Terraform, API):

  1. Создайте топик с именем cdc.public.measurements.

    Если необходимо отслеживать изменения в нескольких таблицах, создайте для каждой из них отдельный топик.

  2. Создайте пользователя с именем kafka-user и ролями ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER, действующими на созданные топики.

Если для управления топиками используется Kafka Admin API:

  1. Создайте пользователя-администратора с именем kafka-user.

  2. Помимо роли ACCESS_ROLE_ADMIN назначьте пользователю-администратору роли ACCESS_ROLE_CONSUMER и ACCESS_ROLE_PRODUCER на топики, имена которых начинаются с префикса cdc.

    Необходимые топики будут созданы автоматически при первом событии изменения в отслеживаемых таблицах кластера-источника. Такое решение может быть удобным для отслеживания изменений во множестве таблиц, однако, требует запас свободного места в хранилище кластера. Подробнее см. в разделе Хранилище в Managed Service for Apache Kafka®.

Подготовьте и активируйте трансферПодготовьте и активируйте трансфер

  1. Создайте эндпоинты.

    • Эндпоинт для источника:

      • Тип базы данных — PostgreSQL.
      • Параметры эндпоинта:
        • Настройки подключения — Кластер Managed Service for PostgreSQL.
        • Кластер Managed Service for PostgreSQL — выберите созданный ранее кластер Managed Service for PostgreSQL.
        • База данных — db1.
        • Пользователь — pg-user.
        • Пароль — укажите пароль пользователя pg-user.
        • Список включённых таблиц — public.measurements.
    • Эндпоинт для приемника:

      • Тип базы данных — Kafka.

      • Параметры эндпоинта:

        • Тип подключения — Кластер Managed Service for Apache Kafka.

          • Кластер Managed Service for Apache Kafka — выберите кластер-приемник.
          • Аутентификация — укажите данные созданного ранее пользователя kafka-user.
        • Топик — Полное имя топика.

        • Полное имя топика — cdc.public.measurements.

        Если необходимо отслеживать изменения в нескольких таблицах, заполните поля следующим образом:

        • Топик — Префикс топика.
        • Префикс топика — укажите префикс cdc, использованный при формировании имен топиков.
  2. Создайте трансфер со следующими настройками:

    • Эндпоинты:
      • Источник — созданный ранее эндпоинт для источника.
      • Приёмник — созданный ранее эндпоинт для приемника.
    • Тип трансфера — Репликация.
  3. Активируйте трансфер и дождитесь его перехода в статус Реплицируется.

Проверьте работоспособность трансфераПроверьте работоспособность трансфера

  1. В отдельном терминале запустите утилиту kafkacat в режиме потребителя:

    kafkacat \
        -C \
        -b <FQDN_хоста-брокера_1>:9091,...,<FQDN_хоста-брокера_N>:9091 \
        -t cdc.public.measurements \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=SCRAM-SHA-512 \
        -X sasl.username=kafka-user \
        -X sasl.password=<пароль> \
        -X ssl.ca.location=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt \
        -Z \
        -K:
    

    FQDN хостов-брокеров можно получить со списком хостов в кластере Managed Service for Apache Kafka®.

    Будет выведена схема формата данных таблицы public.measurements и данные о добавленных в нее ранее строках.

    Пример фрагмента сообщения
    {
      "payload": {
        "consumer":"dttuhfpp97l3********"
      },
      "schema": {
        "fields": [
          {
            "field": "consumer",
            "optional":false,
            "type":"string"
          }
        ],
        "name": "__data_transfer_stub.public.__consumer_keeper.Key",
        "optional":false,
        "type":"struct"
      }
    }:{
      "payload": {
        "after": {
          "consumer":"dttuhfpp97l3********l",
          "locked_by":"dttuhfpp97l3********-1",
          "locked_till":"2022-05-15T09:55:18Z"
        },
      "before": null,
      "op":"u",
      "source": {
        "connector":"postgresql",
        "db":"db1",
        "lsn":85865797008,
        "name":"__data_transfer_stub",
        "schema":"public",
        "snapshot":"false",
        "table":"__consumer_keeper",
        "ts_ms":1652608518883,
        "txId":245165,
        "version":"1.1.2.Final",
        "xmin":null
      },
    ...
    
  2. Подключитесь к кластеру-источнику и добавьте данные в таблицу measurements:

    INSERT INTO public.measurements VALUES ('iv7b74th678t********', '2020-06-08 17:45:00', 53.70987913, 36.62549834, 378.0, 20.5, 5.3, 20, NULL);
    
  3. Убедитесь, что в терминале с запущенной утилитой kafkacat отобразились сведения о добавленной строке.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  1. Деактивируйте и удалите трансфер.

  2. Удалите эндпоинты.

  3. Удалите кластеры:

    • Managed Service for Apache Kafka®.
    • Managed Service for PostgreSQL.
  4. Если для доступа к хостам кластеров использовались статические публичные IP-адреса, освободите и удалите их.

Дополнительные материалыДополнительные материалы

Больше информации о сценариях поставок данных в вебинаре Yandex Cloud:

Смотреть видео на YouTube.

Была ли статья полезна?

Предыдущая
Настройка подключения из контейнера Serverless Containers
Следующая
Поставка данных в Yandex Managed Service for YDB с помощью Yandex Data Transfer
Проект Яндекса
© 2025 ООО «Яндекс.Облако»