Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Transfer
  • Доступные трансферы
  • Начало работы
    • Все инструкции
    • Подготовка к трансферу
      • Управление эндпоинтами
      • Миграция эндпоинтов в другую зону доступности
        • Источник
        • Приемник
    • Управление трансфером
    • Работа с базами данных во время трансфера
    • Мониторинг состояния трансфера
  • Решение проблем
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Сценарии передачи данных в Apache Kafka®
  • Настройка источника данных
  • Настройка эндпоинта-приемника Apache Kafka®
  • Кластер Managed Service for Apache Kafka®
  • Пользовательская инсталляция
  • Настройки топика Apache Kafka®
  • Настройки сериализации
  • Дополнительные настройки
  1. Пошаговые инструкции
  2. Настройка эндпоинтов
  3. Apache Kafka®
  4. Приемник

Передача данных в эндпоинт-приемник Apache Kafka®

Статья создана
Yandex Cloud
Обновлена 4 апреля 2025 г.
  • Сценарии передачи данных в Apache Kafka®
  • Настройка источника данных
  • Настройка эндпоинта-приемника Apache Kafka®
    • Кластер Managed Service for Apache Kafka®
    • Пользовательская инсталляция
    • Настройки топика Apache Kafka®
    • Настройки сериализации
    • Дополнительные настройки

С помощью сервиса Yandex Data Transfer вы можете переносить данные в очередь Apache Kafka® и реализовывать различные сценарии обработки и трансформации данных. Для реализации трансфера:

  1. Ознакомьтесь с возможными сценариями передачи данных.
  2. Настройте один из поддерживаемых источников данных.
  3. Настройте эндпоинт-приемник в Yandex Data Transfer.
  4. Создайте и запустите трансфер.
  5. Выполняйте необходимые действия по работе с базой и контролируйте трансфер.
  6. При возникновении проблем, воспользуйтесь готовыми решениями по их устранению.

Сценарии передачи данных в Apache Kafka®Сценарии передачи данных в Apache Kafka®

  1. Миграция — перенос данных из одного хранилища в другое. Часто это перенос базы из устаревших локальных баз в управляемые облачные.

    Отдельной задачей миграции является зеркалирование данных между очередями:

    • Зеркалирование Apache Kafka®
  2. Захват изменений данных — это процесс отслеживания изменений в базе данных и поставка этих изменений потребителям. Применяется для приложений, которые чувствительны к изменению данных в реальном времени.

    • Захват изменений из MySQL® и поставка в Apache Kafka®;
    • Захват изменений YDB и поставка в Apache Kafka®;
    • Захват изменений из PostgreSQL и поставка в Apache Kafka®.
  3. Поставка данных — процесс доставки произвольных данных в целевые хранилища. Процесс поставки включает извлечение данных из очереди и их десериализацию с последующей трансформацией данных в формат целевого хранилища.

    • Поставка данных из очереди YDS в Apache Kafka®

Подробное описание возможных сценариев передачи данных в Yandex Data Transfer см. в разделе Практические руководства.

Настройка источника данныхНастройка источника данных

Настройте один из поддерживаемых источников данных:

  • PostgreSQL;
  • MySQL®;
  • Apache Kafka®;
  • Airbyte®;
  • YDS;
  • Managed Service for YDB;
  • Elasticsearch;
  • OpenSearch.

Полный список поддерживаемых источников и приемников в Yandex Data Transfer см. в разделе Доступные трансферы.

Настройка эндпоинта-приемника Apache Kafka®Настройка эндпоинта-приемника Apache Kafka®

При создании или изменении эндпоинта вы можете задать:

  • Настройки подключения к кластеру Yandex Managed Service for Apache Kafka® или пользовательской инсталляции и настройки сериализации, в т. ч. на базе виртуальных машин Yandex Compute Cloud. Это обязательные параметры.
  • Настройки топика Apache Kafka.

Кластер Managed Service for Apache Kafka®Кластер Managed Service for Apache Kafka®

Важно

Для создания или редактирования эндпоинта управляемой базы данных вам потребуется роль managed-kafka.viewer или примитивная роль viewer, выданная на каталог кластера этой управляемой базы данных.

Подключение с указанием идентификатора кластера в Yandex Cloud.

