Передача данных в эндпоинт-приемник OpenSearch
С помощью сервиса Yandex Data Transfer вы можете переносить данные в базу OpenSearch и реализовывать различные сценарии переноса, обработки и трансформации данных. Для реализации трансфера:
- Ознакомьтесь с возможными сценариями передачи данных.
- Настройте один из поддерживаемых источников данных.
- Подготовьте базу данных OpenSearch к трансферу.
- Настройте эндпоинт-приемник в Yandex Data Transfer.
- Создайте и запустите трансфер.
- Выполняйте необходимые действия по работе с базой и контролируйте трансфер.
- При возникновении проблем, воспользуйтесь готовыми решениями по их устранению.
Сценарии передачи данных в OpenSearch
-
Поставка данных — процесс доставки произвольных данных в целевые хранилища. Процесс поставки включает извлечение данных из очереди и их десериализацию с последующей трансформацией данных в формат целевого хранилища.
-
Миграция — перенос данных из одного хранилища в другое. Часто это перенос базы из устаревших локальных баз в управляемые облачные.
Подробное описание возможных сценариев передачи данных в Yandex Data Transfer см. в разделе Практические руководства.
Настройка источника данных
Настройте один из поддерживаемых источников данных:
Полный список поддерживаемых источников и приемников в Yandex Data Transfer см. в разделе Доступные трансферы.
Примечание
Ограничение для типа данных: если из источника передается запись с типом ip
(IP-адрес), то в приемнике эта запись будет сохранена с типом text
.
Подготовка базы данных приемника
-
Убедитесь, что количество колонок в источнике не превышает максимальное количество полей в индексах OpenSearch. Максимальное количество полей задается в параметре
index.mapping.total_fields.limit
и по умолчанию составляет1000
.Важно
Превышение лимита приведет к ошибке
Limit of total fields [1000] has been exceeded
и остановке трансфера.Чтобы увеличить значение параметра, настройте шаблон
, по которому максимальное количество полей в создаваемых индексах будет равно указанному значению.Пример запроса для настройки шаблона
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request PUT "https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/_template/index_defaults" \ --data ' { "index_patterns": "cdc*", "settings": { "index": { "mapping": { "total_fields": { "limit": "2000" } } } } }'
При такой настройке шаблона все новые индексы с маской
cdc*
смогут содержать до2000
полей.Для настройки шаблона можно также использовать интерфейс OpenSearch Dashboards
.Чтобы проверить текущее значение параметра
index.mapping.total_fields.limit
, выполните запрос:curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request GET 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/<название_индекса>/_settings/*total_fields.limit?include_defaults=true'
-
По умолчанию при трансфере данных в единичный индекс задействуется только один хост. Чтобы распределить нагрузку между хостами при передаче больших объемов данных, настройте шаблон
, по которому создаваемые индексы будут заранее разбиты на шарды.Пример запроса для настройки шаблона
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request PUT 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/_template/index_defaults' \ --data ' { "index_patterns": "cdc*", "settings" : { "index" : { "number_of_shards" : 15, "number_of_replicas" : 1 } } }'
При такой настройке шаблона, все новые индексы с маской
cdc*
будут разбиты на15
шардов.Для настройки шаблона можно также использовать интерфейс OpenSearch Dashboards
. -
Чтобы повысить безопасность и доступность данных, установите политику, которая будет создавать новый индекс при выполнении хотя бы одного из условий (рекомендуемые значения):
- Когда размер индекса превысит 50 ГБ.
- Когда возраст индекса превысит 30 дней.
Создать и включить политику можно с помощью запросов. Подробнее о политиках см. в документации OpenSearch
.Пример запроса для создания политики
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request PUT 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/_plugins/_ism/policies/rollover_policy' \ --data ' { "policy": { "description": "Example rollover policy", "default_state": "rollover", "schema_version": 1, "states": [ { "name": "rollover", "actions": [ { "rollover": { "min_index_age": "30d", "min_primary_shard_size": "50gb" } } ], "transitions": [] } ], "ism_template": { "index_patterns": ["log*"], "priority": 100 } } }'
Пример запроса для назначения политике псевдонима
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request PUT 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/_index_template/ism_rollover' \ --data ' { "index_patterns": ["log*"], "template": { "settings": { "plugins.index_state_management.rollover_alias": "log" } } }'
Пример запроса для создания индекса с псевдонимом политики
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request PUT 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/log-000001' \ --data ' { "aliases": { "log": { "is_write_index": true } } }'
Пример запроса для проверки, прикреплена ли политика к индексу
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request GET 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/_plugins/_ism/explain/log-000001?pretty'
-
Убедитесь, что настройки сети, в которой размещен кластер, разрешают подключение к нему из интернета с IP-адресов, используемых сервисом Data Transfer
.
-
Убедитесь, что количество колонок в источнике не превышает максимальное количество полей в индексах OpenSearch. Максимальное количество полей задается в параметре
index.mapping.total_fields.limit
и по умолчанию составляет1000
.Важно
Превышение лимита приведет к ошибке
Limit of total fields [1000] has been exceeded
и остановке трансфера.Чтобы увеличить значение параметра, настройте шаблон
, по которому максимальное количество полей в создаваемых индексах будет равно указанному значению.Пример запроса для настройки шаблона
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request PUT "https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/_template/index_defaults" \ --data ' { "index_patterns": "cdc*", "settings": { "index": { "mapping": { "total_fields": { "limit": "2000" } } } } }'
При такой настройке шаблона все новые индексы с маской
cdc*
смогут содержать до2000
полей.Для настройки шаблона можно также использовать интерфейс OpenSearch Dashboards
.Чтобы проверить текущее значение параметра
index.mapping.total_fields.limit
, выполните запрос:curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request GET 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/<название_индекса>/_settings/*total_fields.limit?include_defaults=true'
-
По умолчанию при трансфере данных в единичный индекс задействуется только один хост. Чтобы распределить нагрузку между хостами при передаче больших объемов данных, настройте шаблон
, по которому создаваемые индексы будут заранее разбиты на шарды.Пример запроса для настройки шаблона
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request PUT 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/_template/index_defaults' \ --data ' { "index_patterns": "cdc*", "settings" : { "index" : { "number_of_shards" : 15, "number_of_replicas" : 1 } } }'
При такой настройке шаблона, все новые индексы с маской
cdc*
будут разбиты на15
шардов.Для настройки шаблона можно также использовать интерфейс OpenSearch Dashboards
. -
Чтобы повысить безопасность и доступность данных, установите политику, которая будет создавать новый индекс при выполнении хотя бы одного из условий (рекомендуемые значения):
- Когда размер индекса превысит 50 ГБ.
- Когда возраст индекса превысит 30 дней.
Создать и включить политику можно с помощью запросов. Подробнее о политиках см. в документации OpenSearch
.Пример запроса для создания политики
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request PUT 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/_plugins/_ism/policies/rollover_policy' \ --data ' { "policy": { "description": "Example rollover policy", "default_state": "rollover", "schema_version": 1, "states": [ { "name": "rollover", "actions": [ { "rollover": { "min_index_age": "30d", "min_primary_shard_size": "50gb" } } ], "transitions": [] } ], "ism_template": { "index_patterns": ["log*"], "priority": 100 } } }'
Пример запроса для назначения политике псевдонима
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request PUT 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/_index_template/ism_rollover' \ --data ' { "index_patterns": ["log*"], "template": { "settings": { "plugins.index_state_management.rollover_alias": "log" } } }'
Пример запроса для создания индекса с псевдонимом политики
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request PUT 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/log-000001' \ --data ' { "aliases": { "log": { "is_write_index": true } } }'
Пример запроса для проверки, прикреплена ли политика к индексу
curl \ --user <имя_пользователя_OpenSearch>:<пароль> \ --header 'Content-Type: application/json' \ --request GET 'https://<адрес_хоста_OpenSearch_с_ролью_DATA>:9200/_plugins/_ism/explain/log-000001?pretty'
Настройка эндпоинта-приемника OpenSearch
При создании или изменении эндпоинта вы можете задать:
- Настройки подключения к кластеру Yandex Managed Service for OpenSearch или пользовательской инсталляции, в т. ч. на базе виртуальных машин Yandex Compute Cloud. Эти параметры обязательные.
- Дополнительные параметры.
Кластер Managed Service for OpenSearch
Важно
Для создания или редактирования эндпоинта управляемой базы данных вам потребуется роль managed-opensearch.viewer
или примитивная роль viewer
, выданная на каталог кластера этой управляемой базы данных.
Подключение с указанием идентификатора кластера в Yandex Cloud.
-
Кластер Managed Service for OpenSearch — укажите идентификатор кластера, к которому необходимо подключиться.
-
Пользователь — укажите имя пользователя, под которым сервис Data Transfer будет подключаться к кластеру.
-
Пароль — укажите пароль пользователя для доступа к кластеру.
-
Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.
Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.
Пользовательская инсталляция
Подключение к узлам с явным указанием сетевых адресов и портов.
-
Узлы с данными — нажмите на значок
, чтобы добавить новый узел с данными. Для каждого узла укажите:- Хост — IP-адрес или FQDN хоста с ролью
DATA
, к которому необходимо подключиться. - Порт — номер порта, который сервис Data Transfer будет использовать для подключения к хосту с ролью
DATA
.
- Хост — IP-адрес или FQDN хоста с ролью
-
SSL — выберите, если используется безопасное соединение SSL.
-
Сертификат CA — загрузите файл сертификата или добавьте его содержимое в текстовом виде, если требуется шифрование передаваемых данных, например, для соответствия требованиям PCI DSS
. -
Идентификатор подсети — выберите или создайте подсеть в нужной зоне доступности.
Если значение в этом поле задано для обоих эндпоинтов, то обе подсети должны быть размещены в одной зоне доступности.
-
Пользователь — укажите имя пользователя, под которым сервис Data Transfer будет подключаться к кластеру.
-
Пароль — укажите пароль пользователя для доступа к кластеру.
-
Группы безопасности — выберите облачную сеть для размещения эндпоинта и группы безопасности для сетевого трафика.
Это позволит применить к ВМ и кластерам в выбранной сети указанные правила групп безопасности без изменения настроек этих ВМ и кластеров. Подробнее см. в разделе Сеть в Yandex Data Transfer.
Дополнительные настройки
-
Политика очистки — выберите способ очистки данных в приемнике перед переносом:
-
Не очищать
— выберите эту опцию, если будет производиться только репликация без копирования данных. -
Drop
— полное удаление таблиц, участвующих в трансфере (вариант по умолчанию).Используйте эту опцию, чтобы при любой активации трансфера в базу-приемник всегда передавалась самая последняя версия схемы таблиц из источника.
-
-
Исправить невалидные ключи в документах — выберите эту опцию, чтобы для полей на приемнике выполнялась автозамена ключей, невалидных для OpenSearch.
Правила автозамены:
- Пустые или состоящие из пробелов и точек ключи будут заменены на подчеркивание:
""
," "
,"."
→"_"
. - Точки в начале и конце ключа будут удалены:
"somekey."
,".somekey"
→"somekey"
. - Если две точки стоят друг за другом, либо между ними есть только пробелы, весь фрагмент будет заменен на точку:
" some . . key"
→" some . key"
.
Пример:
". s o m e ..incorrect....key. . . "
→" s o m e .incorrect.key"
. - Пустые или состоящие из пробелов и точек ключи будут заменены на подчеркивание:
После настройки источника и приемника данных создайте и запустите трансфер.
Решение проблем, возникающих при переносе данных
См. полный список рекомендаций в разделе Решение проблем.
Прерывание трансфера с ошибкой
Тексты ошибок:
object field starting or ending with a [.] makes object resolution ambiguous <описание_поля>
Index -1 out of bounds for length 0
Трансфер прерывается из-за того что ключи в передаваемых документах невалидны для приемника OpenSearch. К невалидным относятся пустые ключи, а также ключи:
- состоящие из пробелов;
- состоящие из точек;
- с точкой в начале или конце;
- с точками, стоящими друг за другом;
- с точками, разделенными пробелами.
Решение:
В дополнительных настройках эндпоинта-приемника включите опцию Исправить невалидные ключи в документах и активируйте трансфер повторно.
Дублирование документов на приемнике
На приемнике возникают дубли документов при повторной передаче данных.
Все документы, передаваемые из одной таблицы источника, попадают в один индекс с именем <schemaName.tableName>
на приемнике. При этом по умолчанию приемник автоматически генерирует идентификаторы документов (_id
). В результате одинаковые документы получают разные идентификаторы и дублируются.
Дублирование не происходит, если в таблице источника или в правилах конвертации эндпоинта заданы первичные ключи. В этом случае идентификаторы документов генерируются на этапе трансфера с использованием значений первичных ключей.
Генерация происходит следующим образом:
- Если значение ключа содержит
.
, она экранируется\
:some.key
-->some\.key
. - Значения всех первичных ключей преобразуются в строку:
<some_key1>.<some_key2>.<...>
. - Полученная строка преобразуется функцией url.QueryEscape
. - Если длина итоговой строки не превышает 512 символов, то она используется в качестве
_id
. Если длина больше 512 символов, то она хешируется алгоритмом SHA-1 , и в качестве_id
используется полученный хеш.
В результате документы с одинаковыми первичными ключами получат одинаковый идентификатор при повторной передаче данных, и последний переданный документ перезапишет существующий.
Решение:
- Установите первичный ключ для одного или нескольких столбцов на источнике или в правилах конвертации эндпоинта.
- Запустите трансфер.