Preparing for a transfer
Preparing a source
Airbyte® sources
AWS CloudTrail source
Get an AWS key ID and secret access key by following the AWS guide.
For more information, see the Airbyte® documentation.
BigQuery source
To prepare a BigQuery data source:
- Create a Google Cloud service account.
- Add the service account as a participant to the Google Cloud project with the BigQuery User role.
- Create a Google Cloud service account key.

For more information, see the Airbyte® documentation.
Microsoft SQL Server source
Airbyte® has the following requirements for a Microsoft SQL Server data source:
- Make sure your database is accessible from the computer running Airbyte®.
- Create a dedicated read-only Airbyte® user with access to all tables that require replication.
For more information, see the Airbyte® documentation.
Airbyte® is already built into Data Transfer, so you do not have to create a separate VM to deploy it and add a user. All you have to do is grant Data Transfer network access to the source database.
S3 source
If you are using a private bucket as a source, grant the read and list permissions to the account you will use for connection.
For more information, see the Airbyte® documentation.
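For an AWS S3 bucket, the read and list permissions correspond to the s3:GetObject and s3:ListBucket actions. Below is a minimal bucket policy sketch, assuming an AWS bucket; the account ARN and bucket name are placeholders, not values from this guide:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::<account_ID>:user/<user_name>" },
      "Action": ["s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::<bucket_name>",
        "arn:aws:s3:::<bucket_name>/*"
      ]
    }
  ]
}
```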
Apache Kafka® source
- Create a user with the ACCESS_ROLE_CONSUMER role for the source topic.
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.
- Configure user access rights to the topic you need.
- Grant the READ permission to the consumer group whose ID matches the transfer ID:

  ```bash
  bin/kafka-acls --bootstrap-server localhost:9092 \
    --command-config adminclient-configs.conf \
    --add \
    --allow-principal User:username \
    --operation Read \
    --group <transfer_ID>
  ```

- (Optional) To use username and password authorization, configure SASL authentication.
ClickHouse® source
Note
Yandex Data Transfer cannot transfer a ClickHouse® database if its name contains a hyphen.
If transferring tables with engines other than ReplicatedMergeTree and Distributed in a ClickHouse® multi-host cluster, the transfer will fail with the following error: the following tables have not Distributed or Replicated engines and are not yet supported.
- Make sure the tables you are transferring use MergeTree family engines. Only these tables and materialized views (MaterializedView) will be transferred. In a multi-host cluster, only tables and materialized views with the ReplicatedMergeTree or Distributed engines will be transferred; make sure these tables and views are present on all the cluster hosts. (You can check the engines with the query after this list.)
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.
- Create a user with access to the source database. In the user settings, specify a value of at least 1000000 for the Max execution time parameter.
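As referenced above, a minimal query for checking which engines the source tables use; <database_name> is a placeholder for your database:

```sql
-- Tables whose engine is outside the MergeTree family are not transferred
SELECT name, engine
FROM system.tables
WHERE database = '<database_name>';
```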
Elasticsearch source
Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.
Greenplum® source
Note
Data stored in a MATERIALIZED VIEW is not transferred. To transfer MATERIALIZED VIEW data, create an ordinary VIEW that refers to the MATERIALIZED VIEW to be transferred.
- Create a user account the transfer will use to connect to the source. To do this, run the following command:

  ```sql
  CREATE ROLE <username> LOGIN ENCRYPTED PASSWORD '<password>';
  ```

- Configure the source cluster to enable the user you created to connect to all the cluster master hosts.

- If you are going to use parallel copy, configure the source cluster to enable the user you created to connect to all the cluster segment hosts in utility mode. For managed clusters, this means making sure the Access from Data Transfer setting is enabled for the cluster.

- Grant the user you created the SELECT privilege for the tables to transfer and the USAGE privilege for the schemas these tables are in.

  Privileges must be granted to entire tables. Access to certain table columns only is not supported.

  Tables without the required privileges are unavailable to Data Transfer. These tables are processed as if they did not exist.

  This example grants privileges to all the tables in the selected schema:

  ```sql
  GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <username>;
  GRANT USAGE ON SCHEMA <schema_name> TO <username>;
  ```

- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.
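To verify the privileges granted above before starting the transfer, you can query information_schema (a minimal sketch; the schema and user names are placeholders):

```sql
-- Lists the table privileges held by the transfer user in the selected schema
SELECT table_schema, table_name, privilege_type
FROM information_schema.table_privileges
WHERE grantee = '<username>'
  AND table_schema = '<schema_name>';
```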
Data Transfer works with Greenplum® differently depending on the transfer configuration and the source cluster contents. Detailed information is available in the section on Greenplum® source endpoint settings.
MongoDB source
- Estimate the total number of databases for transfer and the total Managed Service for MongoDB workload. If the workload on the database exceeds 10,000 writes per second, create multiple endpoints and transfers. For more information, see Transferring data from a MongoDB source endpoint.
- Create a user with the readWrite role for each source database to be replicated. The readWrite role is required so that a transfer can write data to the __data_transfer.__dt_cluster_time service collection.
- Estimate the total number of databases for transfer and the total MongoDB workload. If the workload on the database exceeds 10,000 writes per second, create multiple endpoints and transfers. For more information, see Transferring data from a MongoDB source endpoint.

- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.

- Make sure that the MongoDB version on the target is 4.0 or higher.

- Make sure the MongoDB cluster is configured so that it returns correctly resolving IP addresses or FQDNs (fully qualified domain names) in response to requests.

- Configure access to the source cluster from Yandex Cloud. To configure a source cluster for connections from the internet:

  - In the configuration file, change the net.bindIp setting from 127.0.0.1 to 0.0.0.0:

    ```yaml
    # network interfaces
    net:
      port: 27017
      bindIp: 0.0.0.0
    ```

  - Restart the mongod service:

    ```bash
    sudo systemctl restart mongod.service
    ```

- If the source cluster does not use replication, enable it:

  - Add replication settings to the /etc/mongod.conf configuration file:

    ```yaml
    replication:
      replSetName: <replica_set_name>
    ```

  - Restart the mongod service:

    ```bash
    sudo systemctl restart mongod.service
    ```

  - Connect to MongoDB and initialize the replica set with this command:

    ```javascript
    rs.initiate({
      _id: "<replica_set_name>",
      members: [{ _id: 0, host: "<IP_address_listening_to_MongoDB>:<port>" }]
    });
    ```

- Create a user with the readWrite role for all source databases to be replicated:

  ```javascript
  use admin
  db.createUser({
    user: "<username>",
    pwd: "<password>",
    mechanisms: ["SCRAM-SHA-1"],
    roles: [
      { db: "<source_database_1_name>", role: "readWrite" },
      { db: "<source_database_2_name>", role: "readWrite" },
      ...
    ]
  });
  ```

  Once started, the transfer will connect to the source on behalf of this user. The readWrite role is required so that a transfer can write data to the __data_transfer.__dt_cluster_time service collection.

  Note

  For MongoDB 3.6 or higher, it is enough to assign the created user the read role for the databases to replicate.

- When using MongoDB 3.6 or higher, to run the transfer, the user must have read access to the local.oplog.rs collection and read and write access to the __data_transfer.__dt_cluster_time collection. To assign the user the clusterAdmin role granting these privileges, connect to MongoDB and run the following commands:

  ```javascript
  use admin;
  db.grantRolesToUser("<username>", ["clusterAdmin"]);
  ```

  To issue more granular privileges, you can assign the clusterMonitor role required for read access to local.oplog.rs and grant read and write access to the __data_transfer.__dt_cluster_time system collection.
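To double-check which roles the transfer user ended up with, you can query it from the admin database (a minimal sketch in the mongo shell; the user name is a placeholder):

```javascript
use admin
// The "roles" array should list readWrite (and, if granted, clusterAdmin or clusterMonitor)
db.getUser("<username>")
```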
MySQL® source
- Enable full binary logging on the source by setting the Binlog row image parameter to FULL or NOBLOB.

- (Optional) Set a limit on the size of data chunks to send using the Max allowed packet parameter.

- Create a user for connecting to the source.

- For Replication and Snapshot and increment transfers, tables without primary keys are not transferred. To make sure the transfer runs properly when migrating a database with such tables:

  - Do not transfer tables without primary keys. For this purpose, add them to the list of excluded tables in the source endpoint settings.
  - Create PRIMARY KEYS in the tables to be migrated that have none:

    - To get a list of tables without a primary key, run the query:

      ```sql
      SELECT tab.table_schema AS database_name,
             tab.table_name AS table_name,
             tab.table_rows AS table_rows
      FROM information_schema.tables tab
      LEFT JOIN information_schema.table_constraints tco
        ON (tab.table_schema = tco.table_schema
            AND tab.table_name = tco.table_name
            AND tco.constraint_type = 'PRIMARY KEY')
      WHERE tab.table_schema NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
        AND tco.constraint_type IS NULL
        AND tab.table_type = 'BASE TABLE';
      ```

    - Study the structure of the tables without a primary key that need to be transferred to the target:

      ```sql
      SHOW CREATE TABLE <database_name>.<table_name>;
      ```

    - Add a simple or composite primary key to the tables that need to be transferred to the target:

      ```sql
      ALTER TABLE <table_name> ADD PRIMARY KEY (<column_or_column_group>);
      ```

    - If a table being transferred to the target has no column or group of columns suitable for the primary key role, create a new column:

      ```sql
      ALTER TABLE <table_name> ADD id BIGINT PRIMARY KEY AUTO_INCREMENT;
      ```

    Note

    If, when creating a primary key, you get an error saying Creating index 'PRIMARY' required more than 'innodb_online_alter_log_max_size' bytes of modification log. Please try again, increase the Innodb log file size parameter value in the DBMS settings.

- Deactivate trigger transfer at the transfer initiation stage and reactivate it at the completion stage (for the Replication and the Snapshot and increment transfer types). For more information, see the description of additional endpoint settings for the MySQL® source.
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.

- Make sure the source uses the MyISAM or InnoDB low-level storage subsystem. If you use other subsystems, the transfer may fail.

- Enable full binary logging on the source by setting the binlog_row_image parameter to FULL or NOBLOB.

- Specify the row format for the binary log on the source by setting the binlog_format parameter to ROW.

- If the replication source is a cluster that is behind a load balancer, enable GTID mode for it (GTID-MODE = ON).

  If it is not possible to enable GTID mode for any reason, make sure the binary log name template contains the host name.

  In both cases, this will allow replication to continue even after changing the master host. You can verify the binary log and GTID settings with the query shown after this list.

- (Optional) Set a limit on the size of data chunks to send using the max_allowed_packet parameter.

- Create a user to connect to the source and grant them the required privileges:

  ```sql
  CREATE USER '<username>'@'%' IDENTIFIED BY '<password>';
  GRANT ALL PRIVILEGES ON <database_name>.* TO '<username>'@'%';
  GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO '<username>'@'%';
  ```

- For Replication and Snapshot and increment transfers, tables without primary keys are not transferred. To make sure the transfer runs properly when migrating a database with such tables:

  - Do not transfer tables without primary keys. For this purpose, add them to the list of excluded tables in the source endpoint settings.
  - Create PRIMARY KEYS in the tables to be migrated that have none:

    - To get a list of tables without a primary key, run the query:

      ```sql
      SELECT tab.table_schema AS database_name,
             tab.table_name AS table_name,
             tab.table_rows AS table_rows
      FROM information_schema.tables tab
      LEFT JOIN information_schema.table_constraints tco
        ON (tab.table_schema = tco.table_schema
            AND tab.table_name = tco.table_name
            AND tco.constraint_type = 'PRIMARY KEY')
      WHERE tab.table_schema NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
        AND tco.constraint_type IS NULL
        AND tab.table_type = 'BASE TABLE';
      ```

    - Study the structure of the tables without a primary key that need to be transferred to the target:

      ```sql
      SHOW CREATE TABLE <database_name>.<table_name>;
      ```

    - Add a simple or composite primary key to the tables that need to be transferred to the target:

      ```sql
      ALTER TABLE <table_name> ADD PRIMARY KEY (<column_or_column_group>);
      ```

    - If a table being transferred to the target has no column or group of columns suitable for the primary key role, create a new column:

      ```sql
      ALTER TABLE <table_name> ADD id BIGINT PRIMARY KEY AUTO_INCREMENT;
      ```

    Note

    If, when creating a primary key, you get an error saying Creating index 'PRIMARY' required more than 'innodb_online_alter_log_max_size' bytes of modification log. Please try again, increase the innodb_log_file_size parameter value in the DBMS settings.

- Deactivate trigger transfer at the transfer initiation stage and reactivate it at the completion stage (for the Replication and the Snapshot and increment transfer types). For more information, see the description of additional endpoint settings for the MySQL® source.
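As referenced above, a minimal check of the binary log and GTID settings on a self-hosted source; these are standard MySQL system variables:

```sql
-- Expected: binlog_format = ROW, binlog_row_image = FULL or NOBLOB
SHOW GLOBAL VARIABLES
WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'max_allowed_packet');
```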
OpenSearch source
Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.
Oracle source
Note
Some versions of Oracle use V_$ instead of V$ as the prefix for system objects, for example, V_$DATABASE instead of V$DATABASE.
If you get an error like "can only select from fixed tables/views" when granting permissions to system objects, try changing the prefixes.
- To prepare the source for the Snapshot transfer:

  - Create a user account the transfer will use to connect to the source:

    ```sql
    CREATE USER <username> IDENTIFIED BY <password>;
    GRANT CREATE SESSION TO <username>;
    ```

  - Grant privileges to the created user:

    ```sql
    GRANT SELECT ON V$DATABASE TO <username>;
    GRANT SELECT ON DBA_EXTENTS TO <username>;
    GRANT SELECT ON DBA_OBJECTS TO <username>;
    GRANT FLASHBACK ANY TABLE TO <username>;
    ```

    If required, you can grant the FLASHBACK privilege only to the tables you need to copy rather than to ANY TABLE.

  - Grant the user the privilege to read the tables to be copied.

- To prepare the source for the Replication transfer:

  - Create a user account the transfer will use to connect to the source:

    ```sql
    CREATE USER <username> IDENTIFIED BY <password>;
    ALTER USER <username> DEFAULT TABLESPACE USERS TEMPORARY TABLESPACE TEMP;
    ALTER USER <username> QUOTA UNLIMITED ON USERS;
    GRANT CREATE SESSION, EXECUTE_CATALOG_ROLE, SELECT ANY TRANSACTION, SELECT ANY DICTIONARY, CREATE PROCEDURE, LOGMINING TO <username>;
    ```

  - Grant privileges to the created user:

    ```sql
    GRANT SELECT ON V$DATABASE TO <username>;
    GRANT SELECT ON V$LOG TO <username>;
    GRANT SELECT ON V$LOGFILE TO <username>;
    GRANT SELECT ON V$ARCHIVED_LOG TO <username>;
    GRANT SELECT ON dba_objects TO <username>;
    GRANT SELECT ON dba_extents TO <username>;
    GRANT EXECUTE ON SYS.DBMS_LOGMNR TO <username>;
    GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO <username>;
    GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO <username>;
    GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO <username>;
    GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO <username>;
    ```

  - Grant the user the privilege to read the tables to be replicated.

  - Enable Minimal Supplemental Logging with primary keys as follows:

    ```sql
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
    ```

- If you are using the CDB environment, configure the following settings:

  - Create a Common User:

    ```sql
    CREATE USER C##<username> IDENTIFIED BY <password> CONTAINER=ALL;
    ALTER USER C##<username> DEFAULT TABLESPACE USERS TEMPORARY TABLESPACE TEMP CONTAINER=ALL;
    ALTER USER C##<username> QUOTA UNLIMITED ON USERS CONTAINER=ALL;
    ALTER USER C##<username> SET CONTAINER_DATA = (CDB$ROOT, <your_PDB_name>) CONTAINER=CURRENT;
    GRANT CREATE SESSION, EXECUTE_CATALOG_ROLE, SELECT ANY TRANSACTION, SELECT ANY DICTIONARY, CREATE PROCEDURE, LOGMINING, SET CONTAINER TO C##<username> CONTAINER=ALL;
    ```

    If required, you can specify only the cdb$root container and the container with the tables you need to transfer.

  - To allow the user to switch to the cdb$root container, grant them the ALTER SESSION privilege:

    ```sql
    GRANT ALTER SESSION TO C##<username>;
    ```

  - Grant privileges to the created user:

    ```sql
    GRANT SELECT ON V$DATABASE TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON V$LOG TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON V$LOGFILE TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON V$ARCHIVED_LOG TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON dba_objects TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON dba_extents TO C##<username> CONTAINER=ALL;
    GRANT EXECUTE ON SYS.DBMS_LOGMNR TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO C##<username> CONTAINER=ALL;
    ```
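To confirm that supplemental logging is enabled before starting replication, you can query V$DATABASE (a minimal check sketch):

```sql
-- SUPPLEMENTAL_LOG_DATA_PK should be YES after the ALTER DATABASE command above
SELECT supplemental_log_data_min, supplemental_log_data_pk
FROM v$database;
```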
PostgreSQL source
Note
When performing a transfer from PostgreSQL to a target of any type, objects of the large object type are not transferred.
When transferring data of the TIMESTAMP WITHOUT TIME ZONE type, the time zone set in the timezone parameter of the PostgreSQL source database applies.

Example

A column with the TIMESTAMP WITHOUT TIME ZONE data type contains the 1970-01-01 00:00:00 value. The way the value will be transferred depends on the timezone parameter in the database:

- If the parameter is set to Etc/UTC, the time is transferred as 1970-01-01 00:00:00+00.
- If the parameter is set to Europe/Moscow, the time is transferred as 1970-01-01 00:00:00+03.
Data stored in a MATERIALIZED VIEW is not transferred. To transfer MATERIALIZED VIEW data, create an ordinary VIEW that refers to the MATERIALIZED VIEW to be transferred.
If the definition of the VIEW to be transferred contains an invocation of a VOLATILE function, the transfer reads data from this VIEW with the READ UNCOMMITTED isolation level. No consistency between the VIEW data and the data of other objects being transferred is guaranteed. Reading data from a MATERIALIZED VIEW in the VIEW definition is equivalent to invoking a VOLATILE function.
Large objects in the TOAST storage system and objects of the bytea type are transferred without restrictions.
- Configure the user the transfer will use to connect to the source:

  - For the Replication and Snapshot and increment transfer types, assign the mdb_replication role to this user.
  - Connect to the database you want to migrate as the database owner and configure privileges:
    - SELECT for all the database tables to be transferred.
    - SELECT for all the database sequences to be transferred.
    - USAGE for the schemas of these tables and sequences.
    - ALL PRIVILEGES (CREATE and USAGE) for the __consumer_keeper and __data_transfer_mole_finder housekeeping table schema defined by the endpoint parameter, if the endpoint is going to be used for the Replication or Snapshot and increment transfer types.

- Configure the number of user connections to the database.

- If the replication source is a cluster, enable the pg_tm_aux extension for it. This will allow replication to continue even after changing the master host. In some cases, a transfer may end in an error after you replace a master in your cluster. For more information, see Troubleshooting.

- To transfer tables without primary keys for the Replication and Snapshot and increment transfer types, you must add the REPLICA IDENTITY:

  - Do not transfer tables without primary keys. For this purpose, add them to the list of excluded tables in the source endpoint settings.
  - Add the replica ID to tables without primary keys:
    - For tables with an index, set REPLICA IDENTITY by unique key:

      ```sql
      ALTER TABLE MY_TBL REPLICA IDENTITY USING INDEX MY_IDX;
      ```

    - For tables without an index, change REPLICA IDENTITY:

      ```sql
      ALTER TABLE MY_TBL REPLICA IDENTITY FULL;
      ```

      In this case, Data Transfer will treat such tables as tables where the primary key is a composite key that includes all columns of the table.

  If there are no primary keys in a table, logical replication will not include any changes in the rows (UPDATE or DELETE):

  - If, while trying to transfer tables from PostgreSQL to PostgreSQL, you do not exclude a table without primary keys on the transfer source, the following error message will be returned:

    ```
    failed to run (abstract1 source): Cannot parse logical replication message: failed to reload schema: primary key check failed: Tables: n / N check failed: "public"."MY_TBL": no key columns found
    ```

  - If, while trying to transfer tables from PostgreSQL to a different database, you add a table without primary keys, the following error message will be returned:

    ```
    failed to run (abstract1 source): Cannot parse logical replication message: failed to reload schema: primary key check failed: Tables: n / N check failed: "public"."MY_TBL": no key columns found
    ```

- Disable the transfer of external keys when creating a source endpoint. Recreate them once the transfer is completed.

- Find and terminate DDL queries that are running for too long. To do this, make a selection from the pg_stat_activity PostgreSQL housekeeping table:

  ```sql
  SELECT NOW() - query_start AS duration, query, state
  FROM pg_stat_activity
  WHERE state != 'idle' ORDER BY 1 DESC;
  ```

  This will return a list of queries running on the server. Pay attention to queries with a high duration value.

- Deactivate trigger transfer at the transfer initiation stage and reactivate it at the completion stage (for the Replication and the Snapshot and increment transfer types). For more information, see the description of additional endpoint settings for the PostgreSQL source.

- To enable parallel data reads from a table, set its primary key to serial mode. Then specify the number of jobs and threads in the transfer parameters under Runtime environment.

- Configure WAL monitoring.

  For Replication and Snapshot and increment transfers, logical replication is used. To perform it, the transfer creates a replication slot with slot_name equal to the transfer ID, which you can get by selecting the transfer from the list of your transfers. Your WAL may grow for different reasons: a long-running transaction or a transfer issue. Therefore, we recommend configuring WAL monitoring on the source side.

  - To monitor storage or disk space usage, use the monitoring tools to configure an alert (see the disk.used_bytes description).

  - Set the maximum WAL size for replication in the Max slot wal keep size setting. The value of this setting can be edited as of PostgreSQL version 13. To urgently disable a transfer to perform data reads, delete the replication slot.

    Warning

    If set to -1 (unlimited size), you will not be able to delete WAL files due to open logical replication slots that information is not read from. As a result, the WAL files will take up the entire disk space and you will not be able to connect to the cluster.

  - Set up an alert with the Yandex Monitoring tools for the Total size of WAL files metric. Make sure the threshold values are less than those specified for the disk.used_bytes metric because, apart from the data, the disk stores temporary files, the WAL, and other types of data. You can monitor the current slot size by making a DB request with the correct slot_name equal to the transfer ID:

    ```sql
    SELECT slot_name, pg_size_pretty(pg_current_wal_lsn() - restart_lsn), active_pid, catalog_xmin, restart_lsn, confirmed_flush_lsn
    FROM pg_replication_slots
    WHERE slot_name = '<transfer_ID>'
    ```
Configuration

- In the postgresql.conf file, change the value of the wal_level parameter to logical:

  ```ini
  wal_level = logical
  ```

- Restart PostgreSQL.
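After the restart, you can confirm that the setting took effect (a minimal check):

```sql
-- Should return "logical"
SHOW wal_level;
```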
If the replication source is a cluster, install and enable the pg_tm_aux extension on it. This will allow replication to continue even after changing the master host.
To transfer tables without primary keys for the Replication and Snapshot and increment transfer types, you must add the REPLICA IDENTITY:

- Do not transfer tables without primary keys. For this purpose, add them to the list of excluded tables in the source endpoint settings.
- Add the replica ID to tables without primary keys:
  - For tables with an index, set REPLICA IDENTITY by unique key:

    ```sql
    ALTER TABLE MY_TBL REPLICA IDENTITY USING INDEX MY_IDX;
    ```

  - For tables without an index, change REPLICA IDENTITY:

    ```sql
    ALTER TABLE MY_TBL REPLICA IDENTITY FULL;
    ```

    In this case, Data Transfer will treat such tables as tables where the primary key is a composite key that includes all columns of the table.

If there are no primary keys in a table, logical replication will not include any changes in the rows (UPDATE or DELETE). A query for finding such tables is sketched after this list.

- If, while trying to transfer tables from PostgreSQL to PostgreSQL, you do not exclude a table without primary keys on the transfer source, the following error message will be returned:

  ```
  failed to run (abstract1 source): Cannot parse logical replication message: failed to reload schema: primary key check failed: Tables: n / N check failed: "public"."MY_TBL": no key columns found
  ```

- If, while trying to transfer tables from PostgreSQL to a different database, you add a table without primary keys, the following error message will be returned:

  ```
  failed to run (abstract1 source): Cannot parse logical replication message: failed to reload schema: primary key check failed: Tables: n / N check failed: "public"."MY_TBL": no key columns found
  ```
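As referenced above, a minimal sketch of a query for listing tables without a primary key on the source; the system-schema filter is an assumption you may want to adjust:

```sql
SELECT c.relnamespace::regnamespace AS schema_name,
       c.relname AS table_name
FROM pg_class c
WHERE c.relkind = 'r'  -- ordinary tables only
  AND c.relnamespace::regnamespace::text NOT IN ('pg_catalog', 'information_schema')
  AND NOT EXISTS (
        SELECT 1
        FROM pg_constraint con
        WHERE con.conrelid = c.oid
          AND con.contype = 'p'  -- primary key constraint
      );
```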
- Disable the transfer of external keys when creating a source endpoint. Recreate them once the transfer is completed.

- Find and terminate DDL queries that are running for too long. To do this, make a selection from the pg_stat_activity PostgreSQL housekeeping table:

  ```sql
  SELECT NOW() - query_start AS duration, query, state
  FROM pg_stat_activity
  WHERE state != 'idle' ORDER BY 1 DESC;
  ```

  This will return a list of queries running on the server. Pay attention to queries with a high duration value.

- Deactivate trigger transfer at the transfer initiation stage and reactivate it at the completion stage (for the Replication and the Snapshot and increment transfer types). For more information, see the description of additional endpoint settings for the PostgreSQL source.

- To enable parallel data reads from a table, set its primary key to serial mode. Then specify the number of workers and threads in the transfer parameters under Runtime environment.
- If replication via Patroni is used, add a replication slot blocker to the Patroni configuration:

  ```yaml
  ignore_slots:
    - database: <database>
      name: <replication_slot>
      plugin: wal2json
      type: logical
  ```

  Where:

  - database: Name of the database the transfer is configured for.
  - name: Replication slot name.

  The database and the replication slot names must match the values specified in the source endpoint settings. By default, the replication slot name is the same as the transfer ID.

  Otherwise, the start of the replication phase will fail with the following error:

  ```
  Warn(Termination): unable to create new pg source: Replication slotID <replication_slot_name> does not exist.
  ```
- Configure WAL monitoring. For Replication and Snapshot and increment transfers, logical replication is used. To perform it, the transfer creates a replication slot with slot_name equal to the transfer ID, which you can get by selecting the transfer from the list of your transfers. The WAL size may increase for different reasons: due to a long-running transaction or a transfer issue. Therefore, we recommend setting up WAL monitoring on the source side.

  - Set up alerts following the disk usage recommendations.
  - Set the maximum WAL size. This feature is available starting with PostgreSQL version 13.
  - You can track the current slot size by making a DB request with the correct slot_name equal to the transfer ID:

    ```sql
    SELECT slot_name, pg_size_pretty(pg_current_wal_lsn() - restart_lsn), active_pid, catalog_xmin, restart_lsn, confirmed_flush_lsn
    FROM pg_replication_slots
    WHERE slot_name = '<transfer_ID>'
    ```
Note
For things to note about data transfer from PostgreSQL to ClickHouse® using Replication and Snapshot and increment transfers, see Asynchronously replicating data from PostgreSQL to ClickHouse®.
Yandex Data Streams source
- Create a service account with the yds.editor role.

- (Optional) Create a processing function.

  Processing function example

  ```javascript
  const yc = require("yandex-cloud");
  const { Parser } = require("@robojones/nginx-log-parser");
  module.exports.handler = async function (event, context) {
      const schema =
          '$remote_addr - $remote_user [$time_local] "$request" $status $bytes_sent "$http_referer" "$http_user_agent"';
      const parser = new Parser(schema);
      return {
          Records: event.Records.map((record) => {
              const decodedData = new Buffer(record.kinesis.data, "base64")
                  .toString("ascii")
                  .trim();
              try {
                  const result = parser.parseLine(decodedData);
                  if (result.request == "") {
                      // empty request - drop message
                      return {
                          eventID: record.eventID,
                          invokeIdentityArn: record.invokeIdentityArn,
                          eventVersion: record.eventVersion,
                          eventName: record.eventName,
                          eventSourceARN: record.eventSourceARN,
                          result: "Dropped"
                      };
                  }
                  return {
                      // successfully parsed message
                      eventID: record.eventID,
                      invokeIdentityArn: record.invokeIdentityArn,
                      eventVersion: record.eventVersion,
                      eventName: record.eventName,
                      eventSourceARN: record.eventSourceARN,
                      kinesis: {
                          data: new Buffer(JSON.stringify(result)).toString("base64"),
                      },
                      result: "Ok"
                  };
              } catch (err) {
                  // error - fail message
                  return {
                      eventID: record.eventID,
                      invokeIdentityArn: record.invokeIdentityArn,
                      eventVersion: record.eventVersion,
                      eventName: record.eventName,
                      eventSourceARN: record.eventSourceARN,
                      result: "ProcessingFailed",
                  };
              }
          })
      };
  };
  ```
- (Optional) Prepare a data schema file in JSON format.

  Sample file with a data schema:

  ```json
  [
      {
          "name": "<field_name>",
          "type": "<type>"
      },
      ...
      {
          "name": "<field_name>",
          "type": "<type>"
      }
  ]
  ```

  Supported types: any, boolean, datetime, double, int8, int16, int32, int64, string, uint8, uint16, uint32, uint64, utf8.
Yandex Managed Service for YDB source
If you selected Dedicated database mode, create and configure a security group in the network hosting the DB.
Preparing a target
ClickHouse® target
- If you need to transfer multiple databases, create a separate transfer for each one of them.

- Create a user with access to the target database.

  Once started, the transfer will connect to the target on behalf of this user.

- If user management via SQL is enabled in the cluster, grant the new user the following permissions:

  ```sql
  GRANT CLUSTER ON *.* TO <username>
  GRANT SELECT, INSERT, ALTER DELETE, CREATE TABLE, CREATE VIEW, DROP TABLE, TRUNCATE, dictGet ON <DB_name>.* TO <username>
  GRANT SELECT(macro, substitution) ON system.macros TO <username>
  ```

  If user management via SQL is disabled, permissions are assigned via the management console and CLI.

- Assign the created security group to the Managed Service for ClickHouse® cluster.
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.

- Create a target database. Its name must be the same as the source database name. If you need to transfer multiple databases, create a separate transfer for each one of them.

- Create a user with access to the target database.

  Once started, the transfer will connect to the target on behalf of this user.

- Grant the new user the following permissions:

  ```sql
  GRANT CLUSTER ON *.* TO <username>
  GRANT SELECT, INSERT, ALTER DELETE, CREATE TABLE, CREATE VIEW, DROP TABLE, TRUNCATE, dictGet ON <DB_name>.* TO <username>
  GRANT SELECT(macro, substitution) ON system.macros TO <username>
  ```
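To verify what the transfer user can do on the target, you can check its grants on clusters with SQL user management (a minimal sketch):

```sql
-- Lists the privileges granted to the transfer user
SHOW GRANTS FOR <username>;
```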
Elasticsearch target
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.

- Make sure the number of columns in the source does not exceed the maximum number of fields in Elasticsearch indexes. The maximum number of fields is provided in the index.mapping.total_fields.limit parameter; the default value is 1000.

  To increase the parameter value, set up a template that makes the maximum number of fields in new indexes equal to the specified value.

  Sample template setup request

  ```bash
  curl \
    --user <Elasticsearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT "https://<Elasticsearch_cluster_FQDN>:9200/_template/index_defaults" \
    --data '{ "index_patterns": "cdc*", "settings": { "index": { "mapping": { "total_fields": { "limit": "2000" } } } } }'
  ```

  With this template setup, all new indexes with the cdc* mask may contain up to 2000 fields.

  You can also set up templates using the Kibana interface.

  To check the current index.mapping.total_fields.limit parameter value, use the Kibana interface or execute the following request:

  ```bash
  curl \
    --user <Elasticsearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request GET 'https://<Elasticsearch_cluster_FQDN>:9200/<index_name>/_settings/*total_fields.limit?include_defaults=true'
  ```

- By default, when transferring data to a single index, only one host is used. To distribute the load across hosts when transferring large amounts of data, set up a template to split new indexes into shards in advance.

  Sample template setup request

  ```bash
  curl \
    --user <Elasticsearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT 'https://<Elasticsearch_cluster_FQDN>:9200/_template/index_defaults' \
    --data '{ "index_patterns": "cdc*", "settings": { "index": { "number_of_shards": 15, "number_of_replicas": 1 } } }'
  ```

  With this template setup, all new indexes with the cdc* mask will be split into 15 shards.

  You can also set up templates using the Kibana interface.
Greenplum® target
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.

- Disable the following settings on the target:

  - Integrity checks for foreign keys
  - Triggers
  - Other constraints

  Warning

  Do not reactivate these settings before the transfer is complete. This will ensure data integrity with respect to foreign keys.

- Create a user:

  ```sql
  CREATE ROLE <username> LOGIN ENCRYPTED PASSWORD '<password>';
  ```

- Grant the user all privileges for the database, schemas, and tables to be transferred:

  ```sql
  GRANT ALL PRIVILEGES ON DATABASE <database_name> TO <username>;
  ```

  If the database is not empty, the user must be its owner:

  ```sql
  ALTER DATABASE <database_name> OWNER TO <username>;
  ```

  Once started, the transfer will connect to the target on behalf of this user.
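To confirm that the transfer user owns the target database (required when the database is not empty), you can check the owner (a minimal sketch; the database name is a placeholder):

```sql
-- The owner column should show the transfer user
SELECT d.datname, pg_get_userbyid(d.datdba) AS owner
FROM pg_database d
WHERE d.datname = '<database_name>';
```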
MongoDB target
- Create a user with the readWrite role for the created database.

- To shard the migrated collections in the Yandex Managed Service for MongoDB target cluster:

  - Use this guide to create and configure empty sharded collections in the target database.

    Data Transfer does not automatically shard the migrated collections. Sharding large collections may take a long time and slow down the transfer.

  - If sharding is performed by any key other than the default _id, assign the user the mdbShardingManager role.

  - When creating a target endpoint, select the DISABLED or TRUNCATE clean policy.

    Selecting the DROP policy will result in the service deleting all the data from the target database, including sharded collections, and replacing them with new unsharded ones when a transfer is activated.

  Learn more about sharding in the MongoDB documentation.
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.

- Make sure that the MongoDB version on the target is not lower than that on the source.

- Make sure the MongoDB cluster is configured so that it returns correctly resolving IP addresses or FQDNs (fully qualified domain names) in response to requests.

- Configure the target cluster to allow connections from the internet:

  - In the configuration file, change the net.bindIp setting from 127.0.0.1 to 0.0.0.0:

    ```yaml
    # network interfaces
    net:
      port: 27017
      bindIp: 0.0.0.0
    ```

  - Restart the mongod service:

    ```bash
    sudo systemctl restart mongod.service
    ```

- If the target cluster does not use replication, enable it:

  - Add replication settings to the /etc/mongod.conf configuration file:

    ```yaml
    replication:
      replSetName: <replica_set_name>
    ```

  - Restart the mongod service:

    ```bash
    sudo systemctl restart mongod.service
    ```

  - Connect to MongoDB and initialize the replica set with this command:

    ```javascript
    rs.initiate({
      _id: "<replica_set_name>",
      members: [{ _id: 0, host: "<IP_address_listening_to_MongoDB>:<port>" }]
    });
    ```

- Connect to the cluster and create a target database:

  ```javascript
  use <database_name>
  ```

- Create a user with the readWrite role for the target database:

  ```javascript
  use admin;
  db.createUser({
    user: "<username>",
    pwd: "<password>",
    mechanisms: ["SCRAM-SHA-1"],
    roles: [{ db: "<target_database_name>", role: "readWrite" }]
  });
  ```

  Once started, the transfer will connect to the target on behalf of this user.

- To shard the migrated collections in the target cluster:

  - Set up a database and populate it with empty collections with the same names as those in the source.

    Data Transfer does not automatically shard the migrated collections. Sharding large collections may take a long time and slow down the transfer.

  - Enable target database sharding:

    ```javascript
    sh.enableSharding("<target_database_name>")
    ```

  - Shard every collection based on its namespace:

    ```javascript
    sh.shardCollection("<target_database_name>.<collection_name>", { <field_name>: <1|"hashed">, ... });
    ```

    For more information about the shardCollection() function, see the MongoDB documentation.

  - To make sure that sharding is set up and enabled, get a list of available shards:

    ```javascript
    sh.status()
    ```

  - If sharding is performed by any key other than the default _id, assign the clusterManager system role to the user Data Transfer will use to connect to the target cluster:

    ```javascript
    use admin;
    db.grantRolesToUser("<username>", ["clusterManager"]);
    ```

  - When creating a target endpoint, select the DISABLED or TRUNCATE clean policy.

    Selecting the DROP policy will result in the service deleting all the data from the target database, including sharded collections, and replacing them with new unsharded ones when a transfer is activated.

  Learn more about sharding in the MongoDB documentation.
MySQL® target
- Make sure that the MySQL® major version on the target is not lower than that on the source.

- Set an SQL Mode matching the source. (A query for comparing modes is sketched after this list.)

- Create a user for connecting to the target.

  - Assign the user the ALL_PRIVILEGES role for the target database.

- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.

- Make sure that the MySQL® major version on the target is not lower than that on the source.

- Make sure the target uses the MyISAM or InnoDB low-level storage subsystem.

- Set an SQL Mode matching the source.

- Create a user to connect to the target and grant them the required privileges:

  ```sql
  CREATE USER '<username>'@'%' IDENTIFIED BY '<password>';
  GRANT ALL PRIVILEGES ON <database_name>.* TO '<username>'@'%';
  ```
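As referenced above, to compare SQL modes you can run the same query on both the source and the target and check that the returned mode lists match (a minimal sketch):

```sql
-- Run on both the source and the target
SELECT @@GLOBAL.sql_mode;
```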
Yandex Object Storage target
- Create a bucket with the configuration you need.
- Create a service account with the storage.uploader role.
OpenSearch target
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.

- Make sure the number of columns in the source does not exceed the maximum number of fields in OpenSearch indexes. The maximum number of fields is provided in the index.mapping.total_fields.limit parameter. Its default value is 1,000.

  Warning

  Exceeding the limit will result in the Limit of total fields [1000] has been exceeded error and the transfer will be stopped.

  To increase the parameter value, set up a template that makes the maximum number of fields in new indexes equal to the specified value.

  Sample template setup request

  ```bash
  curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT "https://<URL_of_OpenSearch_host_with_DATA_role>:9200/_template/index_defaults" \
    --data '{ "index_patterns": "cdc*", "settings": { "index": { "mapping": { "total_fields": { "limit": "2000" } } } } }'
  ```

  With this template setup, all new indexes with the cdc* mask will be able to contain up to 2,000 fields.

  You can also set up templates using the OpenSearch Dashboards interface.

  To check the current index.mapping.total_fields.limit parameter value, execute the following request:

  ```bash
  curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request GET 'https://<URL_of_OpenSearch_host_with_DATA_role>:9200/<index_name>/_settings/*total_fields.limit?include_defaults=true'
  ```

- By default, when transferring data to a single index, only one host is used. To distribute the load across hosts when transferring large amounts of data, set up a template to split new indexes into shards in advance.

  Sample template setup request

  ```bash
  curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT 'https://<URL_of_OpenSearch_host_with_DATA_role>:9200/_template/index_defaults' \
    --data '{ "index_patterns": "cdc*", "settings": { "index": { "number_of_shards": 15, "number_of_replicas": 1 } } }'
  ```

  With this template setup, all new indexes with the cdc* mask will be split into 15 shards.

  You can also set up templates using the OpenSearch Dashboards interface.

- To enhance data security and availability, set up a policy that will create a new index if at least one of the following conditions is met (recommended values):

  - Index is over 50 GB in size.
  - Index is over 30 days old.

  You can create and enable a policy using requests. For more information about policies, see the OpenSearch documentation.

  Example of a policy creation request

  ```bash
  curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/_plugins/_ism/policies/rollover_policy' \
    --data '{ "policy": { "description": "Example rollover policy", "default_state": "rollover", "schema_version": 1, "states": [ { "name": "rollover", "actions": [ { "rollover": { "min_index_age": "30d", "min_primary_shard_size": "50gb" } } ], "transitions": [] } ], "ism_template": { "index_patterns": ["log*"], "priority": 100 } } }'
  ```

  Example of a request to assign an alias to a policy

  ```bash
  curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/_index_template/ism_rollover' \
    --data '{ "index_patterns": ["log*"], "template": { "settings": { "plugins.index_state_management.rollover_alias": "log" } } }'
  ```

  Example of a request to create an index with a policy alias

  ```bash
  curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request PUT 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/log-000001' \
    --data '{ "aliases": { "log": { "is_write_index": true } } }'
  ```

  Example of a request to check if a policy is attached to an index

  ```bash
  curl \
    --user <OpenSearch_username>:<password> \
    --header 'Content-Type: application/json' \
    --request GET 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/_plugins/_ism/explain/log-000001?pretty'
  ```
PostgreSQL target
- Make sure that the PostgreSQL major version on the target is not lower than that on the source.

- For transfers from PostgreSQL, enable the same extensions in the target database as in the source database.

  If extensions in the source database are installed in a custom schema and are used in the DDLs of the objects you are moving, create these DDLs in the target manually. In these DDLs, a function call must not include the schema name. Set the target endpoint cleanup policy to Truncate to prevent the transfer from deleting these objects.

- Select the Drop cleanup policy for transfer tables.

  If you have created DDLs in the target manually, use the Truncate policy. The Truncate policy will not delete these DDLs.

- Create a user with access to the target database.

- Grant the user all privileges for the database, schemas, and tables to be transferred:

  ```sql
  GRANT ALL PRIVILEGES ON DATABASE <database_name> TO <username>;
  ```

  If the database is not empty, the user must be its owner:

  ```sql
  ALTER DATABASE <database_name> OWNER TO <username>;
  ```

  Once started, the transfer will connect to the target on behalf of this user.

- If the target has the Save transaction boundaries option enabled, grant the new user all the privileges needed to create the __data_transfer_lsn auxiliary table in the current schema (usually public) in the target:

  ```sql
  GRANT ALL PRIVILEGES ON SCHEMA <schema_name> TO <username>;
  ```

- Configure the number of user connections to the database.
- Make sure the settings for the network hosting the cluster allow public connections from IP addresses used by Data Transfer.

- Make sure that the PostgreSQL major version on the target is not lower than that on the source.

- In the target database, enable the same extensions that are enabled in the source database.

- Make sure the target has the DROP transfer tables cleanup policy selected.

- Create a user:

  ```sql
  CREATE ROLE <username> LOGIN ENCRYPTED PASSWORD '<password>';
  ```

- Grant the user all privileges for the database, schemas, and tables to be transferred:

  ```sql
  GRANT ALL PRIVILEGES ON DATABASE <database_name> TO <username>;
  ```

  If the database is not empty, the user must be its owner:

  ```sql
  ALTER DATABASE <database_name> OWNER TO <username>;
  ```

  Once started, the transfer will connect to the target on behalf of this user.

- If the target has the Save transaction boundaries option enabled, grant the new user all the privileges needed to create the __data_transfer_lsn auxiliary table in the current schema (usually public) in the target:

  ```sql
  GRANT ALL PRIVILEGES ON SCHEMA <schema_name> TO <username>;
  ```

- Configure the number of user connections to the database.
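To compare the extensions enabled on the source and the target, you can run the following on both databases and diff the output (a minimal sketch):

```sql
-- Run on both the source and the target database
SELECT extname, extversion
FROM pg_extension
ORDER BY extname;
```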
Data stored in a MATERIALIZED VIEW is not transferred. To transfer MATERIALIZED VIEW data, create an ordinary VIEW that refers to the MATERIALIZED VIEW to be transferred.

If the definition of the VIEW to be transferred contains an invocation of a VOLATILE function, the transfer reads data from this VIEW with the READ UNCOMMITTED isolation level. No consistency between the VIEW data and the data of other objects being transferred is guaranteed. Reading data from a MATERIALIZED VIEW in the VIEW definition is equivalent to invoking a VOLATILE function.
Yandex Managed Service for YDB target
- Create a service account with the ydb.editor role.
- For the database running in Dedicated mode, create and configure a security group in the network hosting the DB.
Airbyte® is a registered trademark of Airbyte, Inc in the United States and/or other countries.
Greenplum® and Greenplum Database® are registered trademarks or trademarks of VMware, Inc. in the United States and/or other countries.
ClickHouse® is a registered trademark of ClickHouse, Inc.