Передача данных из эндпоинта-источника Apache Kafka®
С помощью сервиса Yandex Data Transfer вы можете переносить данные из очереди Apache Kafka® и реализовывать различные сценарии переноса, обработки и трансформации данных. Для реализации трансфера:
- Ознакомьтесь с возможными сценариями передачи данных.
- Подготовьте базу данных Apache Kafka® к трансферу.
- Настройте эндпоинт-источник в Yandex Data Transfer.
- Настройте один из поддерживаемых приемников данных.
- Создайте и запустите трансфер.
- Выполняйте необходимые действия по работе с базой и контролируйте трансфер.
- При возникновении проблем, воспользуйтесь готовыми решениями по их устранению.
Сценарии передачи данных из Apache Kafka®
-
Миграция — перенос данных из одного хранилища в другое. Часто это перенос базы из устаревших локальных баз в управляемые облачные.
Отдельной задачей миграции является зеркалирование данных между очередями:
-
Поставка данных — процесс доставки произвольных данных в целевые хранилища. Процесс поставки включает извлечение данных из очереди и их десериализацию с последующей трансформацией данных в формат целевого хранилища.
Подробное описание возможных сценариев передачи данных в Yandex Data Transfer см. в разделе Практические руководства.
Подготовка базы данных источника
- Создайте пользователя с ролью
ACCESS_ROLE_CONSUMER
на топике-источнике.
-
Убедитесь, что настройки сети, в которой размещен кластер, разрешают подключение к нему из интернета с IP-адресов, используемых сервисом Data Transfer
. -
Настройте права доступа
пользователя к нужному топику. -
Выдайте права
READ
консьюмер-группе, идентификатор которой совпадает с идентификатором трансфера.bin/kafka-acls --bootstrap-server localhost:9092 \ --command-config adminclient-configs.conf \ --add \ --allow-principal User:username \ --operation Read \ --group <идентификатор_трансфера>
-
(Опционально) Чтобы использовать авторизацию по логину и паролю, настройте SASL-аутентификацию
.
Настройка эндпоинта-источника Apache Kafka®
При создании или изменении эндпоинта вы можете задать:
- Настройки подключения к кластеру Yandex Managed Service for Apache Kafka® или пользовательской инсталляции, в т. ч. на базе виртуальных машин Yandex Compute Cloud. Эти параметры обязательные.
- Расширенные настройки.
Кластер Managed Service for Apache Kafka®
Важно
Для создания или редактирования эндпоинта управляемой базы данных вам потребуется роль managed-kafka.viewer
или примитивная роль viewer
, выданная на каталог кластера этой управляемой базы данных.
Подключение с указанием идентификатора кластера в Yandex Cloud.
-
Кластер Managed Service for Apache Kafka — выберите кластер, к которому необходимо подключиться.
-
Аутентификация — выберите тип:
SASL
илиБез аутентификации
.При выборе значения
SASL
:- Имя пользователя — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
- Пароль — укажите пароль учетной записи.
-
Полное имя топика — укажите имя топика, к которому необходимо подключиться.
-
Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.
Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.
- Тип эндпоинта —
kafka_source
.
-
security_groups
— группы безопасности для сетевого трафика.Правила групп безопасности применяются к трансферу. Они позволяют открыть сетевой доступ с ВМ трансфера к кластеру. Подробнее см. в разделе Сеть в Yandex Data Transfer.
Группы безопасности должны принадлежать той же сети, в которой размещен кластер.
Примечание
В Terraform сеть для групп безопасности задавать не нужно.
-
connection.cluster_id
— идентификатор кластера, к которому необходимо подключиться. -
auth
— метод аутентификации. Используется при подключении к хостам-брокерам:sasl
— аутентификация с помощью SASL.no_auth
— без аутентификации.
-
topic_names
— список имен топиков, к которым необходимо подключиться.
Пример структуры конфигурационного файла:
resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
name = "<имя_эндпоинта>"
settings {
kafka_source {
security_groups = ["<список_идентификаторов_групп_безопасности>"]
connection {
cluster_id = "<идентификатор_кластера>"
}
auth {
<метод_аутентификации>
}
topic_names = ["<список_имен_топиков>"]
<расширенные_настройки_эндпоинта>
}
}
}
Подробнее см. в документации провайдера Terraform
-
securityGroups
— группы безопасности для сетевого трафика, правила которых применятся к ВМ и кластерам без изменения их настроек. Подробнее см. в разделе Сеть в Yandex Data Transfer. -
connection.clusterId
— идентификатор кластера, к которому необходимо подключиться. -
auth
— метод аутентификации. Используется при подключении к хостам-брокерам:-
sasl
— аутентификация с помощью SASL. Необходимо указать следующие параметры:user
— имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.password.raw
— пароль учетной записи (в текстовом виде).mechanism
— механизм хеширования.
-
noAuth
— без аутентификации.
-
-
topicNames
— список имен топиков, к которым необходимо подключиться.
Пользовательская инсталляция
Подключение к кластеру Apache Kafka® с явным указанием сетевых адресов и портов хостов-брокеров.
-
URL-адреса брокеров — укажите IP-адреса или FQDN хостов-брокеров.
Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:
<IP-адрес_или_FQDN_хоста-брокера>:<номер_порта>
-
SSL — использовать шифрование для защиты соединения.
-
PEM-сертификат — если требуется шифрование передаваемых данных, например для соответствия требованиям PCI DSS
, загрузите файл сертификата или добавьте его содержимое в текстовом виде. -
Сетевой интерфейс для эндпоинта — выберите или создайте подсеть в нужной зоне доступности.
Если значение в этом поле задано для обоих эндпоинтов, то обе подсети должны быть размещены в одной зоне доступности.
-
Аутентификация — выберите тип:
SASL
илиБез аутентификации
.При выборе значения
SASL
:- Пользователь — укажите имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.
- Пароль — укажите пароль учетной записи.
- Механизм — выберите механизм хеширования: SHA 256 или SHA 512.
-
Полное имя топика — укажите имя топика, к которому необходимо подключиться.
-
Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.
Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.
- Тип эндпоинта —
kafka_source
.
-
security_groups
— группы безопасности для сетевого трафика.Правила групп безопасности применяются к трансферу. Они позволяют открыть сетевой доступ с ВМ трансфера к хостам-брокерам. Подробнее см. в разделе Сеть в Yandex Data Transfer.
Группы безопасности должны принадлежать той же сети, что и подсеть
subnet_id
, если она указана.Примечание
В Terraform сеть для групп безопасности задавать не нужно.
-
connection.on_premise
— параметры подключения к хостам-брокерам:-
broker_urls
— IP-адреса или FQDN хостов-брокеров.Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:
<IP-адрес_или_FQDN_хоста-брокера>:<номер_порта>
-
tls_mode
— параметры шифрования передаваемых данных, если оно требуется, например для соответствия требованиям PCI DSS .disabled
— отключено.enabled
— включено.ca_certificate
— сертификат CA.
-
subnet_id
— идентификатор подсети, в которой находятся хосты-брокеры.
-
-
auth
— метод аутентификации. Используется при подключении к хостам-брокерам:sasl
— аутентификация с помощью SASL.no_auth
— без аутентификации.
-
topic_names
— список имен топиков, к которым необходимо подключиться.
Пример структуры конфигурационного файла:
resource "yandex_datatransfer_endpoint" "<имя_эндпоинта_в_Terraform>" {
name = "<имя_эндпоинта>"
settings {
kafka_source {
security_groups = ["<список_идентификаторов_групп_безопасности>"]
connection {
on_premise {
broker_urls = ["<список_IP-адресов_или_FQDN_хостов-брокеров>"]
subnet_id = "<идентификатор_подсети_c_хостами-брокерами>"
}
}
auth = {
<метод_аутентификации>
}
topic_names = ["<список_имен_топиков>"]
<расширенные_настройки_эндпоинта>
}
}
}
Подробнее см. в документации провайдера Terraform
-
securityGroups
— группы безопасности для сетевого трафика, правила которых применятся к ВМ и кластерам без изменения их настроек. Подробнее см. в разделе Сеть в Yandex Data Transfer. -
connection.onPremise
— параметры подключения к хостам-брокерам:-
brokerUrls
— IP-адреса или FQDN хостов-брокеров.Если номер порта Apache Kafka® отличается от стандартного, укажите его через двоеточие после имени хоста:
<IP-адрес_или_FQDN_хоста-брокера>:<номер_порта>
-
tlsMode
— параметры шифрования передаваемых данных, если оно требуется, например для соответствия требованиям PCI DSS .disabled
— отключено.enabled
— включено.caCertificate
— сертификат CA.
-
subnetId
— идентификатор подсети, в которой находятся хосты-брокеры.
-
-
auth
— метод аутентификации. Используется при подключении к хостам-брокерам:-
sasl
— аутентификация с помощью SASL. Необходимо указать следующие параметры:user
— имя учетной записи, от имени которой сервис Data Transfer будет подключаться к топику.password.raw
— пароль учетной записи (в текстовом виде).mechanism
— механизм хеширования.
-
noAuth
— без аутентификации.
-
-
topicNames
— список имен топиков, к которым необходимо подключиться.
Расширенные настройки
В расширенных настройках вы можете задать правила трансформации и правила конвертации. Данные обрабатываются в следующем порядке:
-
Трансформация. Данные в формате JSON передаются функции Yandex Cloud Functions. В теле функции содержатся метаинформация и необработанные данные, которые переданы в очередь. С помощью функции данные обрабатываются и возвращаются в Data Transfer.
-
Конвертация. Выполняется парсинг, с помощью которого данные подготавливаются для передачи приемнику.
Если не заданы правила трансформации, то парсинг применяется к необработанным данным из очереди. Если не заданы правила конвертации, то данные сразу передаются в приемник.
-
Правила трансформации:
-
Функция обработки — выберите одну из функций, созданных в сервисе Cloud Functions.
-
Сервисный аккаунт — выберите или создайте сервисный аккаунт, от имени которого будет запускаться функция обработки.
-
Количество попыток — укажите количество попыток вызова функции обработки.
-
Размер буфера для отправки — укажите размер буфера (в байтах), при заполнении которого данные будут переданы функции обработки.
Максимальный размер буфера — 3,5 МБ. Подробнее об ограничениях, действующих при работе с функциями в сервисе Cloud Functions, читайте в соответствующем разделе.
-
Интервал отправки — укажите длительность интервала (в секундах), по истечении которого данные из потока должны быть переданы функции обработки.
Примечание
Если буфер заполнится или истечет интервал отправки, данные будут переданы функции обработки.
-
Таймаут вызова — укажите допустимое время ожидания ответа от функции обработки (в секундах).
Важно
Значения в полях Интервал отправки и Таймаут вызова указываются с постфиксом
s
, например,10s
. -
-
Правила конвертации:
- Формат данных — выберите один из доступных форматов:
-
JSON
— формат JSON. -
Парсер AuditTrails.v1
— формат логов сервиса Audit Trails. -
Парсер CloudLogging
— формат логов сервиса Cloud Logging. -
Парсер Debezium CDC
— Debezium CDC. Позволяет в настройках указать Confluent Schema Registry .Для формата JSON укажите:
- Схема данных — задайте схему в виде списка полей или загрузите файл с описанием схемы в формате JSON.
Пример задания схемы данных
[ { "name": "request", "type": "string" } ]
- Использовать значение NULL в ключевых столбцах — выберите эту опцию, чтобы разрешить значение
null
в ключевых колонках. - Добавить неразмеченные столбцы — выберите эту опцию, чтобы поля, отсутствующие в схеме, попадали в колонку
_rest
. - Разэкранировать значения строк — выберите эту опцию, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).
Для Debezium CDC укажите: URL для Schema Registry, способ аутентификации (с указанием логина и пароля пользователя в случае использования аутентификации) и CA-сертификат.
-
- Формат данных — выберите один из доступных форматов:
-
transformer
— правила трансформации. -
parser
— правила конвертации, которые определяются одним из выбранных парсеров:-
audit_trails_v1_parser
— парсер для логов сервиса Audit Trails. -
cloud_logging_parser
— парсер для логов сервиса Cloud Logging. -
json_parser
илиtskv_parser
— парсеры JSON и TSKV, соответственно.Атрибуты, задающие параметры их работы, одинаковы для обоих парсеров:
-
data_schema
— схема данных, представленная либо в виде JSON-объекта с описанием схемы, либо в виде списка полей:fields
— схема в виде JSON-объекта. Объект содержит в себе массив JSON-объектов, которые описывают отдельные колонки.json_fields
— схема в виде списка JSON-полей.
-
null_keys_allowed
— задайте значениеtrue
для этого атрибута, чтобы разрешить значениеnull
в ключевых колонках. -
add_rest_column
— задайте значениеtrue
для этого атрибута, чтобы поля, отсутствующие в схеме, попадали в колонку_rest
. -
unescape_string_values
— задайте значениеtrue
для этого атрибута, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).
-
-
-
transformer
— правила трансформации:-
cloudFunction
— идентификатор функции, созданной в сервисе Cloud Functions. -
serviceAccountId
— идентификатор сервисного аккаунта, от имени которого будет запускаться функция обработки. -
numberOfRetries
— количество попыток вызова функции обработки. -
bufferSize
— размер буфера (в байтах), при заполнении которого данные будут переданы функции обработки.Максимальный размер буфера — 3,5 МБ. Подробнее об ограничениях, действующих при работе с функциями в сервисе Cloud Functions, читайте в соответствующем разделе.
-
bufferFlushInterval
— длительность интервала (в секундах), по истечении которого данные из потока должны быть переданы функции обработки.Примечание
Если буфер заполнится или истечет интервал отправки, данные будут переданы функции обработки.
-
invocationTimeout
— допустимое время ожидания ответа от функции обработки (в секундах).
Важно
Значения для
bufferFlushInterval
иinvocationTimeout
указываются с постфиксомs
, например,10s
. -
-
parser
— правила конвертации, которые определяются одним из выбранных парсеров:-
auditTrailsV1Parser
— парсер для логов сервиса Audit Trails. -
cloudLoggingParser
— парсер для логов сервиса Cloud Logging. -
jsonParser
илиtskvParser
— парсеры JSON и TSKV, соответственно.Поля, задающие параметры их работы, одинаковы для обоих парсеров:
-
dataSchema
— схема данных, представленная либо в виде JSON-объекта с описанием схемы, либо в виде списка полей:-
fields
— схема в виде JSON-объекта. Объект содержит в себе массив JSON-объектов, которые описывают отдельные колонки. -
jsonFields
— схема в виде списка JSON-полей.
-
-
nullKeysAllowed
— задайте значениеtrue
для этого поля, чтобы разрешить значениеnull
в ключевых колонках. -
addRestColumn
— задайте значениеtrue
для этого поля, чтобы поля, отсутствующие в схеме, попадали в колонку_rest
. -
unescapeStringValues
— задайте значениеtrue
для этого поля, чтобы убрать кавычки из строковых переменных (если этого не сделать, значения строковых полей останутся без изменений).
-
-
Настройка приемника данных
Настройте один из поддерживаемых приемников данных:
- PostgreSQL;
- MySQL®;
- MongoDB
- ClickHouse®;
- Greenplum®;
- Yandex Managed Service for YDB;
- Yandex Object Storage;
- Apache Kafka®;
- YDS;
- Elasticsearch;
- OpenSearch.
Полный список поддерживаемых источников и приемников в Yandex Data Transfer см. в разделе Доступные трансферы.
После настройки источника и приемника данных создайте и запустите трансфер.