Консоль управления
Terraform
API
  • Кластер Managed Service for Apache Kafka — выберите кластер, к которому необходимо подключиться.

  • Аутентификация — выберите тип: SASL или Без аутентификации.

    При выборе значения SASL:

    • Имя пользователя — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
    • Пароль — укажите пароль учетной записи.
  • В блоке Топик выберите способ указания топика:

    • Полное имя топика — введите полное имя топика.

      Если вы не хотите разбивать поток событий на независимые очереди по таблицам, включите настройку Сохранять порядок транзакций.

    • Префикс топика — введите префикс топика.

  • Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.

    Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.

  • В блоке Настройки сериализации выберите тип сериализации. Подробнее см. в разделе Настройки сериализации.

  • При необходимости задайте Дополнительные настройки эндпоинта:

    • Укажите конфигурацию топиков в формате Имя конфигурации-Значение конфигурации.
    • Выберите тип сжатия.
  • Тип эндпоинта — kafka_target.
  • security_groups — группы безопасности для сетевого трафика.

    Правила групп безопасности применяются к трансферу. Они позволяют открыть сетевой доступ с ВМ трансфера к кластеру. Подробнее см. в разделе Сеть в Yandex Data Transfer.

    Группы безопасности должны принадлежать той же сети, в которой размещен кластер.

    Примечание

    В Terraform сеть для групп безопасности задавать не нужно.

  • connection.cluster_id — идентификатор кластера, к которому необходимо подключиться.

  • auth — метод аутентификации. Используется при подключении к хостам-брокерам:

    • sasl — аутентификация с помощью SASL.
    • no_auth — без аутентификации.

Пример структуры конфигурационного файла:

resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
  name = "<имя_эндпоинта>"
  settings {
    kafka_target {
      security_groups = ["<список_идентификаторов_групп_безопасности>"]
      connection {
        cluster_id = "<идентификатор_кластера>"
      }
      auth {
        <метод_аутентификации>
      }
      <настройки_топика>
      <настройки_сериализации>
    }
  }
}

Подробнее см. в документации провайдера Terraform.

  • securityGroups — группы безопасности для сетевого трафика, правила которых применятся к ВМ и кластерам без изменения их настроек. Подробнее см. в разделе Сеть в Yandex Data Transfer.

  • connection.clusterId — идентификатор кластера, к которому необходимо подключиться.

  • auth — метод аутентификации. Используется при подключении к хостам-брокерам:

    • sasl — аутентификация с помощью SASL. Необходимо указать следующие параметры:

      • user — имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
      • password.raw — пароль учетной записи (в текстовом виде).
      • mechanism — механизм хеширования.
    • noAuth — без аутентификации.

Пользовательская инсталляцияПользовательская инсталляция

Подключение к кластеру Apache Kafka® с явным указанием сетевых адресов и портов хостов-брокеров.

Консоль управления
Terraform
API
  • URL-адреса брокеров — укажите IP-адреса или FQDN хостов-брокеров.

    Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:

    <IP-адрес_или_FQDN_хоста-брокера>:<номер_порта>
    
  • SSL — использовать шифрование для защиты соединения.

  • PEM-сертификат — если требуется шифрование передаваемых данных, например для соответствия требованиям PCI DSS, загрузите файл сертификата или добавьте его содержимое в текстовом виде.

    Важно

    Если не добавить сертификат, трансфер может завершиться ошибкой.

  • Сетевой интерфейс для эндпоинта — выберите или создайте подсеть в нужной зоне доступности. Трансфер будет использовать эту подсеть для доступа к приемнику.

