Transferring data to an OpenSearch target endpoint
Yandex Data Transfer enables you to migrate data to an OpenSearch database and implement various data transfer, processing, and transformation scenarios. To implement a transfer:
- Explore possible data transfer scenarios.
- Configure one of the supported data sources.
- Prepare the OpenSearch database for the transfer.
- Configure the target endpoint in Yandex Data Transfer.
- Create a transfer and start it.
- Perform required operations with the database and control the transfer.
- In case of any issues, use ready-made solutions to resolve them.
Scenarios for transferring data to OpenSearch
- Data delivery: The process of delivering arbitrary data to the target storage. It includes retrieving data from a queue, deserializing it, and transforming it to the target storage format.
- Migration: Moving data from one storage to another. Migration often means moving a database from obsolete local databases to managed cloud ones.
For a detailed description of possible data transfer scenarios in Yandex Data Transfer, see Tutorials.
Configuring the data source
Configure one of the supported data sources. For a complete list of supported sources and targets in Yandex Data Transfer, see Available transfers.
Note
There is a data type restriction: if the source sends an ip record (IP address), it will be saved as a text record in the target.
Preparing the target database
- Make sure the number of columns in the source does not exceed the maximum number of fields in OpenSearch indexes. The maximum number of fields is set by the index.mapping.total_fields.limit parameter. Its default value is 1,000.
Warning
Exceeding the limit will result in the Limit of total fields [1000] has been exceeded error, and the transfer will be stopped.
To increase the parameter value, set up a template that makes the maximum number of fields in new indexes equal to the specified value.
Sample template setup request
curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT "https://<URL_of_OpenSearch_host_with_DATA_role>:9200/_template/index_defaults" \
    --data '
    {
        "index_patterns": "cdc*",
        "settings": {
            "index": {
                "mapping": {
                    "total_fields": {
                        "limit": "2000"
                    }
                }
            }
        }
    }'
With this template setup, all new indexes with the cdc* mask will be able to contain up to 2,000 fields.
You can also set up templates using the OpenSearch Dashboards interface.
To check the current index.mapping.total_fields.limit parameter value, execute the following request:
curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request GET 'https://<URL_of_OpenSearch_host_with_DATA_role>:9200/<index_name>/_settings/*total_fields.limit?include_defaults=true'
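The check described above can be scripted once the response of the settings request has been parsed as JSON. The following is a minimal sketch, not part of Data Transfer: the function names are hypothetical, and it assumes the response nests an explicitly set value under "settings" and the built-in value under "defaults" for the requested index.

```python
from typing import Any, Dict, Optional


def total_fields_limit(settings_response: Dict[str, Any], index_name: str) -> Optional[int]:
    """Extract index.mapping.total_fields.limit from a parsed _settings response.

    Assumption: an explicitly configured value is nested under "settings",
    and the built-in value is nested under "defaults".
    """
    index_info = settings_response.get(index_name, {})
    for section in ("settings", "defaults"):
        node = index_info.get(section, {})
        # Walk down index -> mapping -> total_fields -> limit.
        for key in ("index", "mapping", "total_fields", "limit"):
            node = node.get(key) if isinstance(node, dict) else None
            if node is None:
                break
        if node is not None:
            return int(node)
    return None


def fits_in_index(column_count: int, limit: int) -> bool:
    """Return True if the source column count does not exceed the field limit."""
    return column_count <= limit
```

If the function returns None, the response did not have the assumed shape, and the limit should be checked manually.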
- By default, when transferring data to a single index, only one host is used. To distribute the load across hosts when transferring large amounts of data, set up a template to split new indexes into shards in advance.
Sample template setup request
curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT 'https://<URL_of_OpenSearch_host_with_DATA_role>:9200/_template/index_defaults' \
    --data '
    {
        "index_patterns": "cdc*",
        "settings": {
            "index": {
                "number_of_shards": 15,
                "number_of_replicas": 1
            }
        }
    }'
With this template setup, all new indexes with the cdc* mask will be split into 15 shards.
You can also set up templates using the OpenSearch Dashboards interface.
- To enhance data security and availability, set up a policy that will create a new index if at least one of the following conditions is met (recommended values):
- Index is over 50 GB in size.
- Index is over 30 days old.
You can create and enable a policy using requests. For more information about policies, see the OpenSearch documentation.
Example of a policy creation request
curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/_plugins/_ism/policies/rollover_policy' \
    --data '
    {
        "policy": {
            "description": "Example rollover policy",
            "default_state": "rollover",
            "schema_version": 1,
            "states": [
                {
                    "name": "rollover",
                    "actions": [
                        {
                            "rollover": {
                                "min_index_age": "30d",
                                "min_primary_shard_size": "50gb"
                            }
                        }
                    ],
                    "transitions": []
                }
            ],
            "ism_template": {
                "index_patterns": ["log*"],
                "priority": 100
            }
        }
    }'
Example of a request to assign an alias to a policy
curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/_index_template/ism_rollover' \
    --data '
    {
        "index_patterns": ["log*"],
        "template": {
            "settings": {
                "plugins.index_state_management.rollover_alias": "log"
            }
        }
    }'
Example of a request to create an index with a policy alias
curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/log-000001' \
    --data '
    {
        "aliases": {
            "log": {
                "is_write_index": true
            }
        }
    }'
Example of a request to check if a policy is attached to an index
curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request GET 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/_plugins/_ism/explain/log-000001?pretty'
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.
Configuring the OpenSearch target endpoint
When creating or updating an endpoint, you can define:
- Yandex Managed Service for OpenSearch cluster connection or custom installation settings, including those based on Yandex Compute Cloud VMs. These are required parameters.
- Additional parameters.
Managed Service for OpenSearch cluster
Warning
To create or edit an endpoint of a managed database, you need to have the managed-opensearch.viewer role or the viewer primitive role assigned for the folder where this managed database cluster resides.
Connection with the cluster ID specified in Yandex Cloud.
- Managed Service for OpenSearch cluster: Specify the ID of the cluster to connect to.
- User: Specify the username Data Transfer will use to connect to the cluster.
- Password: Enter the user's password for the cluster.
- Security groups: Select the cloud network to host the endpoint and security groups for network traffic.
This will let you apply the specified security group rules to the VMs and clusters in the selected network without changing the settings of these VMs and clusters. For more information, see Networking in Yandex Data Transfer.
Custom installation
Connecting to nodes with explicitly specified network addresses and ports.
- Data nodes: Add a new data node. For each node, specify:
  - Host: IP address or FQDN of the host with the DATA role you want to connect to.
  - Port: Port number Data Transfer will use for connections to the DATA host.
- SSL: Select this option if a secure SSL connection is used.
- CA certificate: Upload the certificate file or add its contents as text if transmitted data must be encrypted, for example, to meet PCI DSS requirements.
- Subnet ID: Select or create a subnet in the desired availability zone.
If the value in this field is specified for both endpoints, both subnets must be hosted in the same availability zone.
- User: Specify the username Data Transfer will use to connect to the cluster.
- Password: Enter the user's password for the cluster.
- Security groups: Select the cloud network to host the endpoint and security groups for network traffic.
This will let you apply the specified security group rules to the VMs and clusters in the selected network without changing the settings of these VMs and clusters. For more information, see Networking in Yandex Data Transfer.
Additional settings
- Cleanup policy: Select a way to clean up data in the target database before the transfer:
  - Don't cleanup: Select this option if you are only going to do replication without copying data.
  - Drop: Completely delete the tables included in the transfer (default).
    Use this option to always transfer the latest version of the table schema to the target database from the source whenever the transfer is activated.
- Sanitize documents keys: Use this option to automatically replace keys that are not valid for OpenSearch in the target fields.
The autocorrect rules are as follows:
  - Empty keys or keys consisting of spaces and periods will be replaced with underscores: "", " ", "." → "_".
  - Leading and trailing periods will be removed: "somekey.", ".somekey" → "somekey".
  - If there are two periods in a row or there is nothing but spaces between them, the entire fragment will be replaced with a period: " some . . key" → " some . key".
Here is an example of how the autocorrect works: ". s o m e ..incorrect....key. . . " → " s o m e .incorrect.key".
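The autocorrect rules can be approximated in a few lines of code. The following is an illustrative sketch only, not the Data Transfer implementation: the function name sanitize_key is hypothetical, and the split-and-filter approach is an assumption chosen because it reproduces every example listed above.

```python
def sanitize_key(key: str) -> str:
    """Approximate the documented autocorrect rules for a single document key."""
    # Split on periods and keep only the segments that contain
    # something other than spaces. This drops leading and trailing
    # periods and collapses runs such as ".." or ". ." into one period.
    segments = [segment for segment in key.split(".") if segment.strip(" ")]
    # No segments left: the key was empty or held only spaces and periods.
    return ".".join(segments) if segments else "_"
```

For example, sanitize_key(" some . . key") returns " some . key". Spaces inside the remaining segments are kept as they are, which matches the documented examples.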
After configuring the data source and target, create and start the transfer.
Troubleshooting data transfer issues
For more troubleshooting tips, see Troubleshooting.
Transfer failure
Error messages:
object field starting or ending with a [.] makes object resolution ambiguous <field_description>
Index -1 out of bounds for length 0
The transfer is aborted because the keys in the documents being transferred are not valid for the OpenSearch target. Invalid keys are empty keys and keys that:
- Consist of spaces.
- Consist of periods.
- Have a period at the beginning or end.
- Have two or more periods in a row.
- Include periods separated by spaces.
Solution:
In the target endpoint additional settings, enable Sanitize documents keys and reactivate the transfer.
Document duplication on the target
When repeatedly transferring data, documents get duplicated on the target.
All documents transferred from the same source table fall under the same index named <schemaName.tableName> on the target. In this case, the target automatically generates document IDs (_id) by default. As a result, identical documents are assigned different IDs and get duplicated.
There is no duplication if the primary keys are specified in the source table or endpoint conversion rules. Document IDs are then generated at the transfer stage using the primary key values.
Generation is performed as follows:
- If the key value contains a period (.), it is escaped with \: some.key --> some\.key.
- All the primary key values are converted into a string: <some_key1>.<some_key2>.<...>.
- The resulting string is converted by the url.QueryEscape function.
- If the length of the resulting string does not exceed 512 characters, it is used as the _id. If it is longer than 512 characters, it is hashed with SHA-1 and the resulting hash is used as the _id.
As a result, documents with the same primary keys will receive the same ID when the data is transferred again, and the document transferred last will overwrite the existing one.
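The generation steps can be sketched as follows. This is an approximation for illustration, not the service code. Python's urllib.parse.quote_plus stands in for Go's url.QueryEscape. Converting values with str(), hashing the escaped string, and using a hexadecimal SHA-1 digest are assumptions, and document_id is a hypothetical name.

```python
import hashlib
from urllib.parse import quote_plus

MAX_ID_LENGTH = 512  # longer strings are replaced with their SHA-1 hash


def document_id(primary_key_values) -> str:
    """Build a deterministic _id from primary key values (illustrative sketch)."""
    # Step 1: escape periods inside each key value with a backslash.
    escaped = [str(value).replace(".", "\\.") for value in primary_key_values]
    # Step 2: join all key values into one period-separated string.
    joined = ".".join(escaped)
    # Step 3: percent-encode the string (stand-in for url.QueryEscape).
    encoded = quote_plus(joined)
    # Step 4: use the string as is, or hash it if it is too long.
    if len(encoded) <= MAX_ID_LENGTH:
        return encoded
    # Assumption: the hash is taken over the encoded string and
    # rendered as a hexadecimal digest.
    return hashlib.sha1(encoded.encode("utf-8")).hexdigest()
```

Because the function is deterministic, the same primary key values always produce the same ID, which is why a repeated transfer overwrites existing documents instead of duplicating them.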
Solution:
- Set the primary key for one or more columns in the source table or in the endpoint conversion rules.
- Run the transfer.