Управление коннекторами
Коннектор управляет процессом переноса топиков Apache Kafka® в другой кластер или другую систему хранения данных.
Вы можете:
- получить список коннекторов;
- получить детальную информацию о коннекторе;
- создать коннектор нужного типа:
- изменить коннектор;
- приостановить коннектор;
- возобновить работу коннектора;
- импортировать коннектор в Terraform;
- удалить коннектор.
Получить список коннекторов
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Выберите нужный кластер и перейдите на вкладку Коннекторы.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы запросить список коннекторов кластера, выполните команду:
yc managed-kafka connector list --cluster-name=<имя_кластера>
Результат:
+--------------+-----------+
| NAME | TASKS MAX |
+--------------+-----------+
| connector559 | 1 |
| ... | |
+--------------+-----------+
Имя кластера можно получить со списком кластеров в каталоге.
Чтобы получить список коннекторов, воспользуйтесь методом REST API list для ресурса Connector или вызовом gRPC API ConnectorService/List и передайте в запросе идентификатор кластера в параметре clusterId
.
Чтобы узнать идентификатор кластера, получите список кластеров в каталоге.
Получить детальную информацию о коннекторе
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Выберите нужный кластер и перейдите на вкладку Коннекторы.
- Нажмите на имя нужного коннектора.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы получить детальную информацию о коннекторе, выполните команду:
yc managed-kafka connector get <имя_коннектора>\
--cluster-name=<имя_кластера>
Результат:
name: connector785
tasks_max: "1"
cluster_id: c9qbkmoiimsl********
...
Имя коннектора можно запросить со списком коннекторов в кластере, имя кластера — со списком кластеров в каталоге.
Чтобы получить детальную информацию о коннекторе, воспользуйтесь методом REST API get для ресурса Connector или вызовом gRPC API ConnectorService/Get и передайте в запросе:
- Идентификатор кластера в параметре
clusterId
. Чтобы узнать идентификатор, получите список кластеров в каталоге. - Имя коннектора в параметре
connectorName
. Чтобы узнать имя, получите список коннекторов в кластере.
Создать коннектор
-
В консоли управления
перейдите в нужный каталог. -
В списке сервисов выберите Managed Service for Kafka.
-
Выберите нужный кластер и перейдите на вкладку Коннекторы.
-
Нажмите кнопку Создать коннектор.
-
В блоке Базовые параметры укажите:
- Имя коннектора.
- Лимит задач — количество одновременно работающих процессов. Рекомендуется указывать не менее
2
для равномерного распределения нагрузки репликации.
-
В блоке Дополнительные свойства укажите свойства коннектора в формате:
<ключ>:<значение>
При этом ключ может быть как простой строкой, так и содержать префикс, указывающий на принадлежность к источнику или приемнику (псевдоним кластера в конфигурации коннектора):
<псевдоним_кластера>.<тело_ключа>:<значение>
-
Выберите тип коннектора — MirrorMaker или S3 Sink — и задайте его конфигурацию.
Подробнее о поддерживаемых типах коннекторов см. в разделе Коннекторы.
-
Нажмите кнопку Создать.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы создать коннектор MirrorMaker:
-
Посмотрите описание команды CLI для создания коннектора:
yc managed-kafka connector-mirrormaker create --help
-
Создайте коннектор:
yc managed-kafka connector-mirrormaker create <имя_коннектора> \ --cluster-name=<имя_кластера> \ --direction=<направление_коннектора> \ --tasks-max=<лимит_задач> \ --properties=<дополнительные_свойства> \ --replication-factor=<фактор_репликации> \ --topics=<шаблон_для_топиков> \ --this-cluster-alias=<префикс_для_обозначения_этого_кластера> \ --external-cluster alias=<префикс_для_обозначения_внешнего_кластера>,` `bootstrap-servers=<список_FQDN_хостов-брокеров>,` `security-protocol=<протокол_безопасности>,` `sasl-mechanism=<механизм_шифрования>,` `sasl-username=<имя_пользователя>,` `sasl-password=<пароль_пользователя>,` `ssl-truststore-certificates=<сертификаты_в_формате_PEM>
Как получить FQDN хоста-брокера, см. в инструкции.
Имя кластера можно получить со списком кластеров в каталоге.
Параметр
--direction
принимает значение:egress
— если текущий кластер является кластером-источником.ingress
— если текущий кластер является кластером-приемником.
Чтобы создать коннектор S3 Sink:
-
Посмотрите описание команды CLI для создания коннектора:
yc managed-kafka connector-s3-sink create --help
-
Создайте коннектор:
yc managed-kafka connector-s3-sink create <имя_коннектора> \ --cluster-name=<имя_кластера> \ --tasks-max=<лимит_задач> \ --properties=<дополнительные_свойства> \ --topics=<шаблон_для_топиков> \ --file-compression-type=<кодек_сжатия> \ --file-max-records=<максимальное_количество_сообщений_в_файле> \ --bucket-name=<имя_бакета> \ --access-key-id=<идентификатор_AWS-совместимого_статического_ключа> \ --secret-access-key=<содержимое_AWS-совместимого_статического_ключа> \ --storage-endpoint=<эндпоинт_S3-совместимого_хранилища> \ --region=<регион_S3-совместимого_хранилища>
Имя кластера можно получить со списком кластеров в каталоге.
-
Ознакомьтесь со списком настроек коннекторов MirrorMaker и S3 Sink.
-
Откройте актуальный конфигурационный файл Terraform с планом инфраструктуры.
О том, как создать такой файл, см. в разделе Создание кластера.
-
Чтобы создать коннектор MirrorMaker, добавьте ресурс
yandex_mdb_kafka_connector
с блоком настроекconnector_config_mirrormaker
:resource "yandex_mdb_kafka_connector" "<имя_коннектора>" { cluster_id = "<идентификатор_кластера>" name = "<имя_коннектора>" tasks_max = <лимит_задач> properties = { <дополнительные_свойства> } connector_config_mirrormaker { topics = "<шаблон_для_топиков>" replication_factor = <фактор_репликации> source_cluster { alias = "<префикс_для_обозначения_кластера>" external_cluster { bootstrap_servers = "<список_FQDN_хостов-брокеров>" sasl_username = "<имя_пользователя>" sasl_password = "<пароль_пользователя>" sasl_mechanism = "<механизм_шифрования>" security_protocol = "<протокол_безопасности>" ssl-truststore-certificates = "<содержимое_PEM-сертификата>" } } target_cluster { alias = "<префикс_для_обозначения_кластера>" this_cluster {} } } }
Как получить FQDN хоста-брокера, см. в инструкции.
-
Чтобы создать коннектор S3 Sink, добавьте ресурс
yandex_mdb_kafka_connector
с блоком настроекconnector_config_s3_sink
:resource "yandex_mdb_kafka_connector" "<имя_коннектора>" { cluster_id = "<идентификатор_кластера>" name = "<имя_коннектора>" tasks_max = <лимит_задач> properties = { <дополнительные_свойства> } connector_config_s3_sink { topics = "<шаблон_для_топиков>" file_compression_type = "<кодек_сжатия>" file_max_records = <максимальное_количество_сообщений_в_файле> s3_connection { bucket_name = "<имя_бакета>" external_s3 { endpoint = "<эндпоинт_S3-совместимого_хранилища>" access_key_id = "<идентификатор_AWS-совместимого_статического_ключа>" secret_access_key = "<содержимое_AWS-совместимого_статического_ключа>" } } } }
-
Проверьте корректность настроек.
-
В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.
-
Выполните команду:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
Подробнее см. в документации провайдера Terraform
Чтобы создать коннектор, воспользуйтесь методом API create для ресурса Connector или вызовом gRPC API ConnectorService/Create и передайте в запросе:
- Идентификатор кластера, в котором нужно создать коннектор, в параметре
clusterId
. Чтобы узнать идентификатор, получите список кластеров в каталоге. - Настройки коннектора MirrorMaker или S3 Sink в параметре
connectorSpec
.
MirrorMaker
Укажите параметры коннектора MirrorMaker:
-
Топики — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ
|
. Можно использовать выражение.*
, напримерanalysis.*
. Для переноса всех топиков укажите.*
. -
Фактор репликации — количество копий топика, хранящихся в кластере.
-
В блоке Кластер-источник укажите параметры для подключения к кластеру-источнику:
-
Псевдоним — префикс для обозначения кластера-источника в настройках коннектора.
Примечание
Топики в кластере-приемнике будут созданы с указанным префиксом.
-
Использовать этот кластер — выберите опцию для использования текущего кластера в качестве источника.
-
Бутстрап-серверы — список FQDN хостов-брокеров кластера-источника с номерами портов для подключения, разделенный запятыми. Например:
broker1.example.com:9091,broker2.example.com
.Как получить FQDN хоста-брокера, см. в инструкции.
-
SASL имя пользователя — имя пользователя для подключения коннектора к кластеру-источнику.
-
SASL пароль — пароль пользователя для подключения коннектора к кластеру-источнику.
-
SASL механизм — выберите механизм шифрования имени и пароля.
-
Протокол безопасности — выберите протокол подключения коннектора:
PLAINTEXT
,SASL_PLAINTEXT
– для подключений без SSL;SSL
,SASL_SSL
– для подключений с SSL.
-
Сертификат в формате PEM — загрузите PEM-сертификат для доступа к внешнему кластеру.
-
-
В блоке Кластер-приёмник укажите параметры для подключения к кластеру-приемнику:
-
Псевдоним — префикс для обозначения кластера-приемника в настройках коннектора.
-
Использовать этот кластер — выберите опцию для использования текущего кластера в качестве приемника.
-
Бутстрап-серверы — список FQDN хостов-брокеров кластера-приемника с номерами портов для подключения, разделенный запятыми.
Как получить FQDN хоста-брокера, см. в инструкции.
-
SASL имя пользователя — имя пользователя для подключения коннектора к кластеру-приемнику.
-
SASL пароль — пароль пользователя для подключения коннектора к кластеру-приемнику.
-
SASL механизм — выберите механизм шифрования имени и пароля.
-
Протокол безопасности — выберите протокол подключения коннектора:
PLAINTEXT
,SASL_PLAINTEXT
– для подключений без SSL;SSL
,SASL_SSL
– для подключений с SSL.
-
Сертификат в формате PEM — загрузите PEM-сертификат для доступа к внешнему кластеру.
-
-
Чтобы задать значения дополнительных настроек, не указанных в этом списке, создайте необходимые ключи и задайте их значения в блоке Дополнительные свойства при создании или изменении коннектора. Примеры ключей:
key.converter
value.converter
Список общих настроек коннекторов см. в документации Apache Kafka®
.
-
--cluster-name
— имя кластера. -
--direction
— направление коннектора:ingress
— если кластер является приемником.egress
— если кластер является источником.
-
--tasks-max
— количество одновременно работающих процессов. Рекомендуется указывать не менее2
для равномерного распределения нагрузки репликации. -
--properties
— список дополнительных настроек коннектора в формате<ключ>:<значение>
, разделенный запятыми. Примеры ключей:key.converter
value.converter
Список общих настроек коннекторов см. в документации Apache Kafka®
. -
--replication-factor
— количество копий топика, хранящихся в кластере. -
--topics
— шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ|
. Можно использовать выражение.*
, напримерanalysis.*
. Для переноса всех топиков укажите.*
. -
--this-cluster-alias
— префикс для обозначения этого кластера в настройках коннектора. -
--external-cluster
— параметры внешнего кластера:-
alias
— префикс для обозначения внешнего кластера в настройках коннектора. -
bootstrap-servers
— список FQDN хостов-брокеров внешнего кластера с номерами портов для подключения, разделенный запятыми.Как получить FQDN хоста-брокера, см. в инструкции.
-
security-protocol
— протокол подключения коннектора:plaintext
,sasl_plaintext
– для подключений без SSL;ssl
,sasl_ssl
– для подключений с SSL.
-
sasl-mechanism
— механизм шифрования имени и пароля. -
sasl-username
— имя пользователя для подключения коннектора к внешнему кластеру. -
sasl-password
— пароль пользователя для подключения коннектора к внешнему кластеру. -
ssl-truststore-certificates
— список сертификатов в формате PEM.
-
-
properties — список дополнительных настроек коннектора в формате
<ключ>:<значение>
, разделенный запятыми. Примеры ключей:key.converter
value.converter
Список общих настроек коннекторов см. в документации Apache Kafka®
. -
topics — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ
|
. Можно использовать выражение.*
, напримерanalysis.*
. Для переноса всех топиков укажите.*
. -
replication_factor — количество копий топика, хранящихся в кластере.
-
source_cluster и target_cluster — параметры для подключения к кластеру-источнику и кластеру-приемнику:
-
alias — префикс для обозначения кластера в настройках коннектора.
Примечание
Топики в кластере-приемнике будут созданы с указанным префиксом.
-
this_cluster — опция для использования текущего кластера в качестве источника или приемника.
-
external_cluster — параметры для подключения к внешнему кластеру:
-
bootstrap_servers — список FQDN хостов-брокеров кластера с номерами портов для подключения, разделенный запятыми.
Как получить FQDN хоста-брокера, см. в инструкции.
-
sasl_username — имя пользователя для подключения коннектора к кластеру.
-
sasl_password — пароль пользователя для подключения коннектора к кластеру.
-
sasl_mechanism — механизм шифрования имени и пароля.
-
security_protocol — протокол подключения коннектора:
PLAINTEXT
,SASL_PLAINTEXT
– для подключений без SSL;SSL
,SASL_SSL
– для подключений с SSL.
-
ssl_truststore_certificates — содержимое PEM-сертификата.
-
-
Примечание
Названия параметров приведены для REST API, а в скобках — для gRPC API, если название отличается.
Настройки коннектора MirrorMaker задаются в параметре connectorConfigMirrormaker
(connector_config_mirrormaker
):
-
sourceCluster
(source_cluster
) иtargetCluster
(target_cluster
) — параметры для подключения к кластеру-источнику и кластеру-приемнику:-
alias
— префикс для обозначения кластера в настройках коннектора.Примечание
Топики в кластере-приемнике будут созданы с указанным префиксом.
-
thisCluster
(this_cluster
) — опция для использования текущего кластера в качестве источника или приемника. -
externalCluster
(external_cluster
) — параметры для подключения к внешнему кластеру:-
bootstrapServers
(bootstrap_servers
) — список FQDN хостов-брокеров кластера с номерами портов для подключения, разделенный запятыми.Как получить FQDN хоста-брокера, см. в инструкции.
-
saslUsername
(sasl_username
) — имя пользователя для подключения коннектора к кластеру. -
saslPassword
(sasl_password
) — пароль пользователя для подключения коннектора к кластеру. -
saslMechanism
(sasl_mechanism
) — механизм шифрования имени и пароля. -
securityProtocol
(security_protocol
) — протокол подключения коннектора:PLAINTEXT
,SASL_PLAINTEXT
– для подключений без SSL;SSL
,SASL_SSL
– для подключений с SSL.
-
sslTruststoreCertificates
(ssl_truststore_certificates
) — содержимое PEM-сертификата.
-
-
-
topics
— шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ|
. Можно использовать выражение.*
, напримерanalysis.*
. Для переноса всех топиков укажите.*
. -
replicationFactor
(replication_factor
) — количество копий топика, хранящихся в кластере.
S3 Sink
Укажите параметры коннектора S3 Sink:
-
Топики — шаблон для отбора экспортируемых топиков, имена топиков перечисляются через запятую или символ
|
. Можно использовать выражение.*
, напримерanalysis.*
. Для переноса всех топиков укажите.*
. -
Механизм сжатия — выберите кодек для сжатия сообщений:
none
(по умолчанию) — сжатие отсутствует;gzip
— кодек gzip ;snappy
— кодек snappy ;zstd
— кодек zstd .
После создания кластера данный параметр нельзя изменить.
-
(Опционально) Максимальное количество записей на файл — максимальное количество записей, которое может быть записано в один файл, размещенный в S3-совместимом хранилище.
-
В блоке Подключение к S3 укажите параметры подключения к хранилищу:
-
Имя бакета — имя бакета хранилища.
-
Эндпоинт — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища).
-
(Опционально) Регион — название региона. Значение по умолчанию —
us-east-1
. Список доступных регионов . -
(Опционально) Идентификатор ключа доступа, Секретный ключ — идентификатор и содержимое AWS-совместимого ключа.
-
-
Чтобы задать значения дополнительных настроек, не указанных в этом списке, создайте необходимые ключи и задайте их значения в блоке Дополнительные свойства при создании или изменении коннектора. Примеры ключей:
key.converter
value.converter
value.converter.schemas.enable
format.output.type
Список всех настроек коннектора см. в документации коннектора
. Список общих настроек коннекторов см. в документации Apache Kafka® .
-
--cluster-name
— имя кластера. -
--tasks-max
— количество одновременно работающих процессов. Рекомендуется указывать не менее2
для равномерного распределения нагрузки репликации. -
--properties
— список дополнительных настроек коннектора в формате<ключ>:<значение>
, разделенный запятыми. Примеры ключей:key.converter
value.converter
value.converter.schemas.enable
format.output.type
Список всех настроек коннектора см. в документации коннектора
. Список общих настроек коннекторов см. в документации Apache Kafka® . -
--topics
— шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ|
. Можно использовать выражение.*
, напримерanalysis.*
. Для переноса всех топиков укажите.*
. -
--file-compression-type
— кодек для сжатия сообщений. После создания кластера данный параметр нельзя изменить. Допустимые значения: -
--file-max-records
— максимальное количество записей, которое может быть записано в один файл, размещенный в S3-совместимом хранилище. -
--bucket-name
— имя бакета в S3-совместимом хранилище, в который будет производиться запись. -
--storage-endpoint
— эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример:storage.yandexcloud.kz
. -
--region
— регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию —us-east-1
. Список доступных регионов . -
--access-key-id
,--secret-access-key
— идентификатор и содержимое AWS-совместимого ключа.
-
properties — список дополнительных настроек коннектора в формате
<ключ>:<значение>
, разделенный запятыми. Примеры ключей:key.converter
value.converter
value.converter.schemas.enable
format.output.type
Список всех настроек коннектора см. в документации коннектора
. Список общих настроек коннекторов см. в документации Apache Kafka® . -
topics — шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ
|
. Можно использовать выражение.*
, напримерanalysis.*
. Для переноса всех топиков укажите.*
. -
file_compression_type — кодек для сжатия сообщений. После создания кластера данный параметр нельзя изменить. Допустимые значения:
-
file_max_records — максимальное количество записей, которое может быть записано в один файл, размещенный в S3-совместимом хранилище.
-
s3_connection — параметры для подключения к S3-совместимому хранилищу:
-
bucket_name — имя бакета, в который будет производиться запись.
-
external_s3 — параметры для подключения к внешнему S3-совместимому хранилищу:
-
endpoint — эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример:
storage.yandexcloud.kz
. -
region — регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию —
us-east-1
. Список доступных регионов . -
access_key_id, secret_access_key — идентификатор и содержимое AWS-совместимого ключа.
-
-
Примечание
Названия параметров приведены для REST API, а в скобках — для gRPC API, если название отличается.
Настройки коннектора S3 Sink задаются в параметре connectorConfigS3Sink
(connector_config_s3_sink
):
-
topics
— шаблон для отбора реплицируемых топиков, имена топиков перечисляются через запятую или символ|
. Можно использовать выражение.*
, напримерanalysis.*
. Для переноса всех топиков укажите.*
. -
fileCompressionType
(file_compression_type
) — кодек для сжатия сообщений. После создания кластера данный параметр нельзя изменить. Допустимые значения: -
fileMaxRecords
(file_max_records
) — максимальное количество записей, которое может быть записано в один файл, размещенный в S3-совместимом хранилище. -
s3Connection
(s3_connection
) — параметры для подключения к S3-совместимому хранилищу:bucketName
(bucket_name
) — имя бакета, в который будет производиться запись.externalS3
(external_s3
) — параметры внешнего хранилища:-
endpoint
— эндпоинт для доступа к хранилищу (его необходимо узнать у провайдера хранилища). Пример:storage.yandexcloud.kz
. -
region
— регион, в котором находится бакет S3-совместимого хранилища. Значение по умолчанию —us-east-1
. Список доступных регионов . -
accessKeyId
(access_key_id
),secretAccessKey
(secret_access_key
) — идентификатор и содержимое AWS-совместимого ключа.
-
Изменить коннектор
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Выберите нужный кластер и перейдите на вкладку Коннекторы.
- В строке с нужным коннектором нажмите на значок
и выберите пункт Изменить коннектор. - Внесите необходимые изменения в свойства коннектора.
- Нажмите кнопку Сохранить.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы изменить коннектор MirrorMaker:
-
Посмотрите описание команды CLI для изменения коннектора:
yc managed-kafka connector-mirrormaker update --help
-
Запустите операцию, например, изменения лимита задач:
yc managed-kafka connector-mirrormaker update <имя_коннектора> \ --cluster-name=<имя_кластера> \ --direction=<направление_коннектора> \ --tasks-max=<новый_лимит_задач>
Где
--direction
— направление коннектора:ingress
илиegres
.Имя коннектора можно запросить со списком коннекторов в кластере, имя кластера — со списком кластеров в каталоге.
Чтобы изменить коннектор S3 Sink:
-
Посмотрите описание команды CLI для изменения коннектора:
yc managed-kafka connector-s3-sink update --help
-
Запустите операцию, например, изменения лимита задач:
yc managed-kafka connector-s3-sink update <имя_коннектора> \ --cluster-name=<имя_кластера> \ --tasks-max=<новый_лимит_задач>
Имя коннектора можно запросить со списком коннекторов в кластере, имя кластера — со списком кластеров в каталоге.
-
Ознакомьтесь со списком настроек коннекторов MirrorMaker и S3 Sink.
-
Откройте актуальный конфигурационный файл Terraform с планом инфраструктуры.
О том, как создать такой файл, см. в разделе Создание кластера.
-
Измените значение параметров в описании ресурса
yandex_mdb_kafka_connector
:-
Для коннектора MirrorMaker:
resource "yandex_mdb_kafka_connector" "<имя_коннектора>" { cluster_id = "<идентификатор_кластера>" name = "<имя_коннектора>" tasks_max = <лимит_задач> properties = { <дополнительные_свойства> } connector_config_mirrormaker { topics = "<шаблон_для_топиков>" replication_factor = <фактор_репликации> source_cluster { alias = "<префикс_для_обозначения_кластера>" external_cluster { bootstrap_servers = "<список_FQDN_хостов-брокеров>" sasl_username = "<имя_пользователя>" sasl_password = "<пароль_пользователя>" sasl_mechanism = "<механизм_шифрования>" security_protocol = "<протокол_безопасности>" ssl-truststore-certificates = "<содержимое_PEM-сертификата>" } } target_cluster { alias = "<префикс_для_обозначения_кластера>" this_cluster {} } } }
-
Для коннектора S3 Sink:
resource "yandex_mdb_kafka_connector" "<имя_S3_Sink_коннектора>" { cluster_id = "<идентификатор_кластера>" name = "<имя_S3_Sink_коннектора>" tasks_max = <лимит_задач> properties = { <дополнительные_свойства> } connector_config_s3_sink { topics = "<шаблон_для_топиков>" file_max_records = <максимальное_количество_сообщений_в_файле> s3_connection { bucket_name = "<имя_бакета>" external_s3 { endpoint = "<эндпоинт_S3-совместимого_хранилища>" access_key_id = "<идентификатор_AWS-совместимого_статического_ключа>" secret_access_key = "<содержимое_AWS-совместимого_статического_ключа>" } } } }
-
-
Проверьте корректность настроек.
-
В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.
-
Выполните команду:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
Подробнее см. в документации провайдера Terraform
Чтобы изменить коннектор, воспользуйтесь методом REST API update для ресурса Connector или вызовом gRPC API ConnectorService/Update и передайте в запросе:
- Идентификатор кластера, в котором нужно изменить коннектор, в параметре
clusterId
. Чтобы узнать идентификатор, получите список кластеров в каталоге. - Настройки коннектора MirrorMaker или S3 Sink в параметре
connectorSpec
.
Приостановить коннектор
В процессе приостановки коннектора:
- разрывается подключение к приемнику;
- удаляются данные из служебных топиков коннектора.
Чтобы приостановить коннектор:
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Выберите нужный кластер и перейдите на вкладку Коннекторы.
- Нажмите на значок
рядом с именем нужного коннектора и выберите пункт Приостановить.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы приостановить работу коннектора, выполните команду:
yc managed-kafka connector pause <имя_коннектора> \
--cluster-name=<имя_кластера>
Чтобы приостановить работу коннектора, воспользуйтесь методом REST API pause для ресурса Connector или вызовом gRPC API ConnectorService/Pause и передайте в запросе:
- Идентификатор кластера в параметре
clusterId
. Чтобы узнать идентификатор, получите список кластеров в каталоге. - Имя коннектора в параметре
connectorName
. Чтобы узнать имя, получите список коннекторов в кластере.
Возобновить работу коннектора
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Выберите нужный кластер и перейдите на вкладку Коннекторы.
- Нажмите на значок
рядом с именем нужного коннектора и выберите пункт Возобновить.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы возобновить работу коннектора, выполните команду:
yc managed-kafka connector resume <имя_коннектора> \
--cluster-name=<имя_кластера>
Чтобы возобновить работу коннектора, воспользуйтесь методом REST API resume для ресурса Connector или вызовом gRPC API ConnectorService/Resume и передайте в запросе:
- Идентификатор кластера в параметре
clusterId
. Чтобы узнать идентификатор, получите список кластеров в каталоге. - Имя коннектора в параметре
connectorName
. Чтобы узнать имя, получите список коннекторов в кластере.
Импортировать коннектор в Terraform
С помощью импорта вы можете передать существующие коннекторы под управление Terraform.
-
Укажите в конфигурационном файле Terraform коннектор, который необходимо импортировать:
resource "yandex_mdb_kafka_cluster" "<имя_коннектора>" {}
-
Выполните команду для импорта коннектора:
terraform import yandex_mdb_kafka_connector.<имя_коннектора> <идентификатор_кластера>:<имя_коннектора>
Подробнее об импорте коннекторов см. в документации провайдера Terraform
.
Удалить коннектор
- В консоли управления
перейдите в нужный каталог. - В списке сервисов выберите Managed Service for Kafka.
- Выберите нужный кластер и перейдите на вкладку Коннекторы.
- Нажмите на значок
рядом с именем нужного коннектора и выберите пункт Удалить. - Нажмите кнопку Удалить.
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра --folder-name
или --folder-id
.
Чтобы удалить коннектор, выполните команду:
yc managed-kafka connector delete <имя_коннектора> \
--cluster-name <имя_кластера>
-
Откройте актуальный конфигурационный файл Terraform с планом инфраструктуры.
О том, как создать такой файл, см. в разделе Создание кластера.
-
Удалите ресурс
yandex_mdb_kafka_connector
с описанием нужного коннектора. -
Проверьте корректность настроек.
-
В командной строке перейдите в каталог, в котором расположены актуальные конфигурационные файлы Terraform с планом инфраструктуры.
-
Выполните команду:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
Подробнее см. в документации провайдера Terraform
Чтобы удалить коннектор, воспользуйтесь методом REST API delete для ресурса Connector или вызовом gRPC API ConnectorService/Delete и передайте в запросе:
- Идентификатор кластера в параметре
clusterId
. Чтобы узнать идентификатор, получите список кластеров в каталоге. - Имя коннектора в параметре
connectorName
. Чтобы узнать имя, получите список коннекторов в кластере.