Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Transfer
  • Доступные трансферы
  • Начало работы
    • Все инструкции
    • Подготовка к трансферу
      • Управление эндпоинтами
      • Миграция эндпоинтов в другую зону доступности
        • Источник
        • Приемник
    • Управление трансфером
    • Работа с базами данных во время трансфера
    • Мониторинг состояния трансфера
  • Решение проблем
  • Управление доступом
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Обучающие курсы

В этой статье:

  • Сценарии передачи данных из Apache Kafka®
  • Подготовка базы данных источника
  • Настройка эндпоинта-источника Apache Kafka®
  • Кластер Managed Service for Apache Kafka®
  • Пользовательская инсталляция
  • Расширенные настройки
  • Настройка приемника данных
  1. Пошаговые инструкции
  2. Настройка эндпоинтов
  3. Apache Kafka®
  4. Источник

Передача данных из эндпоинта-источника Apache Kafka®

Статья создана
Yandex Cloud
Обновлена 10 апреля 2025 г.
  • Сценарии передачи данных из Apache Kafka®
  • Подготовка базы данных источника
  • Настройка эндпоинта-источника Apache Kafka®
    • Кластер Managed Service for Apache Kafka®
    • Пользовательская инсталляция
    • Расширенные настройки
  • Настройка приемника данных

С помощью сервиса Yandex Data Transfer вы можете переносить данные из очереди Apache Kafka® и реализовывать различные сценарии переноса, обработки и трансформации данных. Для реализации трансфера:

  1. Ознакомьтесь с возможными сценариями передачи данных.
  2. Подготовьте базу данных Apache Kafka® к трансферу.
  3. Настройте эндпоинт-источник в Yandex Data Transfer.
  4. Настройте один из поддерживаемых приемников данных.
  5. Создайте и запустите трансфер.
  6. Выполняйте необходимые действия по работе с базой и контролируйте трансфер.
  7. При возникновении проблем, воспользуйтесь готовыми решениями по их устранению.

Сценарии передачи данных из Apache Kafka®Сценарии передачи данных из Apache Kafka®

  1. Миграция — перенос данных из одного хранилища в другое. Часто это перенос базы из устаревших локальных баз в управляемые облачные.

    Отдельной задачей миграции является зеркалирование данных между очередями:

    • Зеркалирование Apache Kafka®.
  2. Поставка данных — процесс доставки произвольных данных в целевые хранилища. Процесс поставки включает извлечение данных из очереди и их десериализацию с последующей трансформацией данных в формат целевого хранилища.

    • Apache Kafka® в ClickHouse®;
    • Apache Kafka® в PostgreSQL;
    • Apache Kafka® в Greenplum®;
    • Apache Kafka® в MongoDB;
    • Apache Kafka® в MySQL®;
    • Apache Kafka® в OpenSearch;
    • Apache Kafka® в YDB;
    • Apache Kafka® в YDS.

Подробное описание возможных сценариев передачи данных в Yandex Data Transfer см. в разделе Практические руководства.

Подготовка базы данных источникаПодготовка базы данных источника

Managed Service for Apache Kafka®
Apache Kafka®

Создайте пользователя с ролью ACCESS_ROLE_CONSUMER на топике-источнике.

  1. Если вы не планируете использовать для подключения к внешнему кластеру сервис Cloud Interconnect или VPN, разрешите подключения к такому кластеру из интернета с IP-адресов, используемых сервисом Data Transfer.

    Подробнее о настройке сети для работы с внешними ресурсами см. в концепции.

  2. Настройте доступ к кластеру-источнику из Yandex Cloud.

  3. Настройте права доступа пользователя к нужному топику.

  4. Выдайте права READ консьюмер-группе, идентификатор которой совпадает с идентификатором трансфера.

    bin/kafka-acls --bootstrap-server localhost:9092 \
      --command-config adminclient-configs.conf \
      --add \
      --allow-principal User:username \
      --operation Read \
      --group <идентификатор_трансфера>
    
  5. (Опционально) Чтобы использовать авторизацию по логину и паролю, настройте SASL-аутентификацию.

Настройка эндпоинта-источника Apache Kafka®Настройка эндпоинта-источника Apache Kafka®

При создании или изменении эндпоинта вы можете задать:

  • Настройки подключения к кластеру Yandex Managed Service for Apache Kafka® или пользовательской инсталляции, в т. ч. на базе виртуальных машин Yandex Compute Cloud. Эти параметры обязательные.
  • Расширенные настройки.

Кластер Managed Service for Apache Kafka®Кластер Managed Service for Apache Kafka®

Важно

