Yandex Cloud
Поиск
Связаться с экспертомПопробовать бесплатно
  • Кейсы
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
  • Marketplace
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Искусственный интеллект
    • Безопасность
    • Инструменты DevOps
    • Бессерверные вычисления
    • Управление ресурсами
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Калькулятор цен
    • Тарифы
    • Акции и free tier
  • Кейсы
  • Документация
  • Блог
Создавайте контент и получайте гранты!Готовы написать своё руководство? Участвуйте в контент-программе и получайте гранты на работу с облачными сервисами!
Подробнее о программе
Проект Яндекса
© 2026 ООО «Яндекс.Облако»
Yandex Managed Service for Apache Kafka®
  • Начало работы
    • Все инструкции
    • Управление топиками
    • Управление пользователями
    • Управление коннекторами
    • Веб-интерфейс Kafka UI для Apache Kafka®
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Yandex Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • История изменений
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • Получить список коннекторов
  • Получить детальную информацию о коннекторе
  • Создать коннектор
  • MirrorMaker
  • S3 Sink
  • Iceberg Sink
  • Изменить коннектор
  • Приостановить коннектор
  • Возобновить работу коннектора
  • Импортировать коннектор в Terraform
  • Удалить коннектор
  1. Пошаговые инструкции
  2. Управление коннекторами

Управление коннекторами

Статья создана
Yandex Cloud
Улучшена
Обновлена 26 мая 2026 г.
  • Получить список коннекторов
  • Получить детальную информацию о коннекторе
  • Создать коннектор
    • MirrorMaker
    • S3 Sink
    • Iceberg Sink
  • Изменить коннектор
  • Приостановить коннектор
  • Возобновить работу коннектора
  • Импортировать коннектор в Terraform
  • Удалить коннектор

Коннектор управляет процессом переноса топиков Apache Kafka® в другой кластер или другую систему хранения данных.

Вы можете:

  • получить список коннекторов;
  • получить детальную информацию о коннекторе;
  • создать коннектор нужного типа:
    • MirrorMaker;
    • S3 Sink;
    • Iceberg Sink.
  • изменить коннектор;
  • приостановить коннектор;
  • возобновить работу коннектора;
  • импортировать коннектор в Terraform;
  • удалить коннектор.

Получить список коннекторовПолучить список коннекторов

Консоль управления
CLI
REST API
gRPC API
  1. В консоли управления перейдите в нужный каталог.
  2. Перейдите в сервис Managed Service for Kafka.
  3. Выберите нужный кластер и перейдите на вкладку Коннекторы.

Если у вас еще нет интерфейса командной строки Yandex Cloud (CLI), установите и инициализируйте его.

По умолчанию используется каталог, указанный при создании профиля CLI. Чтобы изменить каталог по умолчанию, используйте команду yc config set folder-id <идентификатор_каталога>. Также для любой команды вы можете указать другой каталог с помощью параметров --folder-name или --folder-id. Если вы обращаетесь к ресурсу по имени, поиск будет выполнен в каталоге по умолчанию. Если вы обращаетесь к ресурсу по идентификатору, поиск будет выполнен глобально — во всех каталогах с учетом прав доступа.

Чтобы запросить список коннекторов кластера, выполните команду:

yc managed-kafka connector list --cluster-name=<имя_кластера>

Результат:

+--------------+-----------+
|     NAME     | TASKS MAX |
+--------------+-----------+
| connector559 |         1 |
| ...          |           |
+--------------+-----------+

Имя кластера можно получить со списком кластеров в каталоге.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Воспользуйтесь методом Connector.list и выполните запрос, например, с помощью cURL:

    curl \
      --request GET \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/connectors'
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге.

  3. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Клонируйте репозиторий cloudapi:

    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
    

    Далее предполагается, что содержимое репозитория находится в директории ~/cloudapi/.

  3. Воспользуйтесь вызовом ConnectorService/List и выполните запрос, например, с помощью gRPCurl:

    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<идентификатор_кластера>"
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.ConnectorService.List
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге.

  4. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

Получить детальную информацию о коннектореПолучить детальную информацию о коннекторе

Консоль управления
CLI
REST API
gRPC API
  1. В консоли управления перейдите в нужный каталог.
  2. Перейдите в сервис Managed Service for Kafka.
  3. Выберите нужный кластер и перейдите на вкладку Коннекторы.
  4. Нажмите на имя нужного коннектора.

Если у вас еще нет интерфейса командной строки Yandex Cloud (CLI), установите и инициализируйте его.

По умолчанию используется каталог, указанный при создании профиля CLI. Чтобы изменить каталог по умолчанию, используйте команду yc config set folder-id <идентификатор_каталога>. Также для любой команды вы можете указать другой каталог с помощью параметров --folder-name или --folder-id. Если вы обращаетесь к ресурсу по имени, поиск будет выполнен в каталоге по умолчанию. Если вы обращаетесь к ресурсу по идентификатору, поиск будет выполнен глобально — во всех каталогах с учетом прав доступа.

Чтобы получить детальную информацию о коннекторе, выполните команду:

yc managed-kafka connector get <имя_коннектора>\
   --cluster-name=<имя_кластера>

Результат:

name: connector785
tasks_max: "1"
cluster_id: c9qbkmoiimsl********
...

Имя коннектора можно запросить со списком коннекторов в кластере, имя кластера — со списком кластеров в каталоге.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Воспользуйтесь методом Connector.get и выполните запрос, например, с помощью cURL:

    curl \
      --request GET \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/connectors/<имя_коннектора>'
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя коннектора — со списком коннекторов в кластере.

  3. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Клонируйте репозиторий cloudapi:

    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
    

    Далее предполагается, что содержимое репозитория находится в директории ~/cloudapi/.

  3. Воспользуйтесь вызовом ConnectorService/Get и выполните запрос, например, с помощью gRPCurl:

    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<идентификатор_кластера>"
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.ConnectorService.Get
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге.

  4. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

Создать коннекторСоздать коннектор

Консоль управления
CLI
Terraform
REST API
gRPC API
  1. В консоли управления перейдите в нужный каталог.

  2. Перейдите в сервис Managed Service for Kafka.

  3. Выберите нужный кластер и перейдите на вкладку Коннекторы.

  4. Нажмите кнопку Создать коннектор.

  5. В блоке Базовые параметры укажите:

    • Имя коннектора.
    • Лимит задач — количество одновременно работающих процессов. Рекомендуется указывать не менее 2 для равномерного распределения нагрузки репликации.
  6. В блоке Дополнительные свойства укажите свойства коннектора в формате:

    <ключ>:<значение>
    

    При этом ключ может быть как простой строкой, так и содержать префикс, указывающий на принадлежность к источнику или приемнику (псевдоним кластера в конфигурации коннектора):

    <псевдоним_кластера>.<тело_ключа>:<значение>
    
  7. Выберите тип коннектора — MirrorMaker, S3 Sink или Iceberg Sink — и задайте его конфигурацию.

    Подробнее о поддерживаемых типах коннекторов в разделе Коннекторы.

  8. Нажмите кнопку Создать.

Если у вас еще нет интерфейса командной строки Yandex Cloud (CLI), установите и инициализируйте его.

По умолчанию используется каталог, указанный при создании профиля CLI. Чтобы изменить каталог по умолчанию, используйте команду yc config set folder-id <идентификатор_каталога>. Также для любой команды вы можете указать другой каталог с помощью параметров --folder-name или --folder-id. Если вы обращаетесь к ресурсу по имени, поиск будет выполнен в каталоге по умолчанию. Если вы обращаетесь к ресурсу по идентификатору, поиск будет выполнен глобально — во всех каталогах с учетом прав доступа.

Чтобы создать коннектор MirrorMaker:

  1. Посмотрите описание команды CLI для создания коннектора:

    yc managed-kafka connector-mirrormaker create --help
    
  2. Создайте коннектор:

    yc managed-kafka connector-mirrormaker create <имя_коннектора> \
       --cluster-name=<имя_кластера> \
       --direction=<направление_коннектора> \
       --tasks-max=<лимит_задач> \
       --properties=<дополнительные_свойства> \
       --replication-factor=<фактор_репликации> \
       --topics=<шаблон_для_топиков> \
       --this-cluster-alias=<префикс_для_обозначения_этого_кластера> \
       --external-cluster alias=<префикс_для_обозначения_внешнего_кластера>,`
                         `bootstrap-servers=<список_FQDN_хостов-брокеров>,`
                         `security-protocol=<протокол_безопасности>,`
                         `sasl-mechanism=<механизм_шифрования>,`
                         `sasl-username=<имя_пользователя>,`
                         `sasl-password=<пароль_пользователя>,`
                         `ssl-truststore-certificates=<сертификаты_в_формате_PEM>
    

    Как получить FQDN хоста-брокера, см. в инструкции.

    Имя кластера можно получить со списком кластеров в каталоге.

    Параметр --direction принимает значение:

    • egress — если текущий кластер является кластером-источником.
    • ingress — если текущий кластер является кластером-приемником.

