Preparing for a transfer
Preparing a source
Airbyte® sources
AWS CloudTrail source
Get an AWS key ID and secret access key by following the AWS guide
For more information, see the Airbyte® documentation
BigQuery source
To prepare a BigQuery data source:
- Create a Google Cloud service account.
- Add the service account as a participant to the Google Cloud project with the BigQuery User role.
- Create a Google Cloud service account key.
For more information, see the Airbyte® documentation
Microsoft SQL Server source
Airbyte® has the following requirements for a Microsoft SQL Server data source:
- Make sure your database is accessible from the computer running Airbyte®.
- Create a dedicated read-only Airbyte® user with access to all tables that require replication.
For more information, see the Airbyte® documentation
Airbyte® is already built into Data Transfer, so you do not have to create a separate VM to deploy it and add a user. All you have to do is grant Data Transfer network access to the source database.
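For reference, a minimal T-SQL sketch of creating such a dedicated read-only user. The login name airbyte_reader is a placeholder, and the sketch assumes the tables to replicate live in the dbo schema; adjust the schema and grants to your own database.
CREATE LOGIN airbyte_reader WITH PASSWORD = '<password>';
CREATE USER airbyte_reader FOR LOGIN airbyte_reader;
-- Read-only access to the replicated tables (here: everything in the dbo schema)
GRANT SELECT ON SCHEMA::dbo TO airbyte_reader;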
S3 source
If you are using a private bucket as a source, grant the read and list permissions to the account you will use for connection.
For more information, see the Airbyte® documentation.
Apache Kafka® source
- Create a user with the ACCESS_ROLE_CONSUMER role for the source topic.
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Configure user access rights to the topic you need.
- Grant the READ permission to the consumer group whose ID matches the transfer ID:
  bin/kafka-acls --bootstrap-server localhost:9092 \
    --command-config adminclient-configs.conf \
    --add \
    --allow-principal User:username \
    --operation Read \
    --group <transfer_ID>
- Optionally, to log in with a username and password, configure SASL authentication.
ClickHouse® source
Note
Yandex Data Transfer cannot transfer a ClickHouse® database if its name contains a hyphen.
If transferring tables with engines other than ReplicatedMergeTree and Distributed in a ClickHouse® multi-host cluster, the transfer will fail with the following error: the following tables have not Distributed or Replicated engines and are not yet supported.
- Make sure the tables you are transferring use the MergeTree family engines. Only these tables and materialized views (MaterializedView) will be transferred. In case of a multi-host cluster, only tables and materialized views with the ReplicatedMergeTree or Distributed engines will be transferred. Make sure these tables and views are present on all the cluster hosts.
- Create a user with access to the source database. In the user settings, specify a value of at least 1000000 for the Max execution time parameter.
- Make sure the tables you are transferring use the MergeTree family engines. Only these tables and materialized views (MaterializedView) will be transferred. In case of a multi-host cluster, only tables and materialized views with the ReplicatedMergeTree or Distributed engines will be transferred. Make sure these tables and views are present on all the cluster hosts.
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Create a user with access to the source database. In the user settings, specify a value of at least 1000000 for the Max execution time parameter.
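To spot tables that will not be transferred before you activate the transfer, you can list the engines used in the source database with a query like the one below; the database name is a placeholder.
-- Tables returned by this query use engines outside the MergeTree family and will be skipped
SELECT database, name, engine
FROM system.tables
WHERE database = '<database_name>'
  AND engine NOT LIKE '%MergeTree%'
  AND engine NOT IN ('Distributed', 'MaterializedView');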
Elasticsearch source
If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
Greenplum® source
Note
Data stored in a MATERIALIZED VIEW is not transferred. To transfer MATERIALIZED VIEW data, create an ordinary VIEW that refers to the MATERIALIZED VIEW to be transferred.
- Create a user account the transfer will use to connect to the source. To do this, run the following command:
  CREATE ROLE <username> LOGIN ENCRYPTED PASSWORD '<password>';
- Configure the source cluster to enable the user you created to connect to all the cluster master hosts.
- If you are going to use parallel copy, configure the source cluster to enable the user you created to connect to all the cluster's segment hosts in utility mode. To do this, make sure that the "Access from Data Transfer" setting is enabled for the cluster.
- Grant the user you created the SELECT privilege for the tables to transfer and the USAGE privilege for the schemas these tables are in.
  Privileges must be granted to entire tables. Access to certain table columns only is not supported.
  Tables without the required privileges are unavailable to Data Transfer. These tables are processed as if they did not exist.
  This example grants privileges to all the tables in the selected schema:
  GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <username>;
  GRANT USAGE ON SCHEMA <schema_name> TO <username>;
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Create a user account the transfer will use to connect to the source. To do this, run the following command:
  CREATE ROLE <username> LOGIN ENCRYPTED PASSWORD '<password>';
- Configure the source cluster to enable the user you created to connect to all the cluster master hosts.
- If you are going to use parallel copy, configure the source cluster to enable the user you created to connect to all the cluster segment hosts in utility mode.
- Grant the user you created the SELECT privilege for the tables to transfer and the USAGE privilege for the schemas these tables are in.
  Privileges must be granted to entire tables. Access to certain table columns only is not supported.
  Tables without the required privileges are unavailable to Data Transfer. These tables are processed as if they did not exist.
  This example grants privileges to all the database tables:
  GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <username>;
  GRANT USAGE ON SCHEMA <schema_name> TO <username>;
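To verify that the privileges were actually granted to the transfer user, you can query the standard information_schema views; the schema and user names below are placeholders.
-- Lists the tables in the schema that the user can read
SELECT table_schema, table_name, privilege_type
FROM information_schema.role_table_grants
WHERE grantee = '<username>'
  AND table_schema = '<schema_name>'
  AND privilege_type = 'SELECT';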
Data Transfer works with Greenplum® differently depending on the transfer configuration and the source cluster contents. Detailed information is available in the section on Greenplum® source endpoint settings.
MongoDB source
- Estimate the total number of databases for transfer and the total Managed Service for MongoDB workload. If the workload on the database exceeds 10,000 writes per second, create multiple endpoints and transfers. For more information, see Transferring data from a MongoDB source endpoint.
- Create a user with the readWrite role for each source database to replicate. The readWrite role is required to enable the transfer to write data to the __data_transfer.__dt_cluster_time service collection.
- Estimate the total number of databases for transfer and the total MongoDB workload. If the workload on the database exceeds 10,000 writes per second, create multiple endpoints and transfers. For more information, see Transferring data from a MongoDB source endpoint.
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Make sure the MongoDB version on the target is 4.0 or higher.
- Make sure the MongoDB cluster is configured so that it returns correctly resolving IP addresses or FQDNs (fully qualified domain names) in response to requests.
- Configure access to the source cluster from Yandex Cloud. To configure a source cluster for connections from the internet:
  - In the configuration file, change net.bindIp from 127.0.0.1 to 0.0.0.0:
    # network interfaces
    net:
      port: 27017
      bindIp: 0.0.0.0
  - Restart mongod:
    sudo systemctl restart mongod.service
- If the source cluster does not use replication, enable it:
  - Add the replication settings to the /etc/mongod.conf configuration file:
    replication:
      replSetName: <replica_set_name>
  - Restart mongod:
    sudo systemctl restart mongod.service
  - Connect to MongoDB and initialize the replica set with this command:
    rs.initiate({
        _id: "<replica_set_name>",
        members: [{ _id: 0, host: "<IP_address_listened_by_MongoDB>:<port>" }]
    });
- Create a user with the readWrite role for all the source databases to replicate:
  use admin
  db.createUser({
      user: "<username>",
      pwd: "<password>",
      mechanisms: ["SCRAM-SHA-1"],
      roles: [
          { db: "<source_database_1_name>", role: "readWrite" },
          { db: "<source_database_2_name>", role: "readWrite" },
          ...
      ]
  });
  Once started, the transfer will connect to the source on behalf of this user. The readWrite role is required to enable the transfer to write data to the __data_transfer.__dt_cluster_time service collection.
  Note
  For MongoDB 3.6 or higher, you only need to assign the created user the read role for the databases to replicate.
- When using MongoDB 3.6 or higher, to run the transfer, the user must have the read permission for the local.oplog.rs collection and the read and write permissions for the __data_transfer.__dt_cluster_time collection. To assign a user the clusterAdmin role granting these permissions, connect to MongoDB and run the following commands:
  use admin;
  db.grantRolesToUser("<username>", ["clusterAdmin"]);
  To grant more granular permissions, you can assign the clusterMonitor role required for reading the local.oplog.rs collection and grant read and write access to the __data_transfer.__dt_cluster_time system collection.
MySQL® source
- Enable full binary logging on the source by setting the Binlog row image parameter to FULL or NOBLOB.
- Optionally, set a limit on the size of data chunks to send using the Max allowed packet parameter.
- Create a user for connecting to the source.
- For Replication and Snapshot and increment transfers, tables without primary keys are not transferred. To make sure the transfer is running properly when migrating a database with such tables:
  - Do not transfer tables without primary keys. For this purpose, add them to the list of excluded tables in source endpoint settings.
  - Create PRIMARY KEYS in those tables to be migrated where there are no keys.
    - To get a list of tables without a primary key, run the query:
      SELECT tab.table_schema AS database_name, tab.table_name AS table_name, tab.table_rows AS table_rows
      FROM information_schema.tables tab
      LEFT JOIN information_schema.table_constraints tco
        ON (tab.table_schema = tco.table_schema
            AND tab.table_name = tco.table_name
            AND tco.constraint_type = 'PRIMARY KEY')
      WHERE tab.table_schema NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
        AND tco.constraint_type IS NULL
        AND tab.table_type = 'BASE TABLE';
    - Study the structure of tables without a primary key that need to be transferred to the target:
      SHOW CREATE TABLE <database_name>.<table_name>;
    - Add a simple or complex primary key to the tables that need to be transferred to the target:
      ALTER TABLE <table_name> ADD PRIMARY KEY (<column_or_column_group>);
    - If the table being transferred to the target has no column or group of columns suitable for the role of the primary key, create a new column:
      ALTER TABLE <table_name> ADD id BIGINT PRIMARY KEY AUTO_INCREMENT;
    Note
    If, when creating a primary key, you get an error saying Creating index 'PRIMARY' required more than 'innodb_online_alter_log_max_size' bytes of modification log. Please try again, increase the Innodb log file size parameter value in the DBMS settings.
- Deactivate trigger transfer at the transfer initiation stage and reactivate it at the completion stage (for the Replication and the Snapshot and increment transfer types). For more information, see the description of additional endpoint settings for the MySQL® source.
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Make sure the source uses the MyISAM or InnoDB low-level storage subsystem. If you use other subsystems, the transfer may fail.
- Enable full binary logging on the source by setting the binlog_row_image parameter to FULL or NOBLOB (you can verify the binary log settings with the query shown after this list).
- Specify the row format for the binary log on the source by setting the binlog_format parameter to ROW.
- For the Replication and Snapshot and increment transfer types:
  - In the log_bin parameter, specify the path to the binary log file.
  - Check the binary log information using the SHOW MASTER STATUS statement (for MySQL® 5.7 and 8.0) or the SHOW BINARY LOG STATUS statement (for MySQL® 8.4). The statement should return a string with the information, not an empty response.
- If the replication source is a cluster that is behind the load balancer, enable GTID mode for it (GTID-MODE = ON). If it is not possible to enable GTID mode for any reason, make sure the binary log name template contains the host name. In both cases, this will allow replication to continue even after changing the master host.
- Optionally, set a limit on the size of outbound data chunks using the max_allowed_packet parameter.
- Create a user to connect to the source and grant them the required privileges:
  CREATE USER '<username>'@'%' IDENTIFIED BY '<password>';
  GRANT ALL PRIVILEGES ON <database_name>.* TO '<username>'@'%';
  GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO '<username>'@'%';
- For Replication and Snapshot and increment transfers, tables without primary keys are not transferred. To make sure the transfer is running properly when migrating a database with such tables:
  - Do not transfer tables without primary keys. For this purpose, add them to the list of excluded tables in source endpoint settings.
  - Create PRIMARY KEYS in those tables to be migrated where there are no keys.
    - To get a list of tables without a primary key, run the query:
      SELECT tab.table_schema AS database_name, tab.table_name AS table_name, tab.table_rows AS table_rows
      FROM information_schema.tables tab
      LEFT JOIN information_schema.table_constraints tco
        ON (tab.table_schema = tco.table_schema
            AND tab.table_name = tco.table_name
            AND tco.constraint_type = 'PRIMARY KEY')
      WHERE tab.table_schema NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
        AND tco.constraint_type IS NULL
        AND tab.table_type = 'BASE TABLE';
    - Study the structure of tables without a primary key that need to be transferred to the target:
      SHOW CREATE TABLE <database_name>.<table_name>;
    - Add a simple or complex primary key to the tables that need to be transferred to the target:
      ALTER TABLE <table_name> ADD PRIMARY KEY (<column_or_column_group>);
    - If the table being transferred to the target has no column or group of columns suitable for the role of the primary key, create a new column:
      ALTER TABLE <table_name> ADD id BIGINT PRIMARY KEY AUTO_INCREMENT;
    Note
    If, when creating a primary key, you get an error saying Creating index 'PRIMARY' required more than 'innodb_online_alter_log_max_size' bytes of modification log. Please try again, increase the innodb_log_file_size parameter value in the DBMS settings.
- Deactivate trigger transfer at the transfer initiation stage and reactivate it at the completion stage (for the Replication and the Snapshot and increment transfer types). For more information, see the description of additional endpoint settings for the MySQL® source.
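Before activating the transfer, you can verify the binary log settings mentioned above with a query like the following; the expected values are listed in the comment.
-- Expected: log_bin = ON, binlog_format = ROW, binlog_row_image = FULL or NOBLOB
SHOW VARIABLES
WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image', 'max_allowed_packet');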
OpenSearch source
If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
Oracle source
Note
Some versions of Oracle use V_$ instead of V$ as the prefix for system objects. For example, V_$DATABASE instead of V$DATABASE.
If you get an error like "can only select from fixed tables/views" when granting permissions to system objects, try changing the prefixes.
- To prepare the source for the Snapshot transfer:
  - Create a user account the transfer will use to connect to the source:
    CREATE USER <username> IDENTIFIED BY <password>;
    GRANT CREATE SESSION TO <username>;
  - Grant privileges to the created user:
    GRANT SELECT ON V$DATABASE TO <username>;
    GRANT SELECT ON DBA_EXTENTS TO <username>;
    GRANT SELECT ON DBA_OBJECTS TO <username>;
    GRANT FLASHBACK ANY TABLE TO <username>;
    If required, you can grant the FLASHBACK privilege only to the tables you need to copy rather than to ANY TABLE.
  - Grant the user the privilege to read the tables to be copied.
- To prepare the source for the Replication transfer:
  - Create a user account the transfer will use to connect to the source:
    CREATE USER <username> IDENTIFIED BY <password>;
    ALTER USER <username> DEFAULT tablespace USERS TEMPORARY tablespace TEMP;
    ALTER USER <username> quota unlimited on USERS;
    GRANT CREATE SESSION, execute_catalog_role, SELECT ANY TRANSACTION, SELECT ANY DICTIONARY, CREATE PROCEDURE, LOGMINING TO <username>;
  - Grant privileges to the created user:
    GRANT SELECT ON V$DATABASE TO <username>;
    GRANT SELECT ON V$LOG TO <username>;
    GRANT SELECT ON V$LOGFILE TO <username>;
    GRANT SELECT ON V$ARCHIVED_LOG TO <username>;
    GRANT SELECT ON dba_objects TO <username>;
    GRANT SELECT ON dba_extents TO <username>;
    GRANT EXECUTE ON SYS.DBMS_LOGMNR TO <username>;
    GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO <username>;
    GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO <username>;
    GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO <username>;
    GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO <username>;
  - Grant the user the privilege to read the tables to be replicated.
  - Enable Minimal Supplemental Logging with primary keys as follows:
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
- If you are using the CDB environment, configure the following settings:
  - Create a Common User:
    CREATE USER C##<username> IDENTIFIED BY <password> CONTAINER=all;
    ALTER USER C##<username> DEFAULT TABLESPACE USERS temporary tablespace TEMP CONTAINER=all;
    ALTER USER C##<username> quota unlimited on USERS CONTAINER=all;
    ALTER USER C##<username> SET container_data = (cdb$root, <your_PDB_name>) CONTAINER=current;
    GRANT CREATE SESSION, execute_catalog_role, SELECT ANY TRANSACTION, SELECT ANY DICTIONARY, CREATE PROCEDURE, LOGMINING, SET CONTAINER TO C##<username> CONTAINER=ALL;
    If required, you can specify only the cdb$root container and the container with the tables you need to transfer.
  - To allow the user to switch to the cdb$root container, grant them the ALTER SESSION privilege:
    GRANT ALTER SESSION TO C##<username>;
  - Grant privileges to the created user:
    GRANT SELECT ON V$DATABASE TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON V$LOG TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON V$LOGFILE TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON V$ARCHIVED_LOG TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON dba_objects TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON dba_extents TO C##<username> CONTAINER=ALL;
    GRANT EXECUTE ON SYS.DBMS_LOGMNR TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO C##<username> CONTAINER=ALL;
    GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO C##<username> CONTAINER=ALL;
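To confirm the logging state after running the ALTER DATABASE statement above, you can query V$DATABASE (or V_$DATABASE, depending on your Oracle version, as the note above explains).
-- SUPPLEMENTAL_LOG_DATA_PK should be YES after supplemental logging with primary keys is enabled
SELECT SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK
FROM V$DATABASE;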
PostgreSQL source
Note
When performing a transfer from PostgreSQL to a target of any type, objects of the large object type are not transferred.
When transferring data of the TIMESTAMP WITHOUT TIME ZONE type, the time zone set in the timezone parameter of the PostgreSQL source database applies.
Example
The column with the TIMESTAMP WITHOUT TIME ZONE data type contains the 1970-01-01 00:00:00 value. The way the value will be transferred depends on the timezone parameter in the database:
- If the parameter is set to Etc/UTC, the time will be transferred as 1970-01-01 00:00:00+00.
- If the parameter is set to Europe/Moscow, the time will be transferred as 1970-01-01 00:00:00+03.
Data stored in a MATERIALIZED VIEW is not transferred. To transfer MATERIALIZED VIEW data, create an ordinary VIEW that refers to the MATERIALIZED VIEW to be transferred.
If the definition of the VIEW to be transferred contains an invocation of the VOLATILE function, the transfer reads data from this VIEW with the READ UNCOMMITTED isolation level. No consistency between the VIEW data and the data of other objects being transferred is guaranteed. Reading data from a MATERIALIZED VIEW in the VIEW definition is equivalent to invoking the VOLATILE function.
Large objects in the TOAST storage system and those of the bytea type are transferred without restrictions.
- Configure the user the transfer will use to connect to the source:
  - For the Replication and Snapshot and increment transfer types, assign the mdb_replication role to this user.
  - Connect to the database you want to migrate as the database owner and configure privileges:
    - SELECT for all the database tables to transfer.
    - SELECT for all the database sequences to transfer.
    - USAGE for the schemas of those tables and sequences.
    - ALL PRIVILEGES (CREATE and USAGE) for the service table (__consumer_keeper and __data_transfer_mole_finder) schema defined by the endpoint parameter if the endpoint is going to be used for the Replication or Snapshot and increment transfer types.
- Configure the number of user connections to the database.
- If the replication source is a cluster, enable the pg_tm_aux extension for it. This will allow replication to continue even after changing the master host. In some cases, a transfer may end in an error after you replace a master in your cluster. For more information, see Troubleshooting.
- To transfer tables without primary keys for the Replication and Snapshot and increment transfer types, you must add the REPLICA IDENTITY:
  - Do not transfer tables without primary keys. For this purpose, add them to the list of excluded tables in source endpoint settings.
  - Add the replica ID to tables without primary keys:
    - For tables with an index, set REPLICA IDENTITY by unique key:
      ALTER TABLE MY_TBL REPLICA IDENTITY USING INDEX MY_IDX;
    - For tables without an index, change REPLICA IDENTITY:
      ALTER TABLE MY_TBL REPLICA IDENTITY FULL;
      In this case, Data Transfer will treat such tables as tables where the primary key is a composite key that includes all columns of the table.
  If there are no primary keys in a table, logical replication will not include any changes in the rows (UPDATE or DELETE):
  - If, while trying to transfer tables from PostgreSQL to PostgreSQL, you do not exclude a table without primary keys on the transfer source, the following error message will be returned:
    failed to run (abstract1 source): Cannot parse logical replication message: failed to reload schema: primary key check failed: Tables: n / N check failed: "public"."MY_TBL": no key columns found
  - If, while trying to transfer tables from PostgreSQL to a different database, you add a table without primary keys, the following error message will be returned:
    failed to run (abstract1 source): Cannot parse logical replication message: failed to reload schema: primary key check failed: Tables: n / N check failed: "public"."MY_TBL": no key columns found
- Disable the transfer of external keys when creating a source endpoint. Recreate them once the transfer is completed.
- Find and terminate DDL queries that are running for too long. To do this, make a selection from the PostgreSQL pg_stat_activity system table:
  SELECT NOW() - query_start AS duration, query, state
  FROM pg_stat_activity
  WHERE state != 'idle' ORDER BY 1 DESC;
  This will return a list of queries running on the server. Pay attention to queries with a high duration value.
- Deactivate trigger transfer at the transfer initiation stage and reactivate it at the completion stage (for the Replication and the Snapshot and increment transfer types). For more information, see the description of additional endpoint settings for the PostgreSQL source.
- To enable parallel data reads from the table, set its primary key to serial mode. Then specify the number of jobs and threads in the transfer parameters under Runtime environment.
- Configure WAL monitoring. Replication and Snapshot and increment transfers use logical replication. To perform the replication, the transfer creates a replication slot where slot_name matches the transfer ID, which you can get by selecting the transfer in the list of your transfers. Your WAL may grow due to different reasons: a long-running transaction or a transfer issue. Therefore, we recommend you configure WAL monitoring on the source side.
  - To monitor storage or disk space usage, use monitoring tools to set up an alert (see the disk.used_bytes description).
  - Set the maximum WAL size for replication in the Max slot wal keep size setting. The value of this setting can be edited as of PostgreSQL version 13. To urgently stop the transfer from reading data, delete the replication slot.
    Warning
    If this setting is -1 (unlimited size), you will not be able to delete WAL files because of open logical replication slots that the information is not read from. As a result, the WAL files will take up the entire disk space and you will not be able to connect to the cluster.
  - Set up an alert with the Yandex Monitoring tools for the Total size of WAL files metric. Make sure the threshold values are less than those specified for the disk.used_bytes metric because, apart from the data, the disk stores temporary files, the WAL, and other types of data. You can monitor the current slot size by running this DB query with the correct slot_name, which matches the transfer ID:
    SELECT slot_name, pg_size_pretty(pg_current_wal_lsn() - restart_lsn), active_pid, catalog_xmin, restart_lsn, confirmed_flush_lsn
    FROM pg_replication_slots
    WHERE slot_name = '<transfer_ID>'
- Configuration:
  - In the postgresql.conf file, set the value of the wal_level parameter to logical:
    wal_level = logical
  - Restart PostgreSQL.
- If the replication source is a cluster, install and enable the pg_tm_aux extension on it. This will allow replication to continue even after changing the master host.
- To transfer tables without primary keys for the Replication and Snapshot and increment transfer types, you must add the REPLICA IDENTITY (the query after this list shows how to find such tables):
  - Do not transfer tables without primary keys. For this purpose, add them to the list of excluded tables in source endpoint settings.
  - Add the replica ID to tables without primary keys:
    - For tables with an index, set REPLICA IDENTITY by unique key:
      ALTER TABLE MY_TBL REPLICA IDENTITY USING INDEX MY_IDX;
    - For tables without an index, change REPLICA IDENTITY:
      ALTER TABLE MY_TBL REPLICA IDENTITY FULL;
      In this case, Data Transfer will treat such tables as tables where the primary key is a composite key that includes all columns of the table.
  If there are no primary keys in a table, logical replication will not include any changes in the rows (UPDATE or DELETE):
  - If, while trying to transfer tables from PostgreSQL to PostgreSQL, you do not exclude a table without primary keys on the transfer source, the following error message will be returned:
    failed to run (abstract1 source): Cannot parse logical replication message: failed to reload schema: primary key check failed: Tables: n / N check failed: "public"."MY_TBL": no key columns found
  - If, while trying to transfer tables from PostgreSQL to a different database, you add a table without primary keys, the following error message will be returned:
    failed to run (abstract1 source): Cannot parse logical replication message: failed to reload schema: primary key check failed: Tables: n / N check failed: "public"."MY_TBL": no key columns found
- Disable the transfer of external keys when creating a source endpoint. Recreate them once the transfer is completed.
- Find and terminate DDL queries that are running for too long. To do this, make a selection from the PostgreSQL pg_stat_activity system table:
  SELECT NOW() - query_start AS duration, query, state
  FROM pg_stat_activity
  WHERE state != 'idle' ORDER BY 1 DESC;
  This will return a list of queries running on the server. Pay attention to queries with a high duration value.
- Deactivate trigger transfer at the transfer initiation stage and reactivate it at the completion stage (for the Replication and the Snapshot and increment transfer types). For more information, see the description of additional endpoint settings for the PostgreSQL source.
- To enable parallel data reads from the table, set its primary key to serial mode. Then specify the number of workers and threads in the transfer parameters under Runtime environment.
- If replication is managed via Patroni, add an ignore_slots section for the replication slot to the cluster configuration:
  ignore_slots:
    - database: <database>
      name: <replication_slot>
      plugin: wal2json
      type: logical
  Where:
  - database: Name of the database the transfer is configured for.
  - name: Replication slot name.
  The database and the replication slot names must match the values specified in the source endpoint settings. By default, the replication slot name is the same as the transfer ID.
  Otherwise, the start of the replication phase will fail:
  Warn(Termination): unable to create new pg source: Replication slotID <replication_slot_name> does not exist.
- Configure WAL monitoring. Replication and Snapshot and increment transfers use logical replication. To perform the replication, the transfer creates a replication slot where slot_name matches the transfer ID, which you can get by selecting the transfer in the list of your transfers. Your WAL may grow due to different reasons: a long-running transaction or a transfer issue. Therefore, we recommend you configure WAL monitoring on the source side.
  - Set up alerts as described in the disk usage recommendations.
  - Set the maximum WAL size. This feature is available starting with PostgreSQL version 13.
  - You can monitor the current slot size by running this DB query with the correct slot_name, which matches the transfer ID:
    SELECT slot_name, pg_size_pretty(pg_current_wal_lsn() - restart_lsn), active_pid, catalog_xmin, restart_lsn, confirmed_flush_lsn
    FROM pg_replication_slots
    WHERE slot_name = '<transfer_ID>'
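To find the tables without a primary key mentioned in the steps above (those that need to be excluded or given a REPLICA IDENTITY), you can run a query like this:
-- Regular tables that have no PRIMARY KEY constraint
SELECT n.nspname AS schema_name, c.relname AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname NOT IN ('pg_catalog', 'information_schema')
  AND NOT EXISTS (
      SELECT 1 FROM pg_constraint p
      WHERE p.conrelid = c.oid AND p.contype = 'p'
  );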
Note
For things to note about data transfer from PostgreSQL to ClickHouse® using Replication and Snapshot and increment transfers, see Asynchronously replicating data from PostgreSQL to ClickHouse®.
Yandex Data Streams source
- Create a service account with the yds.editor role.
- (Optional) Create a processing function.
  Processing function example:
const yc = require("yandex-cloud");
const { Parser } = require("@robojones/nginx-log-parser");
module.exports.handler = async function (event, context) {
    const schema =
        '$remote_addr - $remote_user [$time_local] "$request" $status $bytes_sent "$http_referer" "$http_user_agent"';
    const parser = new Parser(schema);
    return {
        Records: event.Records.map((record) => {
            const decodedData = new Buffer(record.kinesis.data, "base64")
                .toString("ascii")
                .trim();
            try {
                const result = parser.parseLine(decodedData);
                if (result.request == "") {
                    // empty request - drop message
                    return {
                        eventID: record.eventID,
                        invokeIdentityArn: record.invokeIdentityArn,
                        eventVersion: record.eventVersion,
                        eventName: record.eventName,
                        eventSourceARN: record.eventSourceARN,
                        result: "Dropped"
                    };
                }
                return {
                    // successfully parsed message
                    eventID: record.eventID,
                    invokeIdentityArn: record.invokeIdentityArn,
                    eventVersion: record.eventVersion,
                    eventName: record.eventName,
                    eventSourceARN: record.eventSourceARN,
                    kinesis: {
                        data: new Buffer(JSON.stringify(result)).toString("base64"),
                    },
                    result: "Ok"
                };
            } catch (err) {
                // error - fail message
                return {
                    eventID: record.eventID,
                    invokeIdentityArn: record.invokeIdentityArn,
                    eventVersion: record.eventVersion,
                    eventName: record.eventName,
                    eventSourceARN: record.eventSourceARN,
                    result: "ProcessingFailed",
                };
            }
        })
    };
};
- (Optional) Prepare a data schema file in JSON format.
  Sample file with a data schema:
  [
      {
          "name": "<field_name>",
          "type": "<type>"
      },
      ...
      {
          "name": "<field_name>",
          "type": "<type>"
      }
  ]
Supported types:
any
boolean
datetime
double
int8
int16
int32
int64
string
uint8
uint16
uint32
uint64
utf8
Yandex Managed Service for YDB source
If you selected Dedicated database mode, create and configure a security group in the network hosting the DB.
Preparing a target
ClickHouse® target
- If you need to transfer multiple databases, create a separate transfer for each one of them.
- Create a user with access to the target database.
  Once started, the transfer will connect to the target on behalf of this user.
- If user management via SQL is enabled in the cluster, grant the new user the following permissions:
  GRANT CLUSTER ON *.* TO <username>
  GRANT SELECT, INSERT, ALTER DELETE, CREATE TABLE, CREATE VIEW, DROP TABLE, TRUNCATE, dictGet ON <DB_name>.* TO <username>
  GRANT SELECT(macro, substitution) ON system.macros TO <username>
  If user management via SQL is disabled, permissions are assigned via the management console and CLI.
- Assign the created security group to the Managed Service for ClickHouse® cluster.
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Create a target database. Its name must be the same as the source database name. If you need to transfer multiple databases, create a separate transfer for each one of them.
- Create a user with access to the target database.
  Once started, the transfer will connect to the target on behalf of this user.
- Grant the new user the following permissions:
  GRANT CLUSTER ON *.* TO <username>
  GRANT SELECT, INSERT, ALTER DELETE, CREATE TABLE, CREATE VIEW, DROP TABLE, TRUNCATE, dictGet ON <DB_name>.* TO <username>
  GRANT SELECT(macro, substitution) ON system.macros TO <username>
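To double-check the permissions assigned to the transfer user, you can run the following statement on the target cluster:
-- Lists the privileges granted to the user
SHOW GRANTS FOR <username>;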
Elasticsearch target
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Make sure the number of columns in the source does not exceed the maximum number of fields in Elasticsearch indexes. The maximum number of fields is provided in the index.mapping.total_fields.limit parameter; the default value is 1000.
  To increase the parameter value, set up a template that makes the maximum number of fields in new indexes equal to the specified value.
  Sample template setup request:
  curl \
      --user <Elasticsearch_username>:<password> \
      --header 'Content-Type: application/json' \
      --request PUT "https://<Elasticsearch_cluster_FQDN>:9200/_template/index_defaults" \
      --data '{ "index_patterns": "cdc*", "settings": { "index": { "mapping": { "total_fields": { "limit": "2000" } } } } }'
  With this template setup, all new indexes with the cdc* mask may contain up to 2000 fields.
  You can also set up templates using the Kibana interface.
  To check the current index.mapping.total_fields.limit parameter value, use the Kibana interface or execute the following request:
  curl \
      --user <Elasticsearch_username>:<password> \
      --header 'Content-Type: application/json' \
      --request GET 'https://<Elasticsearch_cluster_FQDN>:9200/<index_name>/_settings/*total_fields.limit?include_defaults=true'
- By default, when transferring data to a single index, only one host is used. To distribute the load across hosts when transferring large amounts of data, set up a template to split new indexes into shards in advance.
  Sample template setup request:
  curl \
      --user <Elasticsearch_username>:<password> \
      --header 'Content-Type: application/json' \
      --request PUT 'https://<Elasticsearch_cluster_FQDN>:9200/_template/index_defaults' \
      --data '{ "index_patterns": "cdc*", "settings": { "index": { "number_of_shards": 15, "number_of_replicas": 1 } } }'
  With this template setup, all new indexes with the cdc* mask will be split into 15 shards.
  You can also set up templates using the Kibana interface.
Greenplum® target
- Disable the following settings on the target:
  - Integrity checks for foreign keys
  - Triggers
  - Other constraints
  Warning
  Do not reactivate these settings before the transfer is complete. This will ensure data integrity with respect to foreign keys.
- Create a user:
  CREATE ROLE <username> LOGIN ENCRYPTED PASSWORD '<password>';
- Grant the user all privileges for the database, schemas, and tables to be transferred:
  GRANT ALL PRIVILEGES ON DATABASE <database_name> TO <username>;
  If the database is not empty, the user must be its owner:
  ALTER DATABASE <database_name> OWNER TO <username>;
  Once started, the transfer will connect to the target on behalf of this user.
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Disable the following settings on the target:
  - Integrity checks for foreign keys
  - Triggers
  - Other constraints
  Warning
  Do not reactivate these settings before the transfer is complete. This will ensure data integrity with respect to foreign keys.
- Create a user:
  CREATE ROLE <username> LOGIN ENCRYPTED PASSWORD '<password>';
- Grant the user all privileges for the database, schemas, and tables to be transferred:
  GRANT ALL PRIVILEGES ON DATABASE <database_name> TO <username>;
  If the database is not empty, the user must be its owner:
  ALTER DATABASE <database_name> OWNER TO <username>;
  Once started, the transfer will connect to the target on behalf of this user.
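To confirm that the user now owns the target database (required when the database is not empty), you can run a check like this:
-- Returns the current owner of the database
SELECT pg_catalog.pg_get_userbyid(datdba) AS owner
FROM pg_database
WHERE datname = '<database_name>';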
MongoDB target
- Create a user with the readWrite role for the new database.
- To shard the migrated collections in the Yandex Managed Service for MongoDB target cluster:
  - Use this guide to create and configure empty sharded collections in the target database.
    Data Transfer does not automatically shard the migrated collections. Sharding large collections may take a long time and slow down the transfer.
  - If sharding uses any key other than _id (default), assign the mdbShardingManager role to the user.
  - When creating a target endpoint, select DISABLED or TRUNCATE as your cleanup policy.
    Selecting the DROP policy will result in the service deleting all the data from the target database, including sharded collections, and replacing them with new unsharded ones when a transfer is activated.
  Learn more about sharding in the MongoDB documentation.
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Make sure the MongoDB version on the target is not lower than that on the source.
- Make sure the MongoDB cluster is configured so that it returns correctly resolving IP addresses or FQDNs (fully qualified domain names) in response to requests.
- Configure the target cluster to allow connections from the internet:
  - In the configuration file, change net.bindIp from 127.0.0.1 to 0.0.0.0:
    # network interfaces
    net:
      port: 27017
      bindIp: 0.0.0.0
  - Restart mongod:
    sudo systemctl restart mongod.service
- If the target cluster does not use replication, enable it:
  - Add the replication settings to the /etc/mongod.conf configuration file:
    replication:
      replSetName: <replica_set_name>
  - Restart mongod:
    sudo systemctl restart mongod.service
  - Connect to MongoDB and initialize the replica set with this command:
    rs.initiate({
        _id: "<replica_set_name>",
        members: [{ _id: 0, host: "<IP_address_listened_by_MongoDB>:<port>" }]
    });
- Connect to the cluster and create a target database:
  use <database_name>
- Create a user with the readWrite role for the target database:
  use admin;
  db.createUser({
      user: "<username>",
      pwd: "<password>",
      mechanisms: ["SCRAM-SHA-1"],
      roles: [
          { db: "<target_database_name>", role: "readWrite" }
      ]
  });
  Once started, the transfer will connect to the target on behalf of this user.
- To shard the migrated collections in the target cluster:
  - Set up a database and populate it with empty collections with the same names as those in the source.
    Data Transfer does not automatically shard the migrated collections. Sharding large collections may take a long time and slow down the transfer.
  - Enable target database sharding:
    sh.enableSharding("<target_database_name>")
  - Shard every collection based on its namespace:
    sh.shardCollection("<target_database_name>.<collection_name>", { <field_name>: <1|"hashed">, ... });
    For the shardCollection() function description, see the MongoDB documentation.
  - To make sure that sharding is set up and enabled, get a list of available shards:
    sh.status()
  - If sharding uses any key other than _id (default), assign the clusterManager system role to the user Data Transfer will use for connection to the target cluster:
    use admin;
    db.grantRolesToUser("<username>", ["clusterManager"]);
  - When creating a target endpoint, select DISABLED or TRUNCATE as your cleanup policy.
    Selecting the DROP policy will result in the service deleting all the data from the target database, including sharded collections, and replacing them with new unsharded ones when a transfer is activated.
  Learn more about sharding in the MongoDB documentation.
MySQL® target
- Make sure that the MySQL® major version on the target is not lower than that on the source.
- Set an SQL Mode matching the source.
- Create a user for connecting to the target.
  - Assign the user the ALL_PRIVILEGES role for the target database.
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Make sure that the MySQL® major version on the target is not lower than that on the source.
- Make sure the target uses the MyISAM or InnoDB low-level storage subsystem.
- Set an SQL Mode matching the source (see the sketch after this list).
- Create a user to connect to the target and grant them the required privileges:
  CREATE USER '<username>'@'%' IDENTIFIED BY '<password>';
  GRANT ALL PRIVILEGES ON <database_name>.* TO '<username>'@'%';
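A sketch of matching the SQL mode: read the global sql_mode on the source, then apply the same value on a self-managed target (the value below is a placeholder). On a managed target, set the parameter through the management console instead.
-- On the source: read the current SQL mode
SELECT @@GLOBAL.sql_mode;
-- On the target: apply the same value
SET GLOBAL sql_mode = '<sql_mode_from_source>';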
Yandex Object Storage target
- Create a bucket with a configuration you need.
- Create a service account with the storage.uploader role.
OpenSearch target
- Make sure the number of columns in the source does not exceed the maximum number of fields in OpenSearch indexes. The maximum number of fields is provided in the index.mapping.total_fields.limit parameter; the default value is 1000.
  Warning
  Exceeding the limit will result in the Limit of total fields [1000] has been exceeded error and the transfer will be stopped.
  To increase the parameter value, set up a template that makes the maximum number of fields in new indexes equal to the specified value.
  Sample template setup request:
  curl \
      --user <OpenSearch_username>:<password> \
      --header 'Content-Type: application/json' \
      --request PUT "https://<URL_of_OpenSearch_host_with_DATA_role>:9200/_template/index_defaults" \
      --data '{ "index_patterns": "cdc*", "settings": { "index": { "mapping": { "total_fields": { "limit": "2000" } } } } }'
  With this template setup, all new indexes with the cdc* mask will be able to contain up to 2000 fields.
  You can also set up templates using the OpenSearch Dashboards interface.
  To check the current index.mapping.total_fields.limit parameter value, execute the following request:
  curl \
      --user <OpenSearch_username>:<password> \
      --header 'Content-Type: application/json' \
      --request GET 'https://<URL_of_OpenSearch_host_with_DATA_role>:9200/<index_name>/_settings/*total_fields.limit?include_defaults=true'
- By default, when transferring data to a single index, only one host is used. To distribute the load across hosts when transferring large amounts of data, set up a template to split new indexes into shards in advance.
  Sample template setup request:
  curl \
      --user <OpenSearch_username>:<password> \
      --header 'Content-Type: application/json' \
      --request PUT 'https://<URL_of_OpenSearch_host_with_DATA_role>:9200/_template/index_defaults' \
      --data '{ "index_patterns": "cdc*", "settings": { "index": { "number_of_shards": 15, "number_of_replicas": 1 } } }'
  With this template setup, all new indexes with the cdc* mask will be split into 15 shards.
  You can also set up templates using the OpenSearch Dashboards interface.
To enhance data security and availability, set up a policy that will create a new index if at least one of the following conditions is met (recommended values):
- Index is over 50 GB in size.
- Index is over 30 days old.
You can create and enable a policy using requests. For more information about policies, see the OpenSearch documentation
.Example of a policy creation request
curl \ --user <OpenSearch_username>:<password> \ --header 'Content-Type: application/json' \ --request PUT 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/_plugins/_ism/policies/rollover_policy' \ --data ' { "policy": { "description": "Example rollover policy", "default_state": "rollover", "schema_version": 1, "states": [ { "name": "rollover", "actions": [ { "rollover": { "min_index_age": "30d", "min_primary_shard_size": "50gb" } } ], "transitions": [] } ], "ism_template": { "index_patterns": ["log*"], "priority": 100 } } }'
Example of a request to assign an alias to a policy
curl \ --user <OpenSearch_username>:<password> \ --header 'Content-Type: application/json' \ --request PUT 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/_index_template/ism_rollover' \ --data ' { "index_patterns": ["log*"], "template": { "settings": { "plugins.index_state_management.rollover_alias": "log" } } }'
Example of a request to create an index with a policy alias
curl \ --user <OpenSearch_username>:<password> \ --header 'Content-Type: application/json' \ --request PUT 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/log-000001' \ --data ' { "aliases": { "log": { "is_write_index": true } } }'
Example of a request to check if a policy is attached to an index
curl \ --user <OpenSearch_username>:<password> \ --header 'Content-Type: application/json' \ --request GET 'https://<address_of_OpenSearch_host_with_DATA_role>:9200/_plugins/_ism/explain/log-000001?pretty'
PostgreSQL target
- Make sure that the PostgreSQL major version on the target is not lower than that on the source.
- For transfers from PostgreSQL, enable the same extensions in the target database as in the source database.
  If extensions in the source database are installed in a custom schema and are used in the DDLs of the objects you are moving, create DDLs in the target manually. In these DDLs, a function call must not include the schema name. Set the target endpoint cleanup policy to Truncate to prevent the transfer from deleting these objects.
- Select the Drop cleanup policy for transfer tables.
  If you have created DDLs in the target manually, use the Truncate policy. The Truncate policy will not delete these DDLs.
- Create a user with access to the target database.
- Grant the user all privileges for the database, schemas, and tables to be transferred:
  GRANT ALL PRIVILEGES ON DATABASE <database_name> TO <username>;
  If the database is not empty, the user must be its owner:
  ALTER DATABASE <database_name> OWNER TO <username>;
  Once started, the transfer will connect to the target on behalf of this user.
- If the target has the Save transaction boundaries option enabled, grant the new user all the privileges needed to create the __data_transfer_lsn auxiliary table in the current schema (usually public) in the target:
  GRANT ALL PRIVILEGES ON SCHEMA <schema_name> TO <username>;
- Configure the number of user connections to the database.
- If you do not plan to use Cloud Interconnect or VPN to connect to an external cluster, make the cluster accessible from the internet from the IP addresses used by Data Transfer. For details on linking your network with external resources, see this concept.
- Make sure that the PostgreSQL major version on the target is not lower than that on the source.
- In the target database, enable the same extensions that are enabled in the source database (see the example after this list).
- Make sure the target has the DROP transfer tables cleanup policy selected.
- Create a user:
  CREATE ROLE <username> LOGIN ENCRYPTED PASSWORD '<password>';
- Grant the user all privileges for the database, schemas, and tables to be transferred:
  GRANT ALL PRIVILEGES ON DATABASE <database_name> TO <username>;
  If the database is not empty, the user must be its owner:
  ALTER DATABASE <database_name> OWNER TO <username>;
  Once started, the transfer will connect to the target on behalf of this user.
- If the target has the Save transaction boundaries option enabled, grant the new user all the privileges needed to create the __data_transfer_lsn auxiliary table in the current schema (usually public) in the target:
  GRANT ALL PRIVILEGES ON SCHEMA <schema_name> TO <username>;
- Configure the number of user connections to the database.
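A sketch of aligning extensions between the databases, as required above: list what is enabled in the source, then enable the same extensions in the target (the extension name is a placeholder):
-- On the source: list enabled extensions
SELECT extname, extversion FROM pg_extension;
-- On the target: enable each extension from that list
CREATE EXTENSION IF NOT EXISTS <extension_name>;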
Data stored in a MATERIALIZED VIEW is not transferred. To transfer MATERIALIZED VIEW data, create an ordinary VIEW that refers to the MATERIALIZED VIEW to be transferred.
If the definition of the VIEW to be transferred contains an invocation of the VOLATILE function, the transfer reads data from this VIEW with the READ UNCOMMITTED isolation level. No consistency between the VIEW data and the data of other objects being transferred is guaranteed. Reading data from a MATERIALIZED VIEW in the VIEW definition is equivalent to invoking the VOLATILE function.
Yandex Managed Service for YDB target
- Create a service account with the ydb.editor role.
- For the database running in Dedicated mode, create and configure a security group in the network hosting the DB.
Airbyte® is a registered trademark of Airbyte, Inc. in the United States and/or other countries.
Greenplum® and Greenplum Database® are registered trademarks or trademarks of VMware, Inc. in the United States and/or other countries.
ClickHouse® is a registered trademark of ClickHouse, Inc.