Для создания или редактирования эндпоинта управляемой базы данных вам потребуется роль managed-kafka.viewer или примитивная роль viewer, выданная на каталог кластера этой управляемой базы данных.

Подключение с указанием идентификатора кластера в Yandex Cloud.

Консоль управления
Terraform
API
  • Кластер Managed Service for Apache Kafka — выберите кластер, к которому необходимо подключиться.

  • Аутентификация — выберите тип: SASL или Без аутентификации.

    При выборе значения SASL:

    • Имя пользователя — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
    • Пароль — укажите пароль учетной записи.
  • Полное имя топика — укажите имя топика, к которому необходимо подключиться.

  • Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.

    Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.

  • Тип эндпоинта — kafka_source.
  • security_groups — группы безопасности для сетевого трафика.

    Правила групп безопасности применяются к трансферу. Они позволяют открыть сетевой доступ с ВМ трансфера к кластеру. Подробнее см. в разделе Сеть в Yandex Data Transfer.

    Группы безопасности должны принадлежать той же сети, в которой размещен кластер.

    Примечание

    В Terraform сеть для групп безопасности задавать не нужно.

  • connection.cluster_id — идентификатор кластера, к которому необходимо подключиться.

  • auth — метод аутентификации. Используется при подключении к хостам-брокерам:

    • sasl — аутентификация с помощью SASL.
    • no_auth — без аутентификации.
  • topic_names — список имен топиков, к которым необходимо подключиться.

Пример структуры конфигурационного файла:

resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
  name = "<имя_эндпоинта>"
  settings {
    kafka_source {
      security_groups = ["<список_идентификаторов_групп_безопасности>"]
      connection {
        cluster_id = "<идентификатор_кластера>"
      }
      auth {
        <метод_аутентификации>
      }
      topic_names = ["<список_имен_топиков>"]
      <расширенные_настройки_эндпоинта>
    }
  }
}

Подробнее см. в документации провайдера Terraform.

  • securityGroups — группы безопасности для сетевого трафика, правила которых применятся к ВМ и кластерам без изменения их настроек. Подробнее см. в разделе Сеть в Yandex Data Transfer.

  • connection.clusterId — идентификатор кластера, к которому необходимо подключиться.

  • auth — метод аутентификации. Используется при подключении к хостам-брокерам:

    • sasl — аутентификация с помощью SASL. Необходимо указать следующие параметры:

      • user — имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
      • password.raw — пароль учетной записи (в текстовом виде).
      • mechanism — механизм хеширования.
    • noAuth — без аутентификации.

  • topicNames — список имен топиков, к которым необходимо подключиться.

Пользовательская инсталляцияПользовательская инсталляция

Подключение к кластеру Apache Kafka® с явным указанием сетевых адресов и портов хостов-брокеров.

Консоль управления
Terraform
API
  • URL-адреса брокеров — укажите IP-адреса или FQDN хостов-брокеров.

    Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:

    <IP-адрес_или_FQDN_хоста-брокера>:<номер_порта>
    
  • SSL — использовать шифрование для защиты соединения.

  • PEM-сертификат — если требуется шифрование передаваемых данных, например для соответствия требованиям PCI DSS, загрузите файл сертификата или добавьте его содержимое в текстовом виде.

    Важно

    Если не добавить сертификат, трансфер может завершиться ошибкой.

  • Сетевой интерфейс для эндпоинта — выберите или создайте подсеть в нужной зоне доступности. Трансфер будет использовать эту подсеть для доступа к источнику.

    Если значение в этом поле задано для обоих эндпоинтов, то обе подсети должны быть размещены в одной зоне доступности.

  • Аутентификация — выберите тип: SASL или Без аутентификации.

    При выборе значения SASL:

    • Пользователь — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
    • Пароль — укажите пароль учетной записи.
    • Механизм — выберите механизм хеширования: SHA 256 или SHA 512.
  • Полное имя топика — укажите имя топика, к которому необходимо подключиться.

  • Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.

    Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.

  • Тип эндпоинта — kafka_source.
  • security_groups — группы безопасности для сетевого трафика.

    Правила групп безопасности применяются к трансферу. Они позволяют открыть сетевой доступ с ВМ трансфера к хостам-брокерам. Подробнее см. в разделе Сеть в Yandex Data Transfer.

    Группы безопасности должны принадлежать той же сети, что и подсеть subnet_id, если она указана.

    Примечание

    В Terraform сеть для групп безопасности задавать не нужно.

  • connection.on_premise — параметры подключения к хостам-брокерам:

    • broker_urls — IP-адреса или FQDN хостов-брокеров.

      Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:

      <IP-адрес_или_FQDN_хоста-брокера>:<номер_порта>
      
    • tls_mode — параметры шифрования передаваемых данных, если оно требуется, например для соответствия требованиям PCI DSS.

      • disabled — отключено.
      • enabled — включено.
        • ca_certificate — сертификат CA.

          Важно

          Если не добавить сертификат, трансфер может завершиться ошибкой.

    • subnet_id — идентификатор подсети, в которой находятся хосты-брокеры. Трансфер будет использовать эту подсеть для доступа к ним.

  • auth — метод аутентификации. Используется при подключении к хостам-брокерам:

    • sasl — аутентификация с помощью SASL.
    • no_auth — без аутентификации.
  • topic_names — список имен топиков, к которым необходимо подключиться.