Чтобы создать коннектор S3 Sink:

  1. Посмотрите описание команды CLI для создания коннектора:

    yc managed-kafka connector-s3-sink create --help
    
  2. Создайте коннектор:

    yc managed-kafka connector-s3-sink create <имя_коннектора> \
       --cluster-name=<имя_кластера> \
       --tasks-max=<лимит_задач> \
       --properties=<дополнительные_свойства> \
       --topics=<шаблон_для_топиков> \
       --file-compression-type=<кодек_сжатия> \
       --file-max-records=<максимальное_количество_сообщений_в_файле> \
       --bucket-name=<имя_бакета> \
       --access-key-id=<идентификатор_AWS-совместимого_статического_ключа> \
       --secret-access-key=<содержимое_AWS-совместимого_статического_ключа> \
       --storage-endpoint=<эндпоинт_S3-совместимого_хранилища> \
       --region=<регион_S3-совместимого_хранилища>
    

    Имя кластера можно получить со списком кластеров в каталоге.

Чтобы создать коннектор Iceberg Sink:

  1. Посмотрите описание команды CLI для создания коннектора:

    yc managed-kafka connector-iceberg-sink create --help
    
  2. Создайте коннектор:

    yc managed-kafka connector-iceberg-sink create <имя_коннектора> \
       --cluster-name=<имя_кластера> \
       --tasks-max=<лимит_задач> \
       --properties=<дополнительные_свойства> \
       --topics=<шаблон_для_топиков> \
       --file-compression-type=<кодек_сжатия> \
       --file-max-records=<максимальное_количество_сообщений_в_файле> \
       --bucket-name=<имя_бакета> \
       --access-key-id=<идентификатор_AWS-совместимого_статического_ключа> \
       --secret-access-key=<содержимое_AWS-совместимого_статического_ключа> \
       --storage-endpoint=<эндпоинт_S3-совместимого_хранилища> \
       --region=<регион_S3-совместимого_хранилища>
    

    Имя кластера можно получить со списком кластеров в каталоге.

  1. Ознакомьтесь со списком настроек коннекторов MirrorMaker и S3 Sink.

  2. Откройте актуальный конфигурационный файл Terraform с планом инфраструктуры.

    Как создать такой файл, описано в разделе Создание кластера Apache Kafka®.

  3. Чтобы создать коннектор MirrorMaker, добавьте ресурс yandex_mdb_kafka_connector с блоком настроек connector_config_mirrormaker:

    resource "yandex_mdb_kafka_connector" "<имя_коннектора>" {
      cluster_id = "<идентификатор_кластера>"
      name       = "<имя_коннектора>"
      tasks_max  = <лимит_задач>
      properties = {
        <дополнительные_свойства>
      }
      connector_config_mirrormaker {
        topics             = "<шаблон_для_топиков>"
        replication_factor = <фактор_репликации>
        source_cluster {
          alias = "<префикс_для_обозначения_кластера>"
          external_cluster {
            bootstrap_servers           = "<список_FQDN_хостов-брокеров>"
            sasl_username               = "<имя_пользователя>"
            sasl_password               = "<пароль_пользователя>"
            sasl_mechanism              = "<механизм_шифрования>"
            security_protocol           = "<протокол_безопасности>"
            ssl-truststore-certificates = "<содержимое_PEM-сертификата>"
          }
        }
        target_cluster {
          alias = "<префикс_для_обозначения_кластера>"
          this_cluster {}
        }
      }
    }
    

    Как получить FQDN хоста-брокера, см. в инструкции.

  4. Чтобы создать коннектор S3 Sink, добавьте ресурс yandex_mdb_kafka_connector с блоком настроек connector_config_s3_sink:

    resource "yandex_mdb_kafka_connector" "<имя_коннектора>" {
      cluster_id = "<идентификатор_кластера>"
      name       = "<имя_коннектора>"
      tasks_max  = <лимит_задач>
      properties = {
        <дополнительные_свойства>
      }
      connector_config_s3_sink {
        topics                = "<шаблон_для_топиков>"
        file_compression_type = "<кодек_сжатия>"
        file_max_records      = <максимальное_количество_сообщений_в_файле>
        s3_connection {
          bucket_name = "<имя_бакета>"
          external_s3 {
            endpoint          = "<эндпоинт_S3-совместимого_хранилища>"
            access_key_id     = "<идентификатор_AWS-совместимого_статического_ключа>"
            secret_access_key = "<содержимое_AWS-совместимого_статического_ключа>"
          }
        }
      }
    }
    
  5. Чтобы создать коннектор Iceberg Sink, добавьте ресурс yandex_mdb_kafka_connector с блоком настроек connector_config_iceberg_sink:

    resource "yandex_mdb_kafka_connector" "<имя_коннектора>" {
       cluster_id = "<идентификатор_кластера>"
       name       = "<имя_коннектора>"
       tasks_max  = <лимит_задач>
       properties = {
         <дополнительные_свойства>
       }
       connector_config_iceberg_sink {
         topics        = "<список_топиков_через_запятую>"
         control_topic = "<имя_топика_управления>"
    
         metastore_connection {
           catalog_uri = "<URI_для_подключения_к_кластеру_Metastore>"
           warehouse   = "<корневая_директория_для_хранения_данных_управляемых_таблиц_в_S3>"
         }
    
         s3_connection {
           external_s3 {
             endpoint          = "<эндпоинт_S3-совместимого_хранилища>"
             access_key_id     = "<идентификатор_AWS-совместимого_статического_ключа>"
             secret_access_key = "<содержимое_AWS-совместимого_статического_ключа>"
             region            = "<название_региона>"
          }
         }
    
         static_tables {
           tables = "имена_таблиц_через_запятую"
         }
    
         tables_config {
           default_commit_branch    = "<имя_ветки_по_умолчанию>"
           default_id_columns       = "<список_столбцов_по_умолчанию_через_запятую>"
           default_partition_by     = "<список_стобцов_или выражений_трансформации>"
           evolve_schema_enabled    = <автоматически_изменять_схему_Iceberg-таблицы>
           schema_force_optional    = <сделать_поля_схемы_Iceberg-таблицы_необязательными>
           schema_case_insensitive  = <игнорировать_регистр_при_сопоставлении_полей>
         }
    
         control_config {
           group_id_prefix      = "<префикс_для_Consumer_Group_ID>"
           commit_interval_ms   = <интервал_коммита_данных_в_Iceberg-таблицу>
           commit_timeout_ms    = <сколько_времени_координатор_ждет_подтверждения>
           commit_threads       = <количество_потоков_для_коммита_данных_в_Iceberg-таблицу>
           transactional_prefix = "<префикс_для_Transactional_ID>"
         }
       }
    }
    
  6. Проверьте корректность настроек.

    1. В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.

    2. Выполните команду:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  7. Подтвердите изменение ресурсов.

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

Подробнее в документации провайдера Terraform.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Чтобы создать коннектор MirrorMaker, воспользуйтесь методом Connector.create и выполните запрос, например, с помощью cURL:

    curl \
      --request POST \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/connectors' \
      --data '{
                "connectorSpec": {
                  "name": "<имя_коннектора>",
                  "tasksMax": "<лимит_задач>"
                  "properties": "<дополнительные_свойства_коннектора>"
                  "connectorConfigMirrormaker": {
                    <настройки_коннектора_Mirrormaker>
                  }
                }
              }'
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге.

  3. Чтобы создать коннектор S3 Sink, воспользуйтесь методом Connector.create и выполните запрос, например, с помощью cURL:

    curl \
      --request POST \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/connectors' \
      --data '{
                "connectorSpec": {
                  "name": "<имя_коннектора>",
                  "tasksMax": "<лимит_задач>"
                  "properties": "<дополнительные_свойства_коннектора>"
                  "connectorConfigS3Sink": {
                    <настройки_коннектора_S3-Sink>
                  }
                }
              }'
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге.

  4. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Клонируйте репозиторий cloudapi:

    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
    

    Далее предполагается, что содержимое репозитория находится в директории ~/cloudapi/.

  3. Чтобы создать коннектор MirrorMaker, воспользуйтесь вызовом ConnectorService/Create и выполните запрос, например, с помощью gRPCurl:

    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<идентификатор_кластера>",
            "connector_spec": {
              "name": "<имя_коннектора>",
              "tasks_max": {
                "value": "<лимит_задач>"
              },
              "properties": "<дополнительные_свойства_коннектора>"
              "connector_config_mirrormaker": {
                <настройки_коннектора_Mirrormaker>
              }
            }
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.ConnectorService.Create
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге.

  4. Чтобы создать коннектор S3 Sink, воспользуйтесь вызовом ConnectorService/Create и выполните запрос, например, с помощью gRPCurl:

    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<идентификатор_кластера>",
            "connector_spec": {
              "name": "<имя_коннектора>",
              "tasks_max": {
                "value": "<лимит_задач>"
              },
              "properties": "<дополнительные_свойства_коннектора>"
              "connector_config_s3_sink": {
                <настройки_коннектора_S3-Sink>
              }
            }
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.ConnectorService.Create
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге.

  5. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

MirrorMakerMirrorMaker

Укажите параметры коннектора MirrorMaker:

Консоль управления
CLI
Terraform
REST API
gRPC API
  • Топики — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ |. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.

  • Фактор репликации — количество копий топика, хранящихся в кластере.

  • В блоке Кластер-источник укажите параметры для подключения к кластеру-источнику:

    • Псевдоним — префикс для обозначения кластера-источника в настройках коннектора.

      Примечание

      Топики в кластере-приемнике будут созданы с указанным префиксом.

    • Использовать этот кластер — выберите опцию для использования текущего кластера в качестве источника.

    • Бутстрап-серверы — список FQDN хостов-брокеров кластера-источника с номерами портов для подключения, разделенный запятыми. Например: broker1.example.com:9091,broker2.example.com.

      Как получить FQDN хоста-брокера, см. в инструкции.

    • SASL имя пользователя — имя пользователя для подключения коннектора к кластеру-источнику.

    • SASL пароль — пароль пользователя для подключения коннектора к кластеру-источнику.

    • SASL механизм — выберите механизм шифрования имени и пароля.

    • Протокол безопасности — выберите протокол подключения коннектора:

      • PLAINTEXT, SASL_PLAINTEXT – для подключений без SSL;
      • SSL, SASL_SSL – для подключений с SSL.
    • Сертификат в формате PEM — загрузите PEM-сертификат для доступа к внешнему кластеру.

  • В блоке Кластер-приёмник укажите параметры для подключения к кластеру-приемнику:

    • Псевдоним — префикс для обозначения кластера-приемника в настройках коннектора.

    • Использовать этот кластер — выберите опцию для использования текущего кластера в качестве приемника.

    • Бутстрап-серверы — список FQDN хостов-брокеров кластера-приемника с номерами портов для подключения, разделенный запятыми.

      Как получить FQDN хоста-брокера, см. в инструкции.

    • SASL имя пользователя — имя пользователя для подключения коннектора к кластеру-приемнику.

    • SASL пароль — пароль пользователя для подключения коннектора к кластеру-приемнику.

    • SASL механизм — выберите механизм шифрования имени и пароля.

    • Протокол безопасности — выберите протокол подключения коннектора:

      • PLAINTEXT, SASL_PLAINTEXT – для подключений без SSL;
      • SSL, SASL_SSL – для подключений с SSL.
    • Сертификат в формате PEM — загрузите PEM-сертификат для доступа к внешнему кластеру.

  • Чтобы задать значения дополнительных настроек, не указанных в этом списке, создайте необходимые ключи и задайте их значения в блоке Дополнительные свойства при создании или изменении коннектора. Примеры ключей:

    • key.converter
    • value.converter

    Список общих настроек коннекторов в документации Apache Kafka®.

  • --cluster-name — имя кластера.

  • --direction — направление коннектора:

    • ingress — если кластер является приемником.
    • egress — если кластер является источником.
  • --tasks-max — максимальное количество одновременно запущенных задач коннектора.

  • --properties — список дополнительных настроек коннектора в формате <ключ>:<значение>, разделенный запятыми. Примеры ключей:

    • key.converter
    • value.converter

    Список общих настроек коннекторов в документации Apache Kafka®.

  • --replication-factor — количество копий топика, хранящихся в кластере.

  • --topics — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ |. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.

  • --this-cluster-alias — префикс для обозначения этого кластера в настройках коннектора.

  • --external-cluster — параметры внешнего кластера:

    • alias — префикс для обозначения внешнего кластера в настройках коннектора.

    • bootstrap-servers — список FQDN хостов-брокеров внешнего кластера с номерами портов для подключения, разделенный запятыми.

      Как получить FQDN хоста-брокера, см. в инструкции.

    • security-protocol — протокол подключения коннектора:

      • plaintext, sasl_plaintext – для подключений без SSL;
      • ssl, sasl_ssl – для подключений с SSL.
    • sasl-mechanism — механизм шифрования имени и пароля.

    • sasl-username — имя пользователя для подключения коннектора к внешнему кластеру.

    • sasl-password — пароль пользователя для подключения коннектора к внешнему кластеру.

    • ssl-truststore-certificates — список сертификатов в формате PEM.

  • properties — список дополнительных настроек коннектора в формате <ключ>:<значение>, разделенный запятыми. Примеры ключей:

    • key.converter
    • value.converter

    Список общих настроек коннекторов в документации Apache Kafka®.

  • connector_config_mirrormaker — настройки коннектора MirrorMaker:

    • replication_factor — количество копий топика, хранящихся в кластере.
    • topics — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ |. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.
    • source_cluster и target_cluster — параметры для подключения к кластеру-источнику и кластеру-приемнику:
      • alias — префикс для обозначения кластера в настройках коннектора.

        Примечание

        Топики в кластере-приемнике будут созданы с указанным префиксом.

      • external_cluster — параметры для подключения к внешнему кластеру:

        • bootstrap_servers — список FQDN хостов-брокеров кластера с номерами портов для подключения, разделенный запятыми.

          Как получить FQDN хоста-брокера, см. в инструкции.

        • sasl_username — имя пользователя для подключения коннектора к кластеру.

        • sasl_password — пароль пользователя для подключения коннектора к кластеру.

        • sasl_mechanism — механизм шифрования имени и пароля.

        • security_protocol — протокол подключения коннектора:

          • PLAINTEXT, SASL_PLAINTEXT — для подключений без SSL;
          • SSL, SASL_SSL — для подключений с SSL.
        • ssl_truststore_certificates — содержимое PEM-сертификата.

      • this_cluster — опция для использования текущего кластера в качестве источника или приемника.

Настройки коннектора MirrorMaker задаются в параметре connectorSpec.connectorConfigMirrormaker:

  • sourceCluster и targetCluster — параметры для подключения к кластеру-источнику и кластеру-приемнику:

    • alias — префикс для обозначения кластера в настройках коннектора.

      Примечание

      Топики в кластере-приемнике будут созданы с указанным префиксом.

    • thisCluster — опция для использования текущего кластера в качестве источника или приемника.

    • externalCluster — параметры для подключения к внешнему кластеру:

      • bootstrapServers — список FQDN хостов-брокеров кластера с номерами портов для подключения, разделенный запятыми.

        Как получить FQDN хоста-брокера, см. в инструкции.

      • saslUsername — имя пользователя для подключения коннектора к кластеру.

      • saslPassword — пароль пользователя для подключения коннектора к кластеру.

      • saslMechanism — механизм шифрования имени и пароля.

      • securityProtocol — протокол подключения коннектора:

        • PLAINTEXT, SASL_PLAINTEXT – для подключений без SSL;
        • SSL, SASL_SSL – для подключений с SSL.
      • sslTruststoreCertificates — содержимое PEM-сертификата.

  • topics — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ |. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.

  • replicationFactor — количество копий топика, хранящихся в кластере.

Настройки коннектора MirrorMaker задаются в параметре connector_spec.connector_config_mirrormaker:

  • source_cluster и target_cluster — параметры для подключения к кластеру-источнику и кластеру-приемнику:

    • alias — префикс для обозначения кластера в настройках коннектора.

      Примечание

      Топики в кластере-приемнике будут созданы с указанным префиксом.

    • this_cluster — опция для использования текущего кластера в качестве источника или приемника.

    • external_cluster — параметры для подключения к внешнему кластеру:

      • bootstrap_servers — список FQDN хостов-брокеров кластера с номерами портов для подключения, разделенный запятыми.

        Как получить FQDN хоста-брокера, см. в инструкции.

      • sasl_username — имя пользователя для подключения коннектора к кластеру.

      • sasl_password — пароль пользователя для подключения коннектора к кластеру.

      • sasl_mechanism — механизм шифрования имени и пароля.

      • security_protocol — протокол подключения коннектора:

        • PLAINTEXT, SASL_PLAINTEXT – для подключений без SSL;
        • SSL, SASL_SSL – для подключений с SSL.
      • ssl_truststore_certificates — содержимое PEM-сертификата.

  • topics — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ |. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.

  • replication_factor — количество копий топика, хранящихся в кластере. Передается в виде объекта с полем value.

S3 SinkS3 Sink

Укажите параметры коннектора S3 Sink:

Консоль управления
CLI
Terraform
REST API
gRPC API
  • Топики — шаблон для отбора экспортируемых топиков, имена топиков перечисляются через запятую или символ |. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.

  • Механизм сжатия — выберите кодек для сжатия сообщений:

    • none (по умолчанию) — сжатие отсутствует;
    • gzip — кодек gzip;
    • snappy — кодек snappy;
    • zstd — кодек zstd.

    После создания кластера данный параметр нельзя изменить.

  • (Опционально) Максимальное количество записей на файл — максимальное количество записей, которое может быть записано в один файл, размещенный в S3-совместимом хранилище.

  • В блоке Подключение к S3 укажите параметры подключения к хранилищу:

    • Имя бакета — имя бакета хранилища.

    • Эндпоинт — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища).

    • (Опционально) Регион — название региона. Значение по умолчанию — ru-central1. Список доступных регионов.

      Примечание

      Некоторые приложения, предназначенные для работы с Amazon S3, не позволяют указывать регион, поэтому Yandex Object Storage принимает также значение основного региона AWS — первая строка в таблице регионов.

    • (Опционально) Идентификатор ключа доступа, Секретный ключ — идентификатор и содержимое AWS-совместимого ключа.

  • Чтобы задать значения дополнительных настроек, не указанных в этом списке, создайте необходимые ключи и задайте их значения в блоке Дополнительные свойства при создании или изменении коннектора. Примеры ключей:

    • key.converter
    • value.converter
    • value.converter.schemas.enable
    • format.output.type

    Список всех настроек коннектора в документации коннектора. Список общих настроек коннекторов в документации Apache Kafka®.

  • --cluster-name — имя кластера.

  • --tasks-max — максимальное количество одновременно запущенных задач коннектора.

  • --properties — список дополнительных настроек коннектора в формате <ключ>:<значение>, разделенный запятыми. Примеры ключей:

    • key.converter
    • value.converter
    • value.converter.schemas.enable
    • format.output.type

    Список всех настроек коннектора в документации коннектора. Список общих настроек коннекторов в документации Apache Kafka®.

  • --topics — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ |. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.

  • --file-compression-type — кодек для сжатия сообщений. После создания кластера данный параметр нельзя изменить. Допустимые значения:

    • none (по умолчанию) — сжатие отсутствует;
    • gzip — кодек gzip;
    • snappy — кодек snappy;
    • zstd — кодек zstd.
  • --file-max-records — максимальное количество записей, которое может быть записано в один файл, размещенный в S3-совместимом хранилище.

  • --bucket-name — имя бакета в S3-совместимом хранилище, в который будет производиться запись.

  • --storage-endpoint — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример: storage.yandexcloud.net.

  • --region — регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию — ru-central1. Список доступных регионов.

    Примечание

    Некоторые приложения, предназначенные для работы с Amazon S3, не позволяют указывать регион, поэтому Yandex Object Storage принимает также значение основного региона AWS — первая строка в таблице регионов.

  • --access-key-id, --secret-access-key — идентификатор и содержимое AWS-совместимого ключа.

  • properties — список дополнительных настроек коннектора в формате <ключ>:<значение>, разделенный запятыми. Примеры ключей:

    • key.converter
    • value.converter
    • value.converter.schemas.enable
    • format.output.type

Список всех настроек коннектора в документации коннектора. Список общих настроек коннекторов в документации Apache Kafka®.

  • connector_config_s3_sink — настройки коннектора S3 Sink:
    • file_compression_type — кодек для сжатия сообщений. После создания кластера данный параметр нельзя изменить. Допустимые значения:

      • none (по умолчанию) — сжатие отсутствует;
      • gzip — кодек gzip;
      • snappy — кодек snappy;
      • zstd — кодек zstd.
    • topics — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ |. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.

    • file_max_records — максимальное количество записей, которое может быть записано в один файл, размещенный в S3-совместимом хранилище.

    • s3_connection — параметры для подключения к S3-совместимому хранилищу:

      • bucket_name — имя бакета, в который будет производиться запись.

      • external_s3 — параметры для подключения к внешнему S3-совместимому хранилищу:

        • endpoint — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример: storage.yandexcloud.net.

        • region — регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию — ru-central1. Список доступных регионов.

          Примечание

          Некоторые приложения, предназначенные для работы с Amazon S3, не позволяют указывать регион, поэтому Yandex Object Storage принимает также значение основного региона AWS — первая строка в таблице регионов.

        • access_key_id, secret_access_key — идентификатор и содержимое AWS-совместимого ключа.

Настройки коннектора S3 Sink задаются в параметре connectorSpec.connectorConfigS3Sink:

  • topics — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ |. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.

  • fileCompressionType — кодек для сжатия сообщений. После создания кластера данный параметр нельзя изменить. Допустимые значения:

    • none (по умолчанию) — сжатие отсутствует;
    • gzip — кодек gzip;
    • snappy — кодек snappy;
    • zstd — кодек zstd.
  • fileMaxRecords — максимальное количество записей, которое может быть записано в один файл, размещенный в S3-совместимом хранилище.

  • s3Connection — параметры для подключения к S3-совместимому хранилищу:

    • bucketName — имя бакета, в который будет производиться запись.
    • externalS3 — параметры внешнего хранилища:
      • endpoint — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример: storage.yandexcloud.net.

      • region — регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию — ru-central1. Список доступных регионов.

        Примечание

        Некоторые приложения, предназначенные для работы с Amazon S3, не позволяют указывать регион, поэтому Yandex Object Storage принимает также значение основного региона AWS — первая строка в таблице регионов.

      • accessKeyId, secretAccessKey — идентификатор и содержимое AWS-совместимого ключа.

Настройки коннектора S3 Sink задаются в параметре connector_spec.connector_config_s3_sink:

  • topics — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ |. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.

  • file_compression_type — кодек для сжатия сообщений. После создания кластера данный параметр нельзя изменить. Допустимые значения:

    • none (по умолчанию) — сжатие отсутствует;
    • gzip — кодек gzip;
    • snappy — кодек snappy;
    • zstd — кодек zstd.
  • file_max_records — максимальное количество записей, которое может быть записано в один файл, размещенный в S3-совместимом хранилище. Передается в виде объекта с полем value.

  • s3_connection — параметры для подключения к S3-совместимому хранилищу:

    • bucket_name — имя бакета, в который будет производиться запись.
    • external_s3 — параметры внешнего хранилища:
      • endpoint — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример: storage.yandexcloud.net.

      • region — регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию — ru-central1. Список доступных регионов.

        Примечание

        Некоторые приложения, предназначенные для работы с Amazon S3, не позволяют указывать регион, поэтому Yandex Object Storage принимает также значение основного региона AWS — первая строка в таблице регионов.

      • access_key_id, secret_access_key — идентификатор и содержимое AWS-совместимого ключа.

Iceberg SinkIceberg Sink

Укажите параметры коннектора Iceberg Sink:

Консоль управления
CLI
Terraform
REST API
gRPC API
  • Топик управления — выберите или создайте топик управления. Топик будет использоваться для координации и управления процессом записи данных в Iceberg-таблицы.

  • Источник топиков — выберите источник топиков, данные из которого будут перенесены в Iceberg-таблицы:

    • Список топиков — имена топиков через запятую.
    • Regex топиков — регулярное выражение для выбора топиков. Можно использовать выражение .*, например analysis.*. Для переноса всех топиков укажите .*.
  • Маршрутизация таблиц — выберите правило, по которому каждое сообщение из топика Apache Kafka® будет попадать в Iceberg-таблицы:

    • Статическая — таблицы назначения определяются заранее. Каждый топик со всеми сообщениями будет попадать в отдельную Iceberg-таблицу.

      В поле Таблицы перечислите имена Iceberg-таблиц через запятую.

    • Динамическая — таблица назначения определяется по содержимому самого сообщения.

      В поле Поле маршрутизации укажите поле в сообщении, по значению которого определяется целевая таблица.

  • В блоке Подключение к Metastore укажите параметры подключения к Apache Hive™ Metastore:

    • URI каталога — URI для подключения к кластеру Apache Hive™ Metastore в формате thrift://<хост>:<порт>.
    • Warehouse — корневая директория для хранения данных управляемых таблиц в S3 в формате s3a://bucket-name/path/to/warehouse.
  • В блоке Подключение к S3 укажите параметры подключения к хранилищу:

    • Эндпоинт — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища).

    • (Опционально) Регион — название региона. Значение по умолчанию — ru-central1. Список доступных регионов.

      Примечание

      Некоторые приложения, предназначенные для работы с Amazon S3, не позволяют указывать регион, поэтому Yandex Object Storage принимает также значение основного региона AWS — первая строка в таблице регионов.

    • Идентификатор ключа доступа, Секретный ключ — идентификатор и содержимое AWS-совместимого ключа.

  • (Опционально) В блоке Дополнительные настройки:

    • Секция Настройки таблиц:

      • Ветка коммитов по умолчанию — имя ветки по умолчанию. В эту ветку Iceberg-таблицы коннектор будет коммитить данные. Значение по умолчанию — main.
      • Список столбцов по умолчанию — список столбцов по умолчанию, разделенных запятыми, которые определяют строку идентификатора в Iceberg-таблицах (primary key). Является обязательным параметром при включенном UPSERT-режиме.
      • Партиционирование по умолчанию — список столбцов или выражений трансформации для партиционирования данных Iceberg-таблицы через запятую. Определяет физическое размещение данных для оптимизации запросов. Примеры: date, year, month, year (timestamp), month (timestamp), days (timestamp), bucket (16, user_id).
      • Включить автоматическую эволюцию схемы — настройка, которая указывает, должен ли коннектор автоматически изменять схему Iceberg-таблицы, если схема входящих сообщений из Apache Kafka® изменилась.
      • Сделать все колонки nullable — настройка, указывающая, делать ли все поля схемы Iceberg-таблицы необязательными (nullable), независимо от того, как они определены в схеме входящего сообщения.
      • Регистронезависимое сопоставление имён полей — настройка, которая указывает, должен ли коннектор игнорировать регистр при сопоставлении полей входящего сообщения с колонками Iceberg-таблицы.
    • Секция Настройки управления:

      • Префикс consumer group — префикс для Consumer Group ID, который коннектор использует при чтении из топиков Apache Kafka®. Значение по умолчанию — cg-control.
      • Интервал коммита, мс — указывает, как часто коннектор делает коммит данных в Iceberg-таблицу. Задается в миллисекундах. Значение по умолчанию — 300000.
      • Таймаут коммита, мс — указывает, сколько времени координатор ждет подтверждения от всех воркеров перед тем, как признать коммит неудачным. Задается в миллисекундах. Значение по умолчанию — 30000.
      • Потоки коммита — количество потоков, которые используются для коммита данных в Iceberg-таблицу.
      • Транзакционный префикс — префикс для Transactional ID, который коннектор использует при записи в Apache Kafka® в рамках транзакций.
  • --cluster-id — идентификатор кластера.

  • --cluster-name — имя кластера.

  • --tasks-max — максимальное количество одновременно запущенных задач коннектора.

  • --properties — список дополнительных настроек коннектора в формате <ключ>:<значение>, разделенный запятыми. Примеры ключей:

    • key.converter
    • value.converter
    • value.converter.schemas.enable

    Список общих настроек коннекторов в документации Apache Kafka®.

  • --topics — список топиков, разделенных запятыми, данные из которых будут перенесены в Iceberg-таблицы.

  • --topics-regex — регулярное выражение для выбора топиков, данные из которых будут перенесены в Iceberg-таблицы. Можно использовать выражение .*, например, analysis.*. Для переноса всех топиков укажите .*.

  • --control-topic — имя топика управления, используется для координации и управления процессом записи данных в Iceberg-таблицы.

  • --catalog-uri — URI для подключения к кластеру Apache Hive™ Metastore в формате thrift://<хост>:<порт>.

  • --warehouse — корневая директория для хранения данных управляемых таблиц в S3 в формате s3a://bucket-name/path/to/warehouse.

  • --access-key-id, --secret-access-key — идентификатор и содержимое AWS-совместимого ключа.

  • --storage-endpoint — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример: storage.yandexcloud.net.

  • --region — регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию — ru-central1. Список доступных регионов.

    Примечание

    Некоторые приложения, предназначенные для работы с Amazon S3, не позволяют указывать регион, поэтому Yandex Object Storage принимает также значение основного региона AWS — первая строка в таблице регионов.

  • --tables — имена Iceberg-таблиц, разделенные запятыми, для статической маршрутизации таблиц.

  • --route-field — поле в сообщении для определения целевой таблицы при динамической маршрутизации.

  • --default-commit-branch — имя ветки по умолчанию. В эту ветку Iceberg-таблицы коннектор будет коммитить данные. Значение по умолчанию — main.

  • --default-id-columns — список столбцов по умолчанию, разделенных запятыми, которые определяют строку идентификатора в Iceberg-таблицах (primary key). Является обязательным параметром при включенном UPSERT-режиме.

  • --default-partition-by — список столбцов или выражений трансформации для партиционирования данных Iceberg-таблицы через запятую. Определяет физическое размещение данных для оптимизации запросов. Примеры: date, year, month, year (timestamp), month (timestamp), days (timestamp), bucket (16, user_id).

  • --evolve-schema-enabled — настройка, которая указывает, должен ли коннектор автоматически изменять схему Iceberg-таблицы, если схема входящих сообщений из Apache Kafka® изменилась. Значение по умолчанию — false.

  • --schema-force-optional — настройка, которая указывает, делать ли все поля схемы Iceberg-таблицы необязательными (nullable), независимо от того, как они определены в схеме входящего сообщения. Значение по умолчанию — false.

  • --schema-case-insensitive — настройка, которая указывает, должен ли коннектор игнорировать регистр при сопоставлении полей входящего сообщения с колонками Iceberg-таблицы. Значение по умолчанию — false.

  • --group-id-prefix — префикс для Consumer Group ID, который коннектор использует при чтении из топиков Apache Kafka®. Значение по умолчанию — cg-control.

  • --commit-interval-ms — указывает, как часто коннектор делает коммит данных в Iceberg-таблицу. Задается в миллисекундах. Значение по умолчанию — 300000.

  • --commit-timeout-ms — указывает, сколько времени координатор ждет подтверждения от всех воркеров перед тем, как признать коммит неудачным. Задается в миллисекундах. Значение по умолчанию — 30000.

  • --commit-threads — количество потоков, которые используются для коммита данных в Iceberg-таблицу. Значение по умолчанию — vCPU × 2.

  • --transactional-prefix — префикс для Transactional ID, который коннектор использует при записи в Apache Kafka® в рамках транзакций.

  • properties — список дополнительных настроек коннектора в формате <ключ>:<значение>, разделенный запятыми. Примеры ключей:

    • key.converter
    • value.converter
    • value.converter.schemas.enable

    Список общих настроек коннекторов в документации Apache Kafka®.

  • tasks_max — максимальное количество одновременно запущенных задач коннектора.

  • connector_config_iceberg_sink — блок с настройками коннектора Iceberg Sink:

    • control_topic — имя топика управления, используется для координации и управления процессом записи данных в Iceberg-таблицы.

    • topics — список топиков, разделенных запятыми, данные из которых будут перенесены в Iceberg-таблицы.

    • topics_regex — регулярное выражение для выбора топиков, данные из которых будут перенесены в Iceberg-таблицы. Можно использовать выражение .*, например, analysis.*. Для переноса всех топиков укажите .*.

    • control_config — блок с дополнительными настройками:

      • commit_interval_ms — указывает, как часто коннектор делает коммит данных в Iceberg-таблицу. Задается в миллисекундах. Значение по умолчанию — 300000.
      • commit_threads — количество потоков, которые используются для коммита данных в Iceberg-таблицу. Значение по умолчанию — vCPU × 2.
      • commit_timeout_ms — указывает, сколько времени координатор ждет подтверждения от всех воркеров перед тем, как признать коммит неудачным. Задается в миллисекундах. Значение по умолчанию — 30000.
      • group_id_prefix — префикс для Consumer Group ID, который коннектор использует при чтении из топиков Apache Kafka®. Значение по умолчанию — cg-control.
      • transactional_prefix — префикс для Transactional ID, который коннектор использует при записи в Apache Kafka® в рамках транзакций.
    • dynamic_tables — блок с настройками динамической маршрутизации таблиц:

      • route_field — поле в сообщении для определения целевой таблицы при динамической маршрутизации.
    • metastore_connection — блок с настройками подключения к Apache Hive™ Metastore:

      • catalog_uri — URI для подключения к кластеру Apache Hive™ Metastore в формате thrift://<хост>:<порт>.
      • warehouse — корневая директория для хранения данных управляемых таблиц в S3 в формате s3a://bucket-name/path/to/warehouse.
    • s3_connection — блок с настройками для подключения к S3-совместимому хранилищу:

      • external_s3 — блок с параметрами для подключения к S3-совместимому хранилищу:
        • endpoint — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример: storage.yandexcloud.net.

        • region — регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию — ru-central1. Список доступных регионов.

          Примечание

          Некоторые приложения, предназначенные для работы с Amazon S3, не позволяют указывать регион, поэтому Yandex Object Storage принимает также значение основного региона AWS — первая строка в таблице регионов.

        • access_key_id, secret_access_key — идентификатор и содержимое AWS-совместимого ключа.

    • static_tables — блок с настройками статической маршрутизации таблиц:

      • tables — имена Iceberg-таблиц, разделенные запятыми, для статической маршрутизации таблиц.
    • tables_config — блок с настройками таблиц:

      • default_commit_branch — имя ветки по умолчанию. В эту ветку Iceberg-таблиц коннектор будет коммитить данные. Значение по умолчанию — main.
      • default_id_columns — список столбцов по умолчанию, разделенных запятыми, которые определяют строку идентификатора в Iceberg-таблицах (primary key). Является обязательным параметром при включенном UPSERT-режиме.
      • default_partition_by — список столбцов или выражений трансформации для партиционирования данных Iceberg-таблицы через запятую. Определяет физическое размещение данных для оптимизации запросов. Примеры: date, year, month, year (timestamp), month (timestamp), days (timestamp), bucket (16, user_id).
      • evolve_schema_enabled — настройка указывает, должен ли коннектор автоматически изменять схему Iceberg-таблицы, если схема входящих сообщений из Apache Kafka® изменилась. Значение по умолчанию — false.
      • schema_case_insensitive — настройка, которая указывает, должен ли коннектор игнорировать регистр при сопоставлении полей входящего сообщения с колонками Iceberg-таблицы. Значение по умолчанию — false.
      • schema_force_optional — настройка, указывающая, делать ли все поля схемы Iceberg-таблицы необязательными (nullable), независимо от того, как они определены в схеме входящего сообщения. Значение по умолчанию — false.

Настройки коннектора Iceberg Sink задаются в параметре connectorSpec.connectorConfigIcebergSink:

  • topics — список топиков, разделенных запятыми, данные из которых будут перенесены в Iceberg-таблицы.
  • topicsRegex — регулярное выражение для выбора топиков, данные из которых будут перенесены в Iceberg-таблицы. Можно использовать выражение .*, например, analysis.*. Для переноса всех топиков укажите .*.

Для отбора топиков используйте либо параметр topics, либо параметр topicsRegex.

  • controlTopic — имя топика управления, используется для координации и управления процессом записи данных в Iceberg-таблицы.

  • metastoreConnection — параметры для подключения к Apache Hive™ Metastore:

    • catalogUri — URI для подключения к кластеру Apache Hive™ Metastore в формате thrift://<хост>:<порт>.
    • warehouse — корневая директория для хранения данных управляемых таблиц в S3 в формате s3a://bucket-name/path/to/warehouse.
  • s3Connection — параметры для подключения к S3-совместимому хранилищу:

    • externalS3 — параметры внешнего хранилища:
      • endpoint — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример: storage.yandexcloud.net.

      • region — регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию — ru-central1. Список доступных регионов.

        Примечание

        Некоторые приложения, предназначенные для работы с Amazon S3, не позволяют указывать регион, поэтому Yandex Object Storage принимает также значение основного региона AWS — первая строка в таблице регионов.

      • accessKeyId, secretAccessKey — идентификатор и содержимое AWS-совместимого ключа.

  • staticTables — блок с настройками статической маршрутизации таблиц:

    • tables — имена Iceberg-таблиц через запятую для статической маршрутизации таблиц.
  • dynamicTables — блок с настройками динамической маршрутизации таблиц:

    • routeField — поле в сообщении для определения целевой таблицы при динамической маршрутизации.