Если значение в этом поле задано для обоих эндпоинтов, то обе подсети должны быть размещены в одной зоне доступности.

  • Аутентификация — выберите тип: SASL или Без аутентификации.

    При выборе значения SASL:

    • Пользователь — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
    • Пароль — укажите пароль учетной записи.
    • Механизм — выберите механизм хеширования: SHA 256 или SHA 512.
  • Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.

    Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.

  • Тип эндпоинта — kafka_target.
  • security_groups — группы безопасности для сетевого трафика.

    Правила групп безопасности применяются к трансферу. Они позволяют открыть сетевой доступ с ВМ трансфера к хостам-брокерам. Подробнее см. в разделе Сеть в Yandex Data Transfer.

    Группы безопасности должны принадлежать той же сети, что и подсеть subnet_id, если она указана.

    Примечание

    В Terraform сеть для групп безопасности задавать не нужно.

  • connection.on_premise — параметры подключения к хостам-брокерам:

    • broker_urls — IP-адреса или FQDN хостов-брокеров.

      Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:

      <IP-адрес_или_FQDN_хоста-брокера>:<номер_порта>
      
    • tls_mode — параметры шифрования передаваемых данных, если оно требуется, например для соответствия требованиям PCI DSS.

      • disabled — отключено.
      • enabled — включено.
        • ca_certificate — сертификат CA.

          Важно

          Если не добавить сертификат, трансфер может завершиться ошибкой.

    • subnet_id — идентификатор подсети, в которой находятся хосты-брокеры. Трансфер будет использовать эту подсеть для доступа к ним.

  • auth — метод аутентификации. Используется при подключении к хостам-брокерам:

    • sasl — аутентификация с помощью SASL.
    • no_auth — без аутентификации.

Пример структуры конфигурационного файла:

resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
  name = "<имя_эндпоинта>"
  settings {
    kafka_target {
      security_groups = ["<список_идентификаторов_групп_безопасности>"]
      connection {
        on_premise {
          broker_urls = ["<список_IP-адресов_или_FQDN_хостов-брокеров>"]
          subnet_id  = "<идентификатор_подсети_c_хостами-брокерами>"
        }
      }
      auth = {
        <метод_аутентификации>
      }
      <настройки_топика>
      <настройки_сериализации>
    }
  }
}

Подробнее см. в документации провайдера Terraform.

  • securityGroups — группы безопасности для сетевого трафика, правила которых применятся к ВМ и кластерам без изменения их настроек. Подробнее см. в разделе Сеть в Yandex Data Transfer.

  • connection.onPremise — параметры подключения к хостам-брокерам:

    • brokerUrls — IP-адреса или FQDN хостов-брокеров.

      Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:

      <IP-адрес_или_FQDN_хоста-брокера>:<номер_порта>
      
    • tlsMode — параметры шифрования передаваемых данных, если оно требуется, например для соответствия требованиям PCI DSS.

      • disabled — отключено.
      • enabled — включено.
        • caCertificate — сертификат CA.

          Важно

          Если не добавить сертификат, трансфер может завершиться ошибкой.

    • subnetId — идентификатор подсети, в которой находятся хосты-брокеры. Трансфер будет использовать эту подсеть для доступа к ним.

  • auth — метод аутентификации. Используется при подключении к хостам-брокерам:

    • sasl — аутентификация с помощью SASL. Необходимо указать следующие параметры:

      • user — имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
      • password.raw — пароль учетной записи (в текстовом виде).
      • mechanism — механизм хеширования.
    • noAuth — без аутентификации.

Настройки топика Apache Kafka®Настройки топика Apache Kafka®

Консоль управления
Terraform
API
  • Топик:

    • Полное имя топика — укажите имя топика, в который будут отправляться сообщения. Выберите Сохранять порядок транзакций, чтобы не разбивать поток событий на независимые очереди по таблицам.

    • Префикс топика — укажите префикс топика, аналог настройки Debezium database.server.name. Сообщения будут отправляться в топик с именем <префикс_топика>.<схема>.<имя_таблицы>.

Укажите в блоке topic_settings одну из опций отправки сообщений в топик:

  • topic — укажите параметры в этом блоке, чтобы отправлять все сообщения в один топик:

    • topic_name — имя топика, в который будут отправляться сообщения.
    • save_tx_order — опция, позволяющая сохранять порядок транзакций. Укажите значение true, чтобы не разбивать поток событий на независимые очереди по таблицам.
  • topic_prefix — укажите префикс, чтобы отправлять сообщения в разные топики с заданным префиксом.

    Это аналог настройки Debezium database.server.name. Сообщения будут отправляться в топик с именем <префикс_топика>.<схема>.<имя_таблицы>.

