Передача данных из эндпоинта-источника Apache Kafka®
С помощью сервиса Yandex Data Transfer вы можете переносить данные из очереди Apache Kafka® и реализовывать различные сценарии переноса, обработки и трансформации данных. Для реализации трансфера:
- Ознакомьтесь с возможными сценариями передачи данных.
- Подготовьте базу данных Apache Kafka® к трансферу.
- Настройте эндпоинт-источник в Yandex Data Transfer.
- Настройте один из поддерживаемых приемников данных.
- Создайте и запустите трансфер.
- Выполняйте необходимые действия по работе с базой и контролируйте трансфер.
- При возникновении проблем, воспользуйтесь готовыми решениями по их устранению.
Сценарии передачи данных из Apache Kafka®
-
Миграция — перенос данных из одного хранилища в другое. Часто это перенос базы из устаревших локальных баз в управляемые облачные.
Отдельной задачей миграции является зеркалирование данных между очередями:
-
Поставка данных — процесс доставки произвольных данных в целевые хранилища. Процесс поставки включает извлечение данных из очереди и их десериализацию с последующей трансформацией данных в формат целевого хранилища.
Подробное описание возможных сценариев передачи данных в Yandex Data Transfer см. в разделе Практические руководства.
Подготовка базы данных источника
Создайте пользователя с ролью ACCESS_ROLE_CONSUMER на топике-источнике.
-
Если вы не планируете использовать для подключения к внешнему кластеру сервис Cloud Interconnect или VPN, разрешите подключения к такому кластеру из интернета с IP-адресов, используемых сервисом Data Transfer.
Подробнее о настройке сети для работы с внешними ресурсами см. в концепции.
-
Настройте права доступа
пользователя к нужному топику. -
Выдайте права
READконсьюмер-группе, идентификатор которой совпадает с идентификатором трансфера.bin/kafka-acls --bootstrap-server localhost:9092 \ --command-config adminclient-configs.conf \ --add \ --allow-principal User:username \ --operation Read \ --group <идентификатор_трансфера> -
(Опционально) Чтобы использовать авторизацию по логину и паролю, настройте SASL-аутентификацию
.
Настройка эндпоинта-источника Apache Kafka®
При создании или изменении эндпоинта вы можете задать:
- Настройки подключения к кластеру Yandex Managed Service for Apache Kafka® или пользовательской инсталляции, в т. ч. на базе виртуальных машин Yandex Compute Cloud. Эти параметры обязательные.
- Расширенные настройки.
Кластер Managed Service for Apache Kafka®
Важно
Для создания или редактирования эндпоинта управляемой базы данных вам потребуется роль managed-kafka.viewer или примитивная роль viewer, выданная на каталог кластера этой управляемой базы данных.
Подключение с указанием кластера в Yandex Cloud.
-
Тип подключения — выберите вариант подключения к кластеру:
-
Ручная настройка — позволяет задать настройки подключения вручную.
Выберите тип инсталляции Кластер Managed Service for Apache Kafka и задайте настройки:
-
Кластер Managed Service for Apache Kafka — выберите кластер, к которому необходимо подключиться.
-
Аутентификация — выберите тип:
SASLилиБез аутентификации.При выборе значения
SASL:- Имя пользователя — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
- Пароль — укажите пароль учетной записи.
-
-
Connection Manager — позволяет подключиться к кластеру через Yandex Connection Manager:
-
Выберите каталог, в котором находится кластер Managed Service for Apache Kafka®.
-
Выберите тип инсталляции Кластер управляемой БД и задайте настройки:
- Кластер управляемой БД — выберите кластер, к которому необходимо подключиться.
- Подключение — выберите подключение в Connection Manager или создайте его.
Важно
Чтобы использовать подключение из Connection Manager, у пользователя должны быть права доступа не ниже
connection-manager.userк этому подключению. -
-
-
Полное имя топика — укажите имя топика, к которому необходимо подключиться.
-
Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.
Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.
- Тип эндпоинта —
kafka_source.
-
security_groups— группы безопасности для сетевого трафика.Правила групп безопасности применяются к трансферу. Они позволяют открыть сетевой доступ с ВМ трансфера к кластеру. Подробнее см. в разделе Сеть в Yandex Data Transfer.
Группы безопасности должны принадлежать той же сети, в которой размещен кластер.
Примечание
В Terraform сеть для групп безопасности задавать не нужно.
-
connection.cluster_id— идентификатор кластера, к которому необходимо подключиться. -
auth— метод аутентификации. Используется при подключении к хостам-брокерам:sasl— аутентификация с помощью SASL.no_auth— без аутентификации.
-
topic_names— список имен топиков, к которым необходимо подключиться.
Пример структуры конфигурационного файла:
resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
name = "<имя_эндпоинта>"
settings {
kafka_source {
security_groups = ["<список_идентификаторов_групп_безопасности>"]
connection {
cluster_id = "<идентификатор_кластера>"
}
auth {
<метод_аутентификации>
}
topic_names = ["<список_имен_топиков>"]
<расширенные_настройки_эндпоинта>
}
}
}
Подробнее см. в документации провайдера Terraform.
-
securityGroups— группы безопасности для сетевого трафика, правила которых применятся к ВМ и кластерам без изменения их настроек. Подробнее см. в разделе Сеть в Yandex Data Transfer. -
connection.clusterId— идентификатор кластера, к которому необходимо подключиться. -
auth— метод аутентификации. Используется при подключении к хостам-брокерам:-
sasl— аутентификация с помощью SASL. Необходимо указать следующие параметры:user— имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.password.raw— пароль учетной записи (в текстовом виде).mechanism— механизм хеширования.
-
noAuth— без аутентификации.
-
-
topicNames— список имен топиков, к которым необходимо подключиться.
Пользовательская инсталляция
Подключение к кластеру Apache Kafka® с явным указанием сетевых адресов и портов хостов-брокеров.
-
Тип подключения — выберите вариант подключения к базе данных:
-
Ручная настройка — позволяет задать настройки подключения вручную.
Выберите тип инсталляции Пользовательская инсталляция и задайте настройки:
-
URL-адреса брокеров — укажите IP-адреса или FQDN хостов-брокеров.
Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:
<IP-адрес_или_FQDN_хоста-брокера>:<номер_порта> -
SSL — использовать шифрование для защиты соединения.
-
PEM-сертификат — если требуется шифрование передаваемых данных, например для соответствия требованиям PCI DSS, загрузите файл сертификата или добавьте его содержимое в текстовом виде.
Важно
Если не добавить сертификат, трансфер может завершиться ошибкой.
-
Сетевой интерфейс для эндпоинта — выберите или создайте подсеть в нужной зоне доступности. Трансфер будет использовать эту подсеть для доступа к источнику.
Если значение в этом поле задано для обоих эндпоинтов, то обе подсети должны быть размещены в одной зоне доступности.
-
Аутентификация — выберите тип:
SASLилиБез аутентификации.При выборе значения
SASL:- Пользователь — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
- Пароль — укажите пароль учетной записи.
- Механизм — выберите механизм хеширования: SHA 256 или SHA 512.
-
-
Connection Manager — позволяет подключиться к базе данных через Yandex Connection Manager:
-
Выберите каталог, в котором создано подключение Connection Manager.
-
Выберите тип инсталляции Пользовательская инсталляция и задайте настройки:
-
Подключение — выберите подключение в Connection Manager или создайте его.
-
Идентификатор подсети — выберите или создайте подсеть в нужной зоне доступности. Трансфер будет использовать эту подсеть для доступа к базе данных.
Если значение в этом поле задано для обоих эндпоинтов, то обе подсети должны быть размещены в одной зоне доступности.
-
-
-
-
Полное имя топика — укажите имя топика, к которому необходимо подключиться.
-
Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.
Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.
- Тип эндпоинта —
kafka_source.
-
security_groups— группы безопасности для сетевого трафика.Правила групп безопасности применяются к трансферу. Они позволяют открыть сетевой доступ с ВМ трансфера к хостам-брокерам. Подробнее см. в разделе Сеть в Yandex Data Transfer.
Группы безопасности должны принадлежать той же сети, что и подсеть
subnet_id, если она указана.Примечание
В Terraform сеть для групп безопасности задавать не нужно.
-
connection.on_premise— параметры подключения к хостам-брокерам:-
broker_urls— IP-адреса или FQDN хостов-брокеров.Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:
<IP-адрес_или_FQDN_хоста-брокера>:<номер_порта> -
tls_mode— параметры шифрования передаваемых данных, если оно требуется, например для соответствия требованиям PCI DSS.disabled— отключено.enabled— включено.-
ca_certificate— сертификат CA.Важно
Если не добавить сертификат, трансфер может завершиться ошибкой.
-
-
subnet_id— идентификатор подсети, в которой находятся хосты-брокеры. Трансфер будет использовать эту подсеть для доступа к ним.
-
-
auth— метод аутентификации. Используется при подключении к хостам-брокерам:sasl— аутентификация с помощью SASL.no_auth— без аутентификации.
-
topic_names— список имен топиков, к которым необходимо подключиться.
Пример структуры конфигурационного файла:
resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
name = "<имя_эндпоинта>"
settings {
kafka_source {
security_groups = ["<список_идентификаторов_групп_безопасности>"]
connection {
on_premise {
broker_urls = ["<список_IP-адресов_или_FQDN_хостов-брокеров>"]
subnet_id = "<идентификатор_подсети_c_хостами-брокерами>"
}
}
auth = {
<метод_аутентификации>
}
topic_names = ["<список_имен_топиков>"]
<расширенные_настройки_эндпоинта>
}
}
}
Подробнее см. в документации провайдера Terraform.
-
securityGroups— группы безопасности для сетевого трафика, правила которых применятся к ВМ и кластерам без изменения их настроек. Подробнее см. в разделе Сеть в Yandex Data Transfer. -
connection.onPremise— параметры подключения к хостам-брокерам:-
brokerUrls— IP-адреса или FQDN хостов-брокеров.Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:
<IP-адрес_или_FQDN_хоста-брокера>:<номер_порта> -
tlsMode— параметры шифрования передаваемых данных, если оно требуется, например для соответствия требованиям PCI DSS.disabled— отключено.enabled— включено.-
caCertificate— сертификат CA.Важно
Если не добавить сертификат, трансфер может завершиться ошибкой.
-
-
subnetId— идентификатор подсети, в которой находятся хосты-брокеры. Трансфер будет использовать эту подсеть для доступа к ним.
-
-
auth— метод аутентификации. Используется при подключении к хостам-брокерам:-
sasl— аутентификация с помощью SASL. Необходимо указать следующие параметры:user— имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.password.raw— пароль учетной записи (в текстовом виде).mechanism— механизм хеширования.
-
noAuth— без аутентификации.
-
-
topicNames— список имен топиков, к которым необходимо подключиться.
Расширенные настройки
В расширенных настройках вы можете задать правила трансформации и правила конвертации. Данные обрабатываются в следующем порядке:
-
Трансформация. Данные в формате JSON передаются функции Yandex Cloud Functions. В теле функции содержатся метаинформация и необработанные данные, которые переданы в очередь. С помощью функции данные обрабатываются и возвращаются в Data Transfer.
-
Конвертация. Выполняется парсинг, с помощью которого данные подготавливаются для передачи приемнику.
Если не заданы правила трансформации, то парсинг применяется к необработанным данным из очереди. Если не заданы правила конвертации, то данные сразу передаются в приемник.
-
Правила трансформации:
-
Функция обработки — выберите одну из функций, созданных в сервисе Cloud Functions.
-
Сервисный аккаунт — выберите или создайте сервисный аккаунт, от имени которого будет запускаться функция обработки.
-
Количество попыток — укажите количество попыток вызова функции обработки.
-
Размер буфера для отправки — укажите размер буфера (в байтах), при заполнении которого данные будут переданы функции обработки.
Максимальный размер буфера — 3,5 МБ. Подробнее об ограничениях, действующих при работе с функциями в сервисе Cloud Functions, читайте в соответствующем разделе.
-
Интервал отправки — укажите длительность интервала (в секундах), по истечении которого данные из потока должны быть переданы функции обработки.
Примечание
Если буфер заполнится или истечет интервал отправки, данные будут переданы функции обработки.
-
Таймаут вызова — укажите допустимое время ожидания ответа от функции обработки (в секундах).
Важно
Значения в полях Интервал отправки и Таймаут вызова указываются с постфиксом
s, например,10s. -
-
Правила конвертации:
- Формат данных — выберите один из доступных форматов:
-
JSON— формат JSON. -
Парсер AuditTrails.v1— формат логов сервиса Audit Trails. -
Парсер CloudLogging— формат логов сервиса Cloud Logging. -
Парсер Debezium CDC— Debezium CDC. Позволяет в настройках указать Confluent Schema Registry .Для формата JSON укажите:
- Схема данных — задайте схему в виде списка полей или загрузите файл с описанием схемы в формате JSON.
Пример задания схемы данных
[ { "name": "request", "type": "string" } ]- Использовать значение NULL в ключевых столбцах — выберите эту опцию, чтобы разрешить значение
nullв ключевых колонках. - Добавить неразмеченные столбцы — выберите эту опцию, чтобы поля, отсутствующие в схеме, попадали в колонку
_rest. - Разэкранировать значения строк — выберите эту опцию, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).
Для Debezium CDC укажите: URL для Schema Registry, способ аутентификации (с указанием логина и пароля пользователя в случае использования аутентификации) и CA-сертификат.
-
- Формат данных — выберите один из доступных форматов:
-
transformer— правила трансформации. -
parser— правила конвертации, которые определяются одним из выбранных парсеров:-
audit_trails_v1_parser— парсер для логов сервиса Audit Trails. -
cloud_logging_parser— парсер для логов сервиса Cloud Logging. -
json_parserилиtskv_parser— парсеры JSON и TSKV, соответственно.Атрибуты, задающие параметры их работы, одинаковы для обоих парсеров:
-
data_schema— схема данных, представленная либо в виде JSON-объекта с описанием схемы, либо в виде списка полей:fields— схема в виде JSON-объекта. Объект содержит в себе массив JSON-объектов, которые описывают отдельные колонки.json_fields— схема в виде списка JSON-полей.
-
null_keys_allowed— задайте значениеtrueдля этого атрибута, чтобы разрешить значениеnullв ключевых колонках. -
add_rest_column— задайте значениеtrueдля этого атрибута, чтобы поля, отсутствующие в схеме, попадали в колонку_rest. -
unescape_string_values— задайте значениеtrueдля этого атрибута, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).
-
-
-
transformer— правила трансформации:-
cloudFunction— идентификатор функции, созданной в сервисе Cloud Functions. -
serviceAccountId— идентификатор сервисного аккаунта, от имени которого будет запускаться функция обработки. -
numberOfRetries— количество попыток вызова функции обработки. -
bufferSize— размер буфера (в байтах), при заполнении которого данные будут переданы функции обработки.Максимальный размер буфера — 3,5 МБ. Подробнее об ограничениях, действующих при работе с функциями в сервисе Cloud Functions, читайте в соответствующем разделе.
-
bufferFlushInterval— длительность интервала (в секундах), по истечении которого данные из потока должны быть переданы функции обработки.Примечание
Если буфер заполнится или истечет интервал отправки, данные будут переданы функции обработки.
-
invocationTimeout— допустимое время ожидания ответа от функции обработки (в секундах).
Важно
Значения для
bufferFlushIntervalиinvocationTimeoutуказываются с постфиксомs, например,10s. -
-
parser— правила конвертации, которые определяются одним из выбранных парсеров:-
auditTrailsV1Parser— парсер для логов сервиса Audit Trails. -
cloudLoggingParser— парсер для логов сервиса Cloud Logging. -
jsonParserилиtskvParser— парсеры JSON и TSKV, соответственно.Поля, задающие параметры их работы, одинаковы для обоих парсеров:
-
dataSchema— схема данных, представленная либо в виде JSON-объекта с описанием схемы, либо в виде списка полей:-
fields— схема в виде JSON-объекта. Объект содержит в себе массив JSON-объектов, которые описывают отдельные колонки. -
jsonFields— схема в виде списка JSON-полей.
-
-
nullKeysAllowed— задайте значениеtrueдля этого поля, чтобы разрешить значениеnullв ключевых колонках. -
addRestColumn— задайте значениеtrueдля этого поля, чтобы поля, отсутствующие в схеме, попадали в колонку_rest. -
unescapeStringValues— задайте значениеtrueдля этого поля, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).
-
-
Настройка приемника данных
Настройте один из поддерживаемых приемников данных:
Полный список поддерживаемых источников и приемников в Yandex Data Transfer см. в разделе Доступные трансферы.
После настройки источника и приемника данных создайте и запустите трансфер.