Для настройки маршрутизации таблиц используйте либо параметр staticTables, либо параметр dynamicTables.

  • tablesConfig — блок с настройками таблиц:
    • defaultCommitBranch — имя ветки по умолчанию. В эту ветку Iceberg-таблицы коннектор будет коммитить данные. Значение по умолчанию — main.
    • defaultIdColumns — список столбцов по умолчанию, разделенных запятыми, которые определяют строку идентификатора в Iceberg-таблицах (primary key). Является обязательным параметром при включенном UPSERT-режиме.
    • defaultPartitionBy — список столбцов или выражений трансформации для партиционирования данных Iceberg-таблицы через запятую. Определяет физическое размещение данных для оптимизации запросов. Примеры: date, year, month, year (timestamp), month (timestamp), days (timestamp), bucket (16, user_id).
    • evolveSchemaEnabled — настройка, указывающая, должен ли коннектор автоматически изменять схему Iceberg-таблицы, если схема входящих сообщений из Apache Kafka® изменилась. Значение по умолчанию — false.
    • schemaForceOptional — настройка, указывающая, делать ли все поля схемы Iceberg-таблицы необязательными (nullable), независимо от того, как они определены в схеме входящего сообщения. Значение по умолчанию — false.
    • schemaCaseInsensitive — настройка, указывающая, должен ли коннектор игнорировать регистр при сопоставлении полей входящего сообщения с колонками Iceberg-таблицы. Значение по умолчанию — false.
  • controlConfig — блок с дополнительными настройками:
    • groupIdPrefix — префикс для Consumer Group ID, который коннектор использует при чтении из топиков Apache Kafka®. Значение по умолчанию — cg-control.
    • commitIntervalMs — указывает, как часто коннектор делает коммит данных в Iceberg-таблицу. Задается в миллисекундах. Значение по умолчанию — 300000.
    • commitTimeoutMs — указывает, сколько времени координатор ждет подтверждения от всех воркеров перед тем, как признать коммит неудачным. Задается в миллисекундах. Значение по умолчанию — 30000.
    • commitThreads — количество потоков, которые используются для коммита данных в Iceberg-таблицу. Значение по умолчанию — vCPU × 2.
    • transactionalPrefix — префикс для Transactional ID, который коннектор использует при записи в Apache Kafka® в рамках транзакций.

Настройки коннектора Iceberg Sink задаются в параметре connector_spec.connector_config_iceberg_sink:

  • topics — список топиков, разделенных запятыми, данные из которых будут перенесены в Iceberg-таблицы.
  • topics_regex — регулярное выражение для выбора топиков, данные из которых будут перенесены в Iceberg-таблицы. Можно использовать выражение .*, например, analysis.*. Для переноса всех топиков укажите .*.

Для отбора топиков используйте либо параметр topics, либо параметр topics_regex.

  • control_topic — имя топика управления, используется для координации и управления процессом записи данных в Iceberg-таблицы.

  • metastore_connection — параметры для подключения к Apache Hive™ Metastore:

    • catalog_uri — URI для подключения к кластеру Apache Hive™ Metastore в формате thrift://<хост>:<порт>.

    • warehouse — корневая директория для хранения данных управляемых таблиц в S3 в формате s3a://bucket-name/path/to/warehouse.

    • s3_connection — параметры для подключения к S3-совместимому хранилищу:

      • externalS3 — параметры внешнего хранилища:
        • endpoint — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример: storage.yandexcloud.net.

        • region — регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию — ru-central1. Список доступных регионов.

          Примечание

          Некоторые приложения, предназначенные для работы с Amazon S3, не позволяют указывать регион, поэтому Yandex Object Storage принимает также значение основного региона AWS — первая строка в таблице регионов.

        • access_key_id, secret_access_key — идентификатор и содержимое AWS-совместимого ключа.

    • static_tables — блок с настройками статической маршрутизации таблиц:

      • tables — имена Iceberg-таблиц через запятую для статической маршрутизации таблиц.
    • dynamic_tables — блок с настройками динамической маршрутизации таблиц:

      • route_field — поле в сообщении для определения целевой таблицы при динамической маршрутизации.

    Для настройки маршрутизации таблиц используйте либо параметр static_tables, либо параметр dynamic_tables.

    • tables_config — блок с настройками таблиц:
      • default_commit_branch — имя ветки по умолчанию. В эту ветку Iceberg-таблиц коннектор будет коммитить данные. Значение по умолчанию — main.
      • default_id_columns — список столбцов по умолчанию, разделенных запятыми, которые определяют строку идентификатора в Iceberg-таблицах (primary key). Является обязательным параметром при включенном UPSERT-режиме.
      • default_partition_by — список столбцов или выражений трансформации для партиционирования данных Iceberg-таблицы через запятую. Определяет физическое размещение данных для оптимизации запросов. Примеры: date, year, month, year (timestamp), month (timestamp), days (timestamp), bucket (16, user_id).
      • evolve_schema_enabled — настройка, которая указывает, должен ли коннектор автоматически изменять схему Iceberg-таблицы, если схема входящих сообщений из Apache Kafka® изменилась. Значение по умолчанию — false.
      • schema_force_optional — настройка, указывающая, делать ли все поля схемы Iceberg-таблицы необязательными (nullable), независимо от того, как они определены в схеме входящего сообщения. Значение по умолчанию — false.
      • schema_case_insensitive — настройка, которая указывает, должен ли коннектор игнорировать регистр при сопоставлении полей входящего сообщения с колонками Iceberg-таблицы. Значение по умолчанию — false.
    • control_config — блок с дополнительными настройками:
      • group_id_prefix — префикс для Consumer Group ID, который коннектор использует при чтении из топиков Apache Kafka®. Значение по умолчанию — cg-control.
      • commit_interval_ms — указывает, как часто коннектор делает коммит данных в Iceberg-таблицу. Задается в миллисекундах. Значение по умолчанию — 300000.
      • commit_timeout_ms — указывает, сколько времени координатор ждет подтверждения от всех воркеров перед тем, как признать коммит неудачным. Задается в миллисекундах. Значение по умолчанию — 30000.
      • commit_threads — количество потоков, которые используются для коммита данных в Iceberg-таблицу. Значение по умолчанию — vCPU × 2.
      • transactional_prefix — префикс для Transactional ID, который коннектор использует при записи в Apache Kafka® в рамках транзакций.

Изменить коннекторИзменить коннектор

Консоль управления
CLI
Terraform
REST API
gRPC API
  1. В консоли управления перейдите в нужный каталог.
  2. Перейдите в сервис Managed Service for Kafka.
  3. Выберите нужный кластер и перейдите на вкладку Коннекторы.
  4. В строке с нужным коннектором нажмите на значок и выберите пункт Изменить коннектор.
  5. Внесите необходимые изменения в свойства коннектора.
  6. Нажмите кнопку Сохранить.

Если у вас еще нет интерфейса командной строки Yandex Cloud (CLI), установите и инициализируйте его.

По умолчанию используется каталог, указанный при создании профиля CLI. Чтобы изменить каталог по умолчанию, используйте команду yc config set folder-id <идентификатор_каталога>. Также для любой команды вы можете указать другой каталог с помощью параметров --folder-name или --folder-id. Если вы обращаетесь к ресурсу по имени, поиск будет выполнен в каталоге по умолчанию. Если вы обращаетесь к ресурсу по идентификатору, поиск будет выполнен глобально — во всех каталогах с учетом прав доступа.

Чтобы изменить коннектор MirrorMaker:

  1. Посмотрите описание команды CLI для изменения коннектора:

    yc managed-kafka connector-mirrormaker update --help
    
  2. Запустите операцию, например, изменения лимита задач:

    yc managed-kafka connector-mirrormaker update <имя_коннектора> \
       --cluster-name=<имя_кластера> \
       --direction=<направление_коннектора> \
       --tasks-max=<новый_лимит_задач>
    

    Где --direction — направление коннектора: ingress или egres.

    Имя коннектора можно запросить со списком коннекторов в кластере, имя кластера — со списком кластеров в каталоге.

Чтобы изменить коннектор S3 Sink:

  1. Посмотрите описание команды CLI для изменения коннектора:

    yc managed-kafka connector-s3-sink update --help
    
  2. Запустите операцию, например, изменения лимита задач:

    yc managed-kafka connector-s3-sink update <имя_коннектора> \
       --cluster-name=<имя_кластера> \
       --tasks-max=<новый_лимит_задач>
    

    Имя коннектора можно запросить со списком коннекторов в кластере, имя кластера — со списком кластеров в каталоге.