Укажите в поле topicSettings одну из опций отправки сообщений в топик:

  • topic — укажите параметры в этом поле, чтобы отправлять все сообщения в один топик:

    • topicName — имя топика, в который будут отправляться сообщения.
    • saveTxOrder — опция, позволяющая сохранять порядок транзакций. Укажите значение true, чтобы не разбивать поток событий на независимые очереди по таблицам.
  • topicPrefix — укажите префикс, чтобы отправлять сообщения в разные топики с заданным префиксом.

    Это аналог настройки Debezium database.server.name. Сообщения будут отправляться в топик с именем <префикс_топика>.<схема>.<имя_таблицы>.

Yandex Data Transfer поддерживает CDC-режим для трансферов из баз данных PostgreSQL, MySQL® и YDB в Apache Kafka® и Yandex Data Streams. При этом данные в приемник попадают в формате Debezium. Подробнее о CDC-режиме см. в разделе Захват изменения данных.

Примечание

В YDB CDC-режим поддерживается, начиная с версии 22.5 и выше.

Настройки сериализацииНастройки сериализации

Консоль управления
Terraform
API
  • В блоке Настройки сериализации выберите тип сериализации:

    • Auto — автоматическая сериализация.

    • Debezium — сериализация по стандартам Debezium:

      • Выберите схему ключа сообщения (соответствует Debezium-параметру key.converter).
      • Выберите схему значения сообщения (соответствует Debezium-параметру value.converter).
      • При необходимости задайте Настройки сериализации Debezium в формате Параметр-Значение.

Если вы хотите использовать JSON-схемы в Yandex Schema Registry, сохраняя совместимость схем при добавлении и удалении опциональных полей, укажите следующие настройки:

  • Настройки сериализации — Debezium.

  • Чтобы использовать Schema Registry для ключей, выберите Схема ключа сообщения — JSON (посредством Schema Registry). Чтобы использовать Schema Registry для значений, выберите Схема значения сообщения — JSON (посредством Schema Registry).

    • URL — эндпоинт пространства имен Schema Registry. Вы можете скопировать эндпоинт из подсказки по подключению к пространству имен Schema Registry на вкладке Debezium, в параметре value.converter.schema.registry.url.

      Важно

      В пространстве имен должна быть выставлена Политика проверок совместимости для JSON — optional friendly.

    • Имя пользователя — api-key.

    • Пароль — значение API-ключа с ограниченной областью действия для подключения к Schema Registry. Чтобы получить значение:

      1. Создайте API-ключ с ограниченной областью действия и поместите его в локальную переменную SECRET:

        yc iam api-key create --folder-id <идентификатор_каталога> \
          --service-account-name <имя_сервисного_аккаунта_для_работы_со_Schema_Registry> \
          --scopes yc.schema-registry.schemas.manage \
          --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \
        SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'`
        
      2. Выведите в терминал значение переменной SECRET:

        echo $SECRET
        
      3. Скопируйте полученное значение и вставьте его в поле Пароль в окне создания эндпоинта.

  • В блоке Настройки сериализации Debezium:

    • Чтобы генерировать закрытую схему для ключей, добавьте параметр key.converter.dt.json.generate.closed.content.schema со значением true.
    • Чтобы генерировать закрытую схему для значений, добавьте параметр value.converter.dt.json.generate.closed.content.schema со значением true.

Укажите в блоке serializer выбранный тип сериализации:

  • serializer_auto.

  • serializer_json.

  • serializer_debezium.

    Для этого типа можно указать параметры сериализации Debezium, задав их в блоке serializer_debezium.serializer_parameters в виде пар key/value.

Если вы хотите использовать JSON-схемы в Yandex Schema Registry, сохраняя совместимость схем при добавлении и удалении опциональных полей, добавьте в конфигурационный файл блок serializer с описанием настроек сериализации. Чтобы генерировать закрытую схему для ключей, добавьте в блок serializer переменную key.converter.dt.json.generate.closed.content.schema со значением true. Чтобы генерировать закрытую схему для значений, добавьте в блок serializer переменную value.converter.dt.json.generate.closed.content.schema со значением true.

resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
  ...
  settings {
    kafka_target {
      ...
      serializer {
        serializer_debezium {
          serializer_parameters {            
            key = "key.converter.dt.json.generate.closed.content.schema"
            value = "true"
          }    
          serializer_parameters {            
            key = "value.converter.dt.json.generate.closed.content.schema"
            value = "true"
          }
          serializer_parameters {
            key = "value.converter.schemas.enable"
            value = "true"
          }
          serializer_parameters {
            key = "value.converter.schema.registry.url"
            value = "<URL_пространства_имен_Schema_Registry>"
          }
          serializer_parameters {
            key = "value.converter.basic.auth.user.info"
            value = "api-key:<значение_API-ключа>"
          }
        }
      }
    }
  }
}

