Передача данных в эндпоинт-приемник Apache Kafka®
С помощью сервиса Yandex Data Transfer вы можете переносить данные в очередь Apache Kafka® и реализовывать различные сценарии обработки и трансформации данных. Для реализации трансфера:
- Ознакомьтесь с возможными сценариями передачи данных.
- Настройте один из поддерживаемых источников данных.
- Настройте эндпоинт-приемник в Yandex Data Transfer.
- Создайте и запустите трансфер.
- Выполняйте необходимые действия по работе с базой и контролируйте трансфер.
- При возникновении проблем, воспользуйтесь готовыми решениями по их устранению.
Сценарии передачи данных в Apache Kafka®
-
Миграция — перенос данных из одного хранилища в другое. Часто это перенос базы из устаревших локальных баз в управляемые облачные.
Отдельной задачей миграции является зеркалирование данных между очередями:
-
Захват изменений данных — это процесс отслеживания изменений в базе данных и поставка этих изменений потребителям. Применяется для приложений, которые чувствительны к изменению данных в реальном времени.
-
Поставка данных — процесс доставки произвольных данных в целевые хранилища. Процесс поставки включает извлечение данных из очереди и их десериализацию с последующей трансформацией данных в формат целевого хранилища.
Подробное описание возможных сценариев передачи данных в Yandex Data Transfer см. в разделе Практические руководства.
Настройка источника данных
Настройте один из поддерживаемых источников данных:
Полный список поддерживаемых источников и приемников в Yandex Data Transfer см. в разделе Доступные трансферы.
Настройка эндпоинта-приемника Apache Kafka®
При создании или изменении эндпоинта вы можете задать:
- Настройки подключения к кластеру Yandex Managed Service for Apache Kafka® или пользовательской инсталляции и настройки сериализации, в т. ч. на базе виртуальных машин Yandex Compute Cloud. Это обязательные параметры.
- Настройки топика Apache Kafka.
Кластер Managed Service for Apache Kafka®
Важно
Для создания или редактирования эндпоинта управляемой базы данных вам потребуется роль managed-kafka.viewer или примитивная роль viewer, выданная на каталог кластера этой управляемой базы данных.
Подключение с указанием кластера в Yandex Cloud.
-
Тип подключения — выберите вариант подключения к кластеру:
-
Ручная настройка — позволяет задать настройки подключения вручную.
Выберите тип инсталляции Кластер Managed Service for Apache Kafka и задайте настройки:
-
Кластер Managed Service for Apache Kafka — выберите кластер, к которому необходимо подключиться.
-
Аутентификация — выберите тип:
SASLилиБез аутентификации.При выборе значения
SASL:- Имя пользователя — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
- Пароль — укажите пароль учетной записи.
-
-
Connection Manager — позволяет подключиться к кластеру через Yandex Connection Manager:
-
Выберите каталог, в котором находится кластер Managed Service for Apache Kafka®.
-
Выберите тип инсталляции Кластер управляемой БД и задайте настройки:
- Кластер управляемой БД — выберите кластер, к которому необходимо подключиться.
- Подключение — выберите подключение в Connection Manager или создайте его.
Важно
Чтобы использовать подключение из Connection Manager, у пользователя должны быть права доступа не ниже
connection-manager.userк этому подключению. -
-
-
В блоке Топик выберите способ указания топика:
-
Полное имя топика — введите полное имя топика.
Если вы не хотите разбивать поток событий на независимые очереди по таблицам, включите настройку Сохранять порядок транзакций.
-
Префикс топика — введите префикс топика.
-
-
Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.
Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.
-
В блоке Настройки сериализации выберите тип сериализации. Подробнее см. в разделе Настройки сериализации.
-
При необходимости задайте Дополнительные настройки эндпоинта:
- Укажите конфигурацию топиков в формате Имя конфигурации-Значение конфигурации.
- Выберите тип сжатия.
- Тип эндпоинта —
kafka_target.
-
security_groups— группы безопасности для сетевого трафика.Правила групп безопасности применяются к трансферу. Они позволяют открыть сетевой доступ с ВМ трансфера к кластеру. Подробнее см. в разделе Сеть в Yandex Data Transfer.
Группы безопасности должны принадлежать той же сети, в которой размещен кластер.
Примечание
В Terraform сеть для групп безопасности задавать не нужно.
-
connection.cluster_id— идентификатор кластера, к которому необходимо подключиться. -
auth— метод аутентификации. Используется при подключении к хостам-брокерам:sasl— аутентификация с помощью SASL.no_auth— без аутентификации.
Пример структуры конфигурационного файла:
resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
name = "<имя_эндпоинта>"
settings {
kafka_target {
security_groups = ["<список_идентификаторов_групп_безопасности>"]
connection {
cluster_id = "<идентификатор_кластера>"
}
auth {
<метод_аутентификации>
}
<настройки_топика>
<настройки_сериализации>
}
}
}
Подробнее см. в документации провайдера Terraform.
-
securityGroups— группы безопасности для сетевого трафика, правила которых применятся к ВМ и кластерам без изменения их настроек. Подробнее см. в разделе Сеть в Yandex Data Transfer. -
connection.clusterId— идентификатор кластера, к которому необходимо подключиться. -
auth— метод аутентификации. Используется при подключении к хостам-брокерам:-
sasl— аутентификация с помощью SASL. Необходимо указать следующие параметры:user— имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.password.raw— пароль учетной записи (в текстовом виде).mechanism— механизм хеширования.
-
noAuth— без аутентификации.
-
Пользовательская инсталляция
Подключение к кластеру Apache Kafka® с явным указанием сетевых адресов и портов хостов-брокеров.
-
Тип подключения — выберите вариант подключения к базе данных:
-
Ручная настройка — позволяет задать настройки подключения вручную.
Выберите тип инсталляции Пользовательская инсталляция и задайте настройки:
-
URL-адреса брокеров — укажите IP-адреса или FQDN хостов-брокеров.
Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:
<IP-адрес_или_FQDN_хоста-брокера>:<номер_порта> -
SSL — использовать шифрование для защиты соединения.
-
PEM-сертификат — если требуется шифрование передаваемых данных, например для соответствия требованиям PCI DSS, загрузите файл сертификата или добавьте его содержимое в текстовом виде.
Важно
Если не добавить сертификат, трансфер может завершиться ошибкой.
-
Сетевой интерфейс для эндпоинта — выберите или создайте подсеть в нужной зоне доступности. Трансфер будет использовать эту подсеть для доступа к приемнику.
Если значение в этом поле задано для обоих эндпоинтов, то обе подсети должны быть размещены в одной зоне доступности.
-
Аутентификация — выберите тип:
SASLилиБез аутентификации.При выборе значения
SASL:- Пользователь — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
- Пароль — укажите пароль учетной записи.
- Механизм — выберите механизм хеширования: SHA 256 или SHA 512.
-
-
Connection Manager — позволяет подключиться к базе данных через Yandex Connection Manager:
-
Выберите каталог, в котором создано подключение Connection Manager.
-
Выберите тип инсталляции Пользовательская инсталляция и задайте настройки:
-
Подключение — выберите подключение в Connection Manager или создайте его.
-
Идентификатор подсети — выберите или создайте подсеть в нужной зоне доступности. Трансфер будет использовать эту подсеть для доступа к базе данных.
Если значение в этом поле задано для обоих эндпоинтов, то обе подсети должны быть размещены в одной зоне доступности.
-
-
-
-
В блоке Топик выберите способ указания топика:
-
Полное имя топика — введите полное имя топика.
Если вы не хотите разбивать поток событий на независимые очереди по таблицам, включите настройку Сохранять порядок транзакций.
-
Префикс топика — введите префикс топика.
-
-
Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.
Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.
-
В блоке Настройки сериализации выберите тип сериализации. Подробнее см. в разделе Настройки сериализации.
-
При необходимости задайте Дополнительные настройки эндпоинта:
- Укажите конфигурацию топиков в формате Имя конфигурации-Значение конфигурации.
- Выберите тип сжатия.
- Тип эндпоинта —
kafka_target.
-
security_groups— группы безопасности для сетевого трафика.Правила групп безопасности применяются к трансферу. Они позволяют открыть сетевой доступ с ВМ трансфера к хостам-брокерам. Подробнее см. в разделе Сеть в Yandex Data Transfer.
Группы безопасности должны принадлежать той же сети, что и подсеть
subnet_id, если она указана.Примечание
В Terraform сеть для групп безопасности задавать не нужно.
-
connection.on_premise— параметры подключения к хостам-брокерам:-
broker_urls— IP-адреса или FQDN хостов-брокеров.Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:
<IP-адрес_или_FQDN_хоста-брокера>:<номер_порта> -
tls_mode— параметры шифрования передаваемых данных, если оно требуется, например для соответствия требованиям PCI DSS.disabled— отключено.enabled— включено.-
ca_certificate— сертификат CA.Важно
Если не добавить сертификат, трансфер может завершиться ошибкой.
-
-
subnet_id— идентификатор подсети, в которой находятся хосты-брокеры. Трансфер будет использовать эту подсеть для доступа к ним.
-
-
auth— метод аутентификации. Используется при подключении к хостам-брокерам:sasl— аутентификация с помощью SASL.no_auth— без аутентификации.
Пример структуры конфигурационного файла:
resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
name = "<имя_эндпоинта>"
settings {
kafka_target {
security_groups = ["<список_идентификаторов_групп_безопасности>"]
connection {
on_premise {
broker_urls = ["<список_IP-адресов_или_FQDN_хостов-брокеров>"]
subnet_id = "<идентификатор_подсети_c_хостами-брокерами>"
}
}
auth = {
<метод_аутентификации>
}
<настройки_топика>
<настройки_сериализации>
}
}
}
Подробнее см. в документации провайдера Terraform.
-
securityGroups— группы безопасности для сетевого трафика, правила которых применятся к ВМ и кластерам без изменения их настроек. Подробнее см. в разделе Сеть в Yandex Data Transfer. -
connection.onPremise— параметры подключения к хостам-брокерам:-
brokerUrls— IP-адреса или FQDN хостов-брокеров.Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:
<IP-адрес_или_FQDN_хоста-брокера>:<номер_порта> -
tlsMode— параметры шифрования передаваемых данных, если оно требуется, например для соответствия требованиям PCI DSS.disabled— отключено.enabled— включено.-
caCertificate— сертификат CA.Важно
Если не добавить сертификат, трансфер может завершиться ошибкой.
-
-
subnetId— идентификатор подсети, в которой находятся хосты-брокеры. Трансфер будет использовать эту подсеть для доступа к ним.
-
-
auth— метод аутентификации. Используется при подключении к хостам-брокерам:-
sasl— аутентификация с помощью SASL. Необходимо указать следующие параметры:user— имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.password.raw— пароль учетной записи (в текстовом виде).mechanism— механизм хеширования.
-
noAuth— без аутентификации.
-
Настройки топика Apache Kafka®
-
Топик:
-
Полное имя топика — укажите имя топика, в который будут отправляться сообщения. Выберите Сохранять порядок транзакций, чтобы не разбивать поток событий на независимые очереди по таблицам.
-
Префикс топика — укажите префикс топика, аналог настройки
Debezium database.server.name. Сообщения будут отправляться в топик с именем<префикс_топика>.<схема>.<имя_таблицы>.
-
Укажите в блоке topic_settings одну из опций отправки сообщений в топик:
-
topic— укажите параметры в этом блоке, чтобы отправлять все сообщения в один топик:topic_name— имя топика, в который будут отправляться сообщения.save_tx_order— опция, позволяющая сохранять порядок транзакций. Укажите значениеtrue, чтобы не разбивать поток событий на независимые очереди по таблицам.
-
topic_prefix— укажите префикс, чтобы отправлять сообщения в разные топики с заданным префиксом.Это аналог настройки Debezium
database.server.name. Сообщения будут отправляться в топик с именем<префикс_топика>.<схема>.<имя_таблицы>.
Укажите в поле topicSettings одну из опций отправки сообщений в топик:
-
topic— укажите параметры в этом поле, чтобы отправлять все сообщения в один топик:topicName— имя топика, в который будут отправляться сообщения.saveTxOrder— опция, позволяющая сохранять порядок транзакций. Укажите значениеtrue, чтобы не разбивать поток событий на независимые очереди по таблицам.
-
topicPrefix— укажите префикс, чтобы отправлять сообщения в разные топики с заданным префиксом.Это аналог настройки Debezium
database.server.name. Сообщения будут отправляться в топик с именем<префикс_топика>.<схема>.<имя_таблицы>.
Yandex Data Transfer поддерживает CDC-режим для трансферов из баз данных PostgreSQL, MySQL® в Apache Kafka®. При этом данные в приемник попадают в формате Debezium. Подробнее о CDC-режиме см. в разделе Захват изменения данных.
Настройки сериализации
-
В блоке Настройки сериализации выберите тип сериализации:
-
Auto — автоматическая сериализация.
-
Debezium — сериализация по стандартам Debezium:
- Выберите схему ключа сообщения (соответствует Debezium-параметру
key.converter). - Выберите схему значения сообщения (соответствует Debezium-параметру
value.converter). - При необходимости задайте Настройки сериализации Debezium в формате
Параметр-Значение.
- Выберите схему ключа сообщения (соответствует Debezium-параметру
-
Если вы хотите использовать JSON-схемы в Yandex Schema Registry, сохраняя совместимость схем при добавлении и удалении опциональных полей, укажите следующие настройки:
-
Настройки сериализации — Debezium.
-
Чтобы использовать Schema Registry для ключей, выберите Схема ключа сообщения — JSON (посредством Schema Registry). Чтобы использовать Schema Registry для значений, выберите Схема значения сообщения — JSON (посредством Schema Registry).
-
URL — эндпоинт пространства имен Schema Registry. Вы можете скопировать эндпоинт из подсказки по подключению к пространству имен Schema Registry на вкладке Debezium, в параметре
value.converter.schema.registry.url.Важно
В пространстве имен должна быть выставлена Политика проверок совместимости для JSON —
optional friendly. -
Имя пользователя —
api-key. -
Пароль — значение API-ключа с ограниченной областью действия для подключения к Schema Registry. Чтобы получить значение:
-
Создайте API-ключ с ограниченной областью действия и поместите его в локальную переменную
SECRET:yc iam api-key create --folder-id <идентификатор_каталога> \ --service-account-name <имя_сервисного_аккаунта_для_работы_со_Schema_Registry> \ --scopes yc.schema-registry.schemas.manage \ --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \ SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'` -
Выведите в терминал значение переменной
SECRET:echo $SECRET -
Скопируйте полученное значение и вставьте его в поле Пароль в окне создания эндпоинта.
-
-
-
В блоке Настройки сериализации Debezium:
- Чтобы генерировать закрытую схему для ключей, добавьте параметр
key.converter.dt.json.generate.closed.content.schemaсо значениемtrue. - Чтобы генерировать закрытую схему для значений, добавьте параметр
value.converter.dt.json.generate.closed.content.schemaсо значениемtrue.
- Чтобы генерировать закрытую схему для ключей, добавьте параметр
Укажите в блоке serializer выбранный тип сериализации:
-
serializer_auto. -
serializer_json. -
serializer_debezium.Для этого типа можно указать параметры сериализации Debezium, задав их в блоке
serializer_debezium.serializer_parametersв виде парkey/value.
Если вы хотите использовать JSON-схемы в Yandex Schema Registry, сохраняя совместимость схем при добавлении и удалении опциональных полей, добавьте в конфигурационный файл блок serializer с описанием настроек сериализации. Чтобы генерировать закрытую схему для ключей, добавьте в блок serializer переменную key.converter.dt.json.generate.closed.content.schema со значением true. Чтобы генерировать закрытую схему для значений, добавьте в блок serializer переменную value.converter.dt.json.generate.closed.content.schema со значением true.
resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
...
settings {
kafka_target {
...
serializer {
serializer_debezium {
serializer_parameters {
key = "key.converter.dt.json.generate.closed.content.schema"
value = "true"
}
serializer_parameters {
key = "value.converter.dt.json.generate.closed.content.schema"
value = "true"
}
serializer_parameters {
key = "value.converter.schemas.enable"
value = "true"
}
serializer_parameters {
key = "value.converter.schema.registry.url"
value = "<URL_пространства_имен_Schema_Registry>"
}
serializer_parameters {
key = "value.converter.basic.auth.user.info"
value = "api-key:<значение_API-ключа>"
}
}
}
}
}
}
Где:
-
URL_пространства_имен_Schema_Registry— эндпоинт пространства имен Schema Registry. Вы можете скопировать эндпоинт из подсказки по подключению к пространству имен Schema Registry на вкладке Debezium, в параметреvalue.converter.schema.registry.url.Важно
В пространстве имен должна быть выставлена Политика проверок совместимости для JSON —
optional friendly. -
значение_API-ключа— значение API-ключа с ограниченной областью действия для подключения к Schema Registry. Чтобы получить значение:-
Создайте API-ключ с ограниченной областью действия и поместите его в локальную переменную
SECRET:yc iam api-key create --folder-id <идентификатор_каталога> \ --service-account-name <имя_сервисного_аккаунта_для_работы_со_Schema_Registry> \ --scopes yc.schema-registry.schemas.manage \ --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \ SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'` -
Выведите в терминал значение переменной
SECRET:echo $SECRET -
Скопируйте полученное значение и вставьте его в параметр
valueв конфигурационном файле.
-
Укажите в блоке serializer выбранный тип сериализации:
-
serializerAuto. -
serializerJson. -
serializerDebezium.Для этого типа можно указать параметры сериализации Debezium, задав их в поле
serializerDebezium.serializerParametersв виде парkey/value.
Если вы хотите использовать JSON-схемы в Yandex Schema Registry, сохраняя совместимость схем при добавлении и удалении опциональных полей, добавьте в тело запроса объект serializer с описанием настроек сериализации. Чтобы генерировать закрытую схему для ключей, добавьте в объект serializer переменную key.converter.dt.json.generate.closed.content.schema со значением true. Чтобы генерировать закрытую схему для значений, добавьте в объект serializer переменную value.converter.dt.json.generate.closed.content.schema со значением true.
"serializer": {
"serializerDebezium": {
"serializerParameters": [
{
"key": "converter.dt.json.generate.closed.content.schema",
"value": "true"
},
{
"key": "value.converter",
"value": "io.confluent.connect.json.JsonSchemaConverter"
},
{
"key": "value.converter.schemas.enable",
"value": "true"
},
{
"key": "value.converter.schema.registry.url",
"value": "<URL_пространства_имен_Schema_Registry>"
},
{
"key": "value.converter.basic.auth.user.info",
"value": "api-key:<значение_API-ключа>"
}
]
}
}
Где:
-
URL_пространства_имен_Schema_Registry— эндпоинт пространства имен Schema Registry. Вы можете скопировать эндпоинт из подсказки по подключению к пространству имен Schema Registry на вкладке Debezium, в параметреvalue.converter.schema.registry.url.Важно
В пространстве имен должна быть выставлена Политика проверок совместимости для JSON —
optional friendly. -
значение_API-ключа— значение API-ключа с ограниченной областью действия для подключения к Schema Registry. Чтобы получить значение:-
Создайте API-ключ с ограниченной областью действия и поместите его в локальную переменную
SECRET:yc iam api-key create --folder-id <идентификатор_каталога> \ --service-account-name <имя_сервисного_аккаунта_для_работы_со_Schema_Registry> \ --scopes yc.schema-registry.schemas.manage \ --expires-at '2030-01-01T00:00:00Z' >./api-key.yaml && \ SECRET=`cat ./api-key.yaml | grep 'secret:' | awk '{print $2}'` -
Выведите в терминал значение переменной
SECRET:echo $SECRET -
Скопируйте полученное значение и вставьте его в параметр
valueв конфигурационном файле.
-
Дополнительные настройки
Вы можете указать параметры конфигурации топика
Укажите параметр и одно из его допустимых значений: например, cleanup.policy и compact.
После настройки источника и приемника данных создайте и запустите трансфер.