Чтобы изменить коннектор Iceberg Sink:

  1. Посмотрите описание команды CLI для изменения коннектора:

    yc managed-kafka connector-iceberg-sink update --help
    
  2. Запустите операцию, например, изменение лимита задач:

    yc managed-kafka connector-iceberg-sink update <имя_коннектора> \
       --cluster-name=<имя_кластера> \
       --tasks-max=<новый_лимит_задач>
    

    Имя коннектора можно запросить со списком коннекторов в кластере, имя кластера — со списком кластеров в каталоге.

  1. Ознакомьтесь со списком настроек коннекторов MirrorMaker, S3 Sink и Iceberg Sink.

  2. Откройте актуальный конфигурационный файл Terraform с планом инфраструктуры.

    Как создать такой файл, описано в разделе Создание кластера Apache Kafka®.

  3. Измените значение параметров в описании ресурса yandex_mdb_kafka_connector:

    • Для коннектора MirrorMaker:

      resource "yandex_mdb_kafka_connector" "<имя_коннектора>" {
        cluster_id = "<идентификатор_кластера>"
        name       = "<имя_коннектора>"
        tasks_max  = <лимит_задач>
        properties = {
          <дополнительные_свойства>
        }
        connector_config_mirrormaker {
          topics             = "<шаблон_для_топиков>"
          replication_factor = <фактор_репликации>
          source_cluster {
            alias = "<префикс_для_обозначения_кластера>"
            external_cluster {
              bootstrap_servers           = "<список_FQDN_хостов-брокеров>"
              sasl_username               = "<имя_пользователя>"
              sasl_password               = "<пароль_пользователя>"
              sasl_mechanism              = "<механизм_шифрования>"
              security_protocol           = "<протокол_безопасности>"
              ssl-truststore-certificates = "<содержимое_PEM-сертификата>"
            }
          }
          target_cluster {
            alias = "<префикс_для_обозначения_кластера>"
            this_cluster {}
          }
        }
      }
      
    • Для коннектора S3 Sink:

      resource "yandex_mdb_kafka_connector" "<имя_S3_Sink_коннектора>" {
        cluster_id = "<идентификатор_кластера>"
        name       = "<имя_S3_Sink_коннектора>"
        tasks_max  = <лимит_задач>
        properties = {
          <дополнительные_свойства>
       }
        connector_config_s3_sink {
          topics                = "<шаблон_для_топиков>"
          file_max_records      = <максимальное_количество_сообщений_в_файле>
          s3_connection {
            bucket_name = "<имя_бакета>"
            external_s3 {
              endpoint          = "<эндпоинт_S3-совместимого_хранилища>"
              access_key_id     = "<идентификатор_AWS-совместимого_статического_ключа>"
              secret_access_key = "<содержимое_AWS-совместимого_статического_ключа>"
            }
          }
        }
      }
      
    • Для коннектора Iceberg Sink:

      resource "yandex_mdb_kafka_connector" "<имя_коннектора>" {
        cluster_id = "<идентификатор_кластера>"
        name       = "<имя_коннектора>"
        tasks_max  = <лимит_задач>
        properties = {
          <дополнительные_свойства>
        }
        connector_config_iceberg_sink {
          topics             = "<список_топиков>"
          control_topic = "<имя_топика_управления>"
      
          metastore_connection {
            catalog_uri = "<URI_для_подключения_к_кластеру_Metastore>"
            warehouse   = "<корневая_директория_для_хранения_данных_управляемых_таблиц_в_S3>"
          }
      
          s3_connection {
            external_s3 {
              endpoint          = "<эндпоинт_S3-совместимого_хранилища>"
              access_key_id     = "<идентификатор_AWS-совместимого_статического_ключа>"
              secret_access_key = "<содержимое_AWS-совместимого_статического_ключа>"
              region            = "<название_региона>"
            }
          }
      
          tables_config {
            default_commit_branch    = "<имя_ветки_по_умолчанию>"
            default_id_columns       = "<список_столбцов_по_умолчанию_через_запятую>"
            default_partition_by     = "<список_стобцов_или выражений_трансформации>"
            evolve_schema_enabled    = <автоматически_изменять_схему_Iceberg-таблицы>
            schema_force_optional    = <сделать_поля_схемы_Iceberg-таблицы_необязательными>
            schema_case_insensitive  = <игнорировать_регистр_при_сопоставлении_полей>
          }
      
          control_config {
            group_id_prefix      = "<префикс_для_Consumer_Group_ID>"
            commit_interval_ms   = <интервал_коммита_данных_в_Iceberg-таблицу>
            commit_timeout_ms    = <сколько_времени_координатор_ждет_подтверждения>
            commit_threads       = <количество_потоков_для_коммита_данных_в_Iceberg-таблицу>
            transactional_prefix = "<префикс_для_Transactional_ID>"
          }
        }
      }
      
  4. Проверьте корректность настроек.

    1. В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.

    2. Выполните команду:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  5. Подтвердите изменение ресурсов.

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

Подробнее в документации провайдера Terraform.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Воспользуйтесь методом Connector.update и выполните запрос, например, с помощью cURL:

    Важно

    Метод API переопределит все параметры изменяемого объекта, которые не были явно переданы в запросе, на значения по умолчанию. Чтобы избежать этого, перечислите настройки, которые вы хотите изменить, в параметре updateMask (одной строкой через запятую).

    curl \
      --request PATCH \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/connectors/<имя_коннектора>' \
      --data '{
                "updateMask": "connectorSpec.tasksMax,connectorSpec.properties,connectorSpec.connectorConfigMirrormaker.<настройка_коннектора_Mirrormaker_1>,...,connectorSpec.connectorConfigMirrormaker.<настройка_коннектора_Mirrormaker_N>,connectorSpec.connectorConfigS3Sink.<настройка_коннектора_S3-Sink_1>,...,connectorSpec.connectorConfigS3Sink.<настройка_коннектора_S3-Sink_N>,connectorSpec.connectorConfigIcebergSink.<настройка_коннектора_IcebergSink_1>,...,connectorSpec.connectorConfigIcebergSink.<настройка_коннектора_IcebergSink_N>",
                "connectorSpec": {
                  "tasksMax": "<лимит_задач>"
                  "properties": "<дополнительные_свойства_коннектора>"
                  "connectorConfigMirrormaker": {
                    <настройки_коннектора_Mirrormaker>
                  },
                  "connectorConfigS3Sink": {
                    <настройки_коннектора_S3-Sink>
                  },
                  "connectorConfigIcebergSink": {
                     <настройки_коннектора_IcebergSink>
                   }
                }
              }'
    

    Где:

    • updateMask — перечень изменяемых параметров коннектора в одну строку через запятую.

      Укажите нужные параметры:

      • connectorSpec.tasksMax – если нужно изменить лимит задач коннектора.
      • connectorSpec.properties – если нужно изменить дополнительные свойства коннектора.
      • connectorSpec.connectorConfigMirrormaker.<настройка_конфигурации_коннектора_Mirrormaker> – если нужно изменить настройки коннектора Mirrormaker.
      • connectorSpec.connectorConfigS3Sink.<настройка_конфигурации_коннектора_S3-Sink> – если нужно изменить настройки коннектора S3 Sink.
      • connectorSpec.connectorConfigIcebergSink.<настройка_конфигурации_коннектора_IcebergSink> – если нужно изменить настройки коннектора Iceberg Sink.
    • connectorSpec – укажите настройки коннектора MirrorMaker, S3 Sink или Iceberg Sink.

    Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя коннектора — со списком коннекторов в кластере.

  3. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Клонируйте репозиторий cloudapi:

    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
    

    Далее предполагается, что содержимое репозитория находится в директории ~/cloudapi/.

  3. Воспользуйтесь вызовом ConnectorService/Update и выполните запрос, например, с помощью gRPCurl:

    Важно

    Метод API переопределит все параметры изменяемого объекта, которые не были явно переданы в запросе, на значения по умолчанию. Чтобы избежать этого, перечислите настройки, которые вы хотите изменить, в параметре update_mask (в виде массива строк paths[]).

    Формат перечисления настроек
    "update_mask": {
        "paths": [
            "<настройка_1>",
            "<настройка_2>",
            ...
            "<настройка_N>"
        ]
    }
    
    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<идентификатор_кластера>",
            "connector_name": "<имя_коннектора>",
            "update_mask": {
              "paths": [
                "connector_spec.tasks_max",
                "connector_spec.properties",
                "connector_spec.connector_config_mirrormaker.<настройка_коннектора_Mirrormaker_1>",
                ...,
                "connector_spec.connector_config_mirrormaker.<настройка_коннектора_Mirrormaker_N>",
                "connector_spec.connector_config_s3_sink.<настройка_коннектора_S3-Sink_1>",
                ...,
                "connector_spec.connector_config_s3_sink.<настройка_коннектора_S3-Sink_N>",
                "connector_spec.connector_config_iceberg_sink.<настройка_коннектора_IcebergSink_1>",
                ...,
                "connector_spec.connector_config_iceberg_sink.<настройка_коннектора_IcebergSink_N>"
              ]
            },
            "connector_spec": {
              "tasks_max": {
                "value": "<лимит_задач>"
              },
              "properties": "<дополнительные_свойства_коннектора>"
              "connector_config_mirrormaker": {
                <настройки_коннектора_Mirrormaker>
              },
              "connector_config_s3_sink": {
                <настройки_коннектора_S3-Sink>
              },
              "connector_config_iceberg_sink": {
                <настройки_коннектора_IcebergSink>
              }
            }
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.ConnectorService.Update
    

    Где:

    • update_mask — перечень изменяемых параметров коннектора в виде массива строк paths[].

      Укажите нужные параметры:

      • connector_spec.tasks_max – если нужно изменить лимит задач коннектора.
      • connector_spec.properties – если нужно изменить дополнительные свойства коннектора.
      • connector_spec.connector_config_mirrormaker.<настройка_конфигурации_коннектора_Mirrormaker> – если нужно изменить настройки коннектора Mirrormaker.
      • connector_spec.connector_config_s3_sink.<настройка_конфигурации_коннектора_S3-Sink> – если нужно изменить настройки коннектора S3 Sink.
      • connector_spec.connector_config_iceberg_sink.<настройка_конфигурации_коннектора_IcebergSink> – если нужно изменить настройки коннектора Iceberg Sink.
    • connector_spec – укажите настройки коннектора MirrorMaker или S3 Sink.

    Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя коннектора — со списком коннекторов в кластере.

  4. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