Где:

  • URL_пространства_имен_Schema_Registry — эндпоинт пространства имен Schema Registry. Вы можете скопировать эндпоинт из подсказки по подключению к пространству имен Schema Registry на вкладке Debezium, в параметре value.converter.schema.registry.url.

    Важно

    В пространстве имен должна быть выставлена Политика проверок совместимости для JSON — optional friendly.

  • значение_API-ключа — значение API-ключа с ограниченной областью действия для подключения к Schema Registry. Чтобы получить значение:

    1. Создайте API-ключ с ограниченной областью действия и поместите его в локальную переменную SECRET:

      yc iam api-key create --folder-id <идентификатор_каталога> \
        --service-account-name <имя_сервисного_аккаунта_для_работы_со_Schema_Registry> \
        --scopes yc.schema-registry.schemas.manage \
        --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \
      SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'`
      
    2. Выведите в терминал значение переменной SECRET:

      echo $SECRET
      
    3. Скопируйте полученное значение и вставьте его в параметр value в конфигурационном файле.

Укажите в блоке serializer выбранный тип сериализации:

  • serializerAuto.

  • serializerJson.

  • serializerDebezium.

    Для этого типа можно указать параметры сериализации Debezium, задав их в поле serializerDebezium.serializerParameters в виде пар key/value.

Если вы хотите использовать JSON-схемы в Yandex Schema Registry, сохраняя совместимость схем при добавлении и удалении опциональных полей, добавьте в тело запроса объект serializer с описанием настроек сериализации. Чтобы генерировать закрытую схему для ключей, добавьте в объект serializer переменную key.converter.dt.json.generate.closed.content.schema со значением true. Чтобы генерировать закрытую схему для значений, добавьте в объект serializer переменную value.converter.dt.json.generate.closed.content.schema со значением true.

"serializer": {
  "serializerDebezium": {
    "serializerParameters": [
      {
        "key": "converter.dt.json.generate.closed.content.schema",
        "value": "true"
      },
      {
        "key": "value.converter",
        "value": "io.confluent.connect.json.JsonSchemaConverter"
      },
      {
        "key": "value.converter.schemas.enable",
        "value": "true"
      },
      {
        "key": "value.converter.schema.registry.url",
        "value": "<URL_пространства_имен_Schema_Registry>"
      },
      {
        "key": "value.converter.basic.auth.user.info",
        "value": "api-key:<значение_API-ключа>"
      }
    ]
  }
}

Где:

  • URL_пространства_имен_Schema_Registry — эндпоинт пространства имен Schema Registry. Вы можете скопировать эндпоинт из подсказки по подключению к пространству имен Schema Registry на вкладке Debezium, в параметре value.converter.schema.registry.url.

    Важно

    В пространстве имен должна быть выставлена Политика проверок совместимости для JSON — optional friendly.

  • значение_API-ключа — значение API-ключа с ограниченной областью действия для подключения к Schema Registry. Чтобы получить значение:

    1. Создайте API-ключ с ограниченной областью действия и поместите его в локальную переменную SECRET:

      yc iam api-key create --folder-id <идентификатор_каталога> \
        --service-account-name <имя_сервисного_аккаунта_для_работы_со_Schema_Registry> \
        --scopes yc.schema-registry.schemas.manage \
        --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \
      SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'`
      
    2. Выведите в терминал значение переменной SECRET:

      echo $SECRET
      
    3. Скопируйте полученное значение и вставьте его в параметр value в конфигурационном файле.

Дополнительные настройкиДополнительные настройки

Консоль управления

Вы можете указать параметры конфигурации топика, которые будут применяться при создании новых топиков.

Укажите параметр и одно из его допустимых значений: например, cleanup.policy и compact.

После настройки источника и приемника данных создайте и запустите трансфер.

Была ли статья полезна?

Предыдущая
Источник
Следующая
Источник
Проект Яндекса
© 2025 ООО «Яндекс.Облако»