Пример структуры конфигурационного файла:

resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
  name = "<имя_эндпоинта>"
  settings {
    kafka_source {
      security_groups = ["<список_идентификаторов_групп_безопасности>"]
      connection {
        on_premise {
          broker_urls = ["<список_IP-адресов_или_FQDN_хостов-брокеров>"]
          subnet_id  = "<идентификатор_подсети_c_хостами-брокерами>"
        }
      }
      auth = {
        <метод_аутентификации>
      }
      topic_names = ["<список_имен_топиков>"]
      <расширенные_настройки_эндпоинта>
    }
  }
}

Подробнее см. в документации провайдера Terraform.

  • securityGroups — группы безопасности для сетевого трафика, правила которых применятся к ВМ и кластерам без изменения их настроек. Подробнее см. в разделе Сеть в Yandex Data Transfer.

  • connection.onPremise — параметры подключения к хостам-брокерам:

    • brokerUrls — IP-адреса или FQDN хостов-брокеров.

      Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:

      <IP-адрес_или_FQDN_хоста-брокера>:<номер_порта>
      
    • tlsMode — параметры шифрования передаваемых данных, если оно требуется, например для соответствия требованиям PCI DSS.

      • disabled — отключено.
      • enabled — включено.
        • caCertificate — сертификат CA.

          Важно

          Если не добавить сертификат, трансфер может завершиться ошибкой.

    • subnetId — идентификатор подсети, в которой находятся хосты-брокеры. Трансфер будет использовать эту подсеть для доступа к ним.

  • auth — метод аутентификации. Используется при подключении к хостам-брокерам:

    • sasl — аутентификация с помощью SASL. Необходимо указать следующие параметры:

      • user — имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
      • password.raw — пароль учетной записи (в текстовом виде).
      • mechanism — механизм хеширования.
    • noAuth — без аутентификации.

  • topicNames — список имен топиков, к которым необходимо подключиться.

Расширенные настройкиРасширенные настройки

В расширенных настройках вы можете задать правила трансформации и правила конвертации. Данные обрабатываются в следующем порядке:

  1. Трансформация. Данные в формате JSON передаются функции Yandex Cloud Functions. В теле функции содержатся метаинформация и необработанные данные, которые переданы в очередь. С помощью функции данные обрабатываются и возвращаются в Data Transfer.

  2. Конвертация. Выполняется парсинг, с помощью которого данные подготавливаются для передачи приемнику.

Если не заданы правила трансформации, то парсинг применяется к необработанным данным из очереди. Если не заданы правила конвертации, то данные сразу передаются в приемник.

Консоль управления
Terraform
API
  • Правила трансформации:

    • Функция обработки — выберите одну из функций, созданных в сервисе Cloud Functions.

    • Сервисный аккаунт — выберите или создайте сервисный аккаунт, от имени которого будет запускаться функция обработки.

    • Количество попыток — укажите количество попыток вызова функции обработки.

    • Размер буфера для отправки — укажите размер буфера (в байтах), при заполнении которого данные будут переданы функции обработки.

      Максимальный размер буфера — 3,5 МБ. Подробнее об ограничениях, действующих при работе с функциями в сервисе Cloud Functions, читайте в соответствующем разделе.

    • Интервал отправки — укажите длительность интервала (в секундах), по истечении которого данные из потока должны быть переданы функции обработки.

      Примечание

      Если буфер заполнится или истечет интервал отправки, данные будут переданы функции обработки.

    • Таймаут вызова — укажите допустимое время ожидания ответа от функции обработки (в секундах).

    Важно

    Значения в полях Интервал отправки и Таймаут вызова указываются с постфиксом s, например, 10s.

  • Правила конвертации:

    • Формат данных — выберите один из доступных форматов:
      • JSON — формат JSON.

      • Парсер AuditTrails.v1 — формат логов сервиса Audit Trails.

      • Парсер CloudLogging — формат логов сервиса Cloud Logging.

      • Парсер Debezium CDC — Debezium CDC. Позволяет в настройках указать Confluent Schema Registry.

        Для формата JSON укажите:

        • Схема данных — задайте схему в виде списка полей или загрузите файл с описанием схемы в формате JSON.
        Пример задания схемы данных
        [
            {
                "name": "request",
                "type": "string"
              }
        ]
        
        • Использовать значение NULL в ключевых столбцах — выберите эту опцию, чтобы разрешить значение null в ключевых колонках.
        • Добавить неразмеченные столбцы — выберите эту опцию, чтобы поля, отсутствующие в схеме, попадали в колонку _rest.
        • Разэкранировать значения строк — выберите эту опцию, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).

        Для Debezium CDC укажите: URL для Schema Registry, способ аутентификации (с указанием логина и пароля пользователя в случае использования аутентификации) и CA-сертификат.

  • transformer — правила трансформации.

  • parser — правила конвертации, которые определяются одним из выбранных парсеров:

    • audit_trails_v1_parser — парсер для логов сервиса Audit Trails.

    • cloud_logging_parser — парсер для логов сервиса Cloud Logging.

    • json_parser или tskv_parser — парсеры JSON и TSKV, соответственно.

      Атрибуты, задающие параметры их работы, одинаковы для обоих парсеров:

      • data_schema — схема данных, представленная либо в виде JSON-объекта с описанием схемы, либо в виде списка полей:

        • fields — схема в виде JSON-объекта. Объект содержит в себе массив JSON-объектов, которые описывают отдельные колонки.
        • json_fields — схема в виде списка JSON-полей.
      • null_keys_allowed — задайте значение true для этого атрибута, чтобы разрешить значение null в ключевых колонках.

      • add_rest_column — задайте значение true для этого атрибута, чтобы поля, отсутствующие в схеме, попадали в колонку _rest.

      • unescape_string_values — задайте значение true для этого атрибута, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).

  • transformer — правила трансформации:

    • cloudFunction — идентификатор функции, созданной в сервисе Cloud Functions.

    • serviceAccountId — идентификатор сервисного аккаунта, от имени которого будет запускаться функция обработки.

    • numberOfRetries — количество попыток вызова функции обработки.

    • bufferSize — размер буфера (в байтах), при заполнении которого данные будут переданы функции обработки.

      Максимальный размер буфера — 3,5 МБ. Подробнее об ограничениях, действующих при работе с функциями в сервисе Cloud Functions, читайте в соответствующем разделе.

    • bufferFlushInterval — длительность интервала (в секундах), по истечении которого данные из потока должны быть переданы функции обработки.

      Примечание

      Если буфер заполнится или истечет интервал отправки, данные будут переданы функции обработки.

    • invocationTimeout — допустимое время ожидания ответа от функции обработки (в секундах).

    Важно

    Значения для bufferFlushInterval и invocationTimeout указываются с постфиксом s, например, 10s.

  • parser — правила конвертации, которые определяются одним из выбранных парсеров:

    • auditTrailsV1Parser — парсер для логов сервиса Audit Trails.

    • cloudLoggingParser — парсер для логов сервиса Cloud Logging.

    • jsonParser или tskvParser — парсеры JSON и TSKV, соответственно.

      Поля, задающие параметры их работы, одинаковы для обоих парсеров:

      • dataSchema — схема данных, представленная либо в виде JSON-объекта с описанием схемы, либо в виде списка полей:

        • fields — схема в виде JSON-объекта. Объект содержит в себе массив JSON-объектов, которые описывают отдельные колонки.

        • jsonFields — схема в виде списка JSON-полей.

      • nullKeysAllowed — задайте значение true для этого поля, чтобы разрешить значение null в ключевых колонках.

      • addRestColumn — задайте значение true для этого поля, чтобы поля, отсутствующие в схеме, попадали в колонку _rest.

      • unescapeStringValues — задайте значение true для этого поля, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).

Настройка приемника данныхНастройка приемника данных

Настройте один из поддерживаемых приемников данных:

  • PostgreSQL;
  • MySQL®;
  • MongoDB
  • ClickHouse®;
  • Greenplum®;
  • Yandex Managed Service for YDB;
  • Yandex Object Storage;
  • Apache Kafka®;
  • YDS;
  • Elasticsearch;
  • OpenSearch.

Полный список поддерживаемых источников и приемников в Yandex Data Transfer см. в разделе Доступные трансферы.

После настройки источника и приемника данных создайте и запустите трансфер.

Была ли статья полезна?

Предыдущая
Миграция эндпоинтов в другую зону доступности
Следующая
Приемник
Проект Яндекса
© 2025 ООО «Яндекс.Облако»