Приостановить коннекторПриостановить коннектор

В процессе приостановки коннектора:

  • разрывается подключение к приемнику;
  • удаляются данные из служебных топиков коннектора.

Чтобы приостановить коннектор:

Консоль управления
CLI
REST API
gRPC API
  1. В консоли управления перейдите в нужный каталог.
  2. Перейдите в сервис Managed Service for Kafka.
  3. Выберите нужный кластер и перейдите на вкладку Коннекторы.
  4. Нажмите на значок рядом с именем нужного коннектора и выберите пункт Приостановить.

Если у вас еще нет интерфейса командной строки Yandex Cloud (CLI), установите и инициализируйте его.

По умолчанию используется каталог, указанный при создании профиля CLI. Чтобы изменить каталог по умолчанию, используйте команду yc config set folder-id <идентификатор_каталога>. Также для любой команды вы можете указать другой каталог с помощью параметров --folder-name или --folder-id. Если вы обращаетесь к ресурсу по имени, поиск будет выполнен в каталоге по умолчанию. Если вы обращаетесь к ресурсу по идентификатору, поиск будет выполнен глобально — во всех каталогах с учетом прав доступа.

Чтобы приостановить работу коннектора, выполните команду:

yc managed-kafka connector pause <имя_коннектора> \
   --cluster-name=<имя_кластера>
  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Воспользуйтесь методом Connector.pause и выполните запрос, например, с помощью cURL:

    curl \
      --request POST \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/connectors/pause/<имя_коннектора>'
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя коннектора — со списком коннекторов в кластере.

  3. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Клонируйте репозиторий cloudapi:

    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
    

    Далее предполагается, что содержимое репозитория находится в директории ~/cloudapi/.

  3. Воспользуйтесь вызовом ConnectorService/Pause и выполните запрос, например, с помощью gRPCurl:

    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<идентификатор_кластера>",
            "connector_name": "<имя_коннектора>"
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.ConnectorService.Pause
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя коннектора — со списком коннекторов в кластере.

  4. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

Возобновить работу коннектораВозобновить работу коннектора

Консоль управления
CLI
REST API
gRPC API
  1. В консоли управления перейдите в нужный каталог.
  2. Перейдите в сервис Managed Service for Kafka.
  3. Выберите нужный кластер и перейдите на вкладку Коннекторы.
  4. Нажмите на значок рядом с именем нужного коннектора и выберите пункт Возобновить.

Если у вас еще нет интерфейса командной строки Yandex Cloud (CLI), установите и инициализируйте его.

По умолчанию используется каталог, указанный при создании профиля CLI. Чтобы изменить каталог по умолчанию, используйте команду yc config set folder-id <идентификатор_каталога>. Также для любой команды вы можете указать другой каталог с помощью параметров --folder-name или --folder-id. Если вы обращаетесь к ресурсу по имени, поиск будет выполнен в каталоге по умолчанию. Если вы обращаетесь к ресурсу по идентификатору, поиск будет выполнен глобально — во всех каталогах с учетом прав доступа.

Чтобы возобновить работу коннектора, выполните команду:

yc managed-kafka connector resume <имя_коннектора> \
   --cluster-name=<имя_кластера>
  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Воспользуйтесь методом Connector.pause и выполните запрос, например, с помощью cURL:

    curl \
      --request POST \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/connectors/resume/<имя_коннектора>'
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя коннектора — со списком коннекторов в кластере.

  3. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Клонируйте репозиторий cloudapi:

    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
    

    Далее предполагается, что содержимое репозитория находится в директории ~/cloudapi/.

  3. Воспользуйтесь вызовом ConnectorService/Resume и выполните запрос, например, с помощью gRPCurl:

    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<идентификатор_кластера>",
            "connector_name": "<имя_коннектора>"
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.ConnectorService.Resume
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя коннектора — со списком коннекторов в кластере.

  4. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

Импортировать коннектор в TerraformИмпортировать коннектор в Terraform

С помощью импорта вы можете передать существующие коннекторы под управление Terraform.

Terraform
  1. Укажите в конфигурационном файле Terraform коннектор, который необходимо импортировать:

    resource "yandex_mdb_kafka_cluster" "<имя_коннектора>" {}
    
  2. Выполните команду для импорта коннектора:

    terraform import yandex_mdb_kafka_connector.<имя_коннектора> <идентификатор_кластера>:<имя_коннектора>
    

    Подробнее об импорте коннекторов в документации провайдера Terraform.

Удалить коннекторУдалить коннектор

Консоль управления
CLI
Terraform
REST API
gRPC API
  1. В консоли управления перейдите в нужный каталог.
  2. Перейдите в сервис Managed Service for Kafka.
  3. Выберите нужный кластер и перейдите на вкладку Коннекторы.
  4. Нажмите на значок рядом с именем нужного коннектора и выберите пункт Удалить.
  5. Нажмите кнопку Удалить.

Если у вас еще нет интерфейса командной строки Yandex Cloud (CLI), установите и инициализируйте его.

По умолчанию используется каталог, указанный при создании профиля CLI. Чтобы изменить каталог по умолчанию, используйте команду yc config set folder-id <идентификатор_каталога>. Также для любой команды вы можете указать другой каталог с помощью параметров --folder-name или --folder-id. Если вы обращаетесь к ресурсу по имени, поиск будет выполнен в каталоге по умолчанию. Если вы обращаетесь к ресурсу по идентификатору, поиск будет выполнен глобально — во всех каталогах с учетом прав доступа.

Чтобы удалить коннектор, выполните команду:

yc managed-kafka connector delete <имя_коннектора> \
   --cluster-name <имя_кластера>
  1. Откройте актуальный конфигурационный файл Terraform с планом инфраструктуры.

    Как создать такой файл, описано в разделе Создание кластера Apache Kafka®.

  2. Удалите ресурс yandex_mdb_kafka_connector с описанием нужного коннектора.

  3. Проверьте корректность настроек.

    1. В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.

    2. Выполните команду:

      terraform validate
      

      Если в файлах конфигурации есть ошибки, Terraform на них укажет.

  4. Подтвердите изменение ресурсов.

    1. Выполните команду для просмотра планируемых изменений:

      terraform plan
      

      Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.

    2. Если вас устраивают планируемые изменения, внесите их:

      1. Выполните команду:

        terraform apply
        
      2. Подтвердите изменение ресурсов.

      3. Дождитесь завершения операции.

Подробнее в документации провайдера Terraform.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Воспользуйтесь методом Connector.pause и выполните запрос, например, с помощью cURL:

    curl \
      --request DELETE \
      --header "Authorization: Bearer $IAM_TOKEN" \
      --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<идентификатор_кластера>/connectors/<имя_коннектора>'
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя коннектора — со списком коннекторов в кластере.

  3. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

  1. Получите IAM-токен для аутентификации в API и поместите токен в переменную среды окружения:

    export IAM_TOKEN="<IAM-токен>"
    
  2. Клонируйте репозиторий cloudapi:

    cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
    

    Далее предполагается, что содержимое репозитория находится в директории ~/cloudapi/.

  3. Воспользуйтесь вызовом ConnectorService/Delete и выполните запрос, например, с помощью gRPCurl:

    grpcurl \
      -format json \
      -import-path ~/cloudapi/ \
      -import-path ~/cloudapi/third_party/googleapis/ \
      -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
      -rpc-header "Authorization: Bearer $IAM_TOKEN" \
      -d '{
            "cluster_id": "<идентификатор_кластера>",
            "connector_name": "<имя_коннектора>"
          }' \
      mdb.api.cloud.yandex.net:443 \
      yandex.cloud.mdb.kafka.v1.ConnectorService.Delete
    

    Идентификатор кластера можно запросить со списком кластеров в каталоге, а имя коннектора — со списком коннекторов в кластере.

  4. Убедитесь, что запрос был выполнен успешно, изучив ответ сервера.

Была ли статья полезна?

Предыдущая
Управление пользователями
Следующая
Веб-интерфейс Kafka UI для Apache Kafka®
Создавайте контент и получайте гранты!Готовы написать своё руководство? Участвуйте в контент-программе и получайте гранты на работу с облачными сервисами!
Подробнее о программе
Проект Яндекса
© 2026 ООО «Яндекс.Облако»