Миграция базы данных из Google BigQuery в Managed Service for ClickHouse®
Вы можете перенести базу данных из Google BigQuery в Yandex Managed Service for ClickHouse® и затем проанализировать ее с помощью Yandex DataLens.
Таблица переносится в сжатом виде в бакет Google Storage, а из него в бакет Yandex Object Storage. Затем данные импортируются в кластер Managed Service for ClickHouse®, где их можно проанализировать с помощью Yandex DataLens.
Такой способ миграции обладает следующими преимуществами:
- возможность задать формат выгрузки данных и степень сжатия;
- значительное сокращение объема данных и времени на их миграцию, а значит и снижение стоимости миграции.
Однако при таком способе данные переносятся как есть
, без трансформации или копирования обновившихся инкрементов.
Чтобы перенести базу данных из Google BigQuery в Managed Service for ClickHouse®:
- Перенесите данные из Google BigQuery в Yandex Object Storage.
- Настройте отображение данных из Yandex Object Storage в кластере Managed Service for ClickHouse®.
- Проанализируйте данные с помощью Yandex DataLens.
Если созданные ресурсы вам больше не нужны, удалите их.
Перед началом работы
Для миграции базы данных необходимо создать ресурсы Google Cloud и ресурсы Yandex Cloud.
Создайте ресурсы Google Cloud
-
Создайте сервисный аккаунт Google Cloud
с ролямиBigQuery Data Editor
иStorage Object Admin
. -
Создайте ключ доступа для сервисного аккаунта
и сохраните его в виде файла.json
. -
Установите Google BigQuery Python SDK
. Для работы этого пакета потребуется Python версии 3.7 или выше. -
Подготовьте набор данных (датасет) для Google BigQuery. В качестве примера использован публичный датасет
google_trends
для Google BigQuery, содержащий таблицуinternational_top_terms
со столбцами:rank
country_name
country_code
region_name
week
score
region_code
term
refresh_date
Создайте ресурсы Yandex Cloud
-
Создайте сервисный аккаунт с ролью
storage.uploader
для доступа к бакету Object Storage. -
Создайте статический ключ доступа для сервисного аккаунта. Сохраните идентификатор ключа и секретный ключ, они понадобятся далее.
-
Создайте кластер Managed Service for ClickHouse® любой подходящей конфигурации. При создании кластера:
- укажите созданный ранее сервисный аккаунт.
- включите параметр Доступ из DataLens.
-
Создайте бакет Object Storage. При создании включите публичный доступ на чтение объектов и к списку объектов в бакете.
Перенесите данные из Google BigQuery в Yandex Object Storage
-
Создайте файл
credentials.boto
с параметрами доступа к ресурсам Google Cloud и Yandex Cloud:[Credentials] gs_service_client_id =<сервисный_аккаунт_Google_Cloud> gs_service_key_file =<абсолютный_путь_к_JSON-файлу> aws_access_key_id =<идентификатор_ключа_сервисного_аккаунта> aws_secret_access_key =<секретный_ключ_сервисного_аккаунта> [GSUtil] default_project_id =<идентификатор_проекта_Google_Cloud> [s3] calling_format=boto.s3.connection.OrdinaryCallingFormat host=storage.yandexcloud.net
Где:
gs_service_client_id
— имя сервисного аккаунта Google Cloud видаservice-account-name@project-id.iam.gserviceaccount.com
.gs_service_key_file
— абсолютный путь к JSON-файлу ключа доступа сервисного аккаунта Google Cloud.aws_access_key_id
— идентификатор ключа сервисного аккаунта Yandex Cloud.aws_secret_access_key
— секретный ключ сервисного аккаунта Yandex Cloud.default_project_id
— идентификатор проекта Google Cloud .
-
Создайте файл скрипта
main.py
, который выполняет сжатие и миграцию данных:main.py
from google.cloud import bigquery import sys import argparse import time import subprocess import os os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="<абсолютный_путь_к_JSON-файлу_ключа_доступа_сервисного_аккаунта_Google_Cloud>" os.environ["BOTO_CONFIG"]="<абсолютный_путь_к_файлу_credentials.boto>" def parse_args(): parser = argparse.ArgumentParser(description='Export data from Google Big Query to Yandex Cloud object storage') parser.add_argument('--bq_project', type=str, help='GBQ project ID') parser.add_argument('--bq_location', type=str, help='GBQ table AND GS location') parser.add_argument('--gs_bucket', type=str, help='GS export destination bucket') parser.add_argument('--yc_bucket', type=str, help='YC copy destination bucket') parser.add_argument('--gsutil_path', type=str, help='GSutil exec path', default='gsutil') return parser.parse_args() def select_from_list(message, elements): print(message) print("\t{}. {}".format(0, "Export all")) for ind in range(len(elements)): if isinstance(elements[ind].reference, bigquery.DatasetReference): print("\t{}. {}".format(ind+1, elements[ind].reference.dataset_id)) elif isinstance(elements[ind].reference, bigquery.TableReference): print("\t{}. {}".format(ind+1, elements[ind].reference.table_id)) try: return int(input("(any letter for cancel) >> ")) except ValueError: print("Exiting") sys.exit() if __name__ == '__main__': args = parse_args() client = bigquery.Client() datasets = list(client.list_datasets(args.bq_project)) dataset_selector = select_from_list("Datasets in project {}".format(args.bq_project), datasets) export_list = [] for i in range(len(datasets)): dataset_ref = datasets[i].reference if dataset_selector == 0: export_list += list(client.list_tables(dataset_ref)) else: if i == dataset_selector - 1: tables = list(client.list_tables(dataset_ref)) table_selector = select_from_list("Tables in dataset {}".format(dataset_ref.dataset_id), tables) for j in range(len(tables)): if table_selector == 0 or j == table_selector - 1: export_list.append(tables[j]) print("Starting tables export") for n in range(len(export_list)): table_ref = export_list[n].reference # Creating Extract Job config. Selecting compression level and data format. job_config = bigquery.job.ExtractJobConfig() job_config.compression = bigquery.Compression.GZIP job_config.destination_format = bigquery.DestinationFormat.PARQUET print("Exporting {} table".format(table_ref.table_id)) extract_job = client.extract_table( source=table_ref, destination_uris="gs://{}/{}".format(args.gs_bucket, "{}-*".format(table_ref.table_id)), job_id="export-job-{}-{}".format(table_ref.table_id, round(time.time() * 1000)), location=args.bq_location, job_config=job_config) extract_job.result() print("Tables export done") # Calling gsutil rsync to synchronize source and destination buckets. source_uri = "gs://{}/".format(args.gs_bucket) destination_uri = "s3://{}/".format(args.yc_bucket) print("Synchronizing {} with {}...".format(source_uri, destination_uri)) proc = subprocess.Popen([args.gsutil_path, "-m", "rsync", source_uri, destination_uri], stdout=sys.stdout, stderr=sys.stderr) proc.communicate() print("Buckets synchronization done")
-
Выполните скрипт
main.py
, чтобы запустить миграцию данных из Google BigQuery в бакет Google Storage, а затем в бакет Yandex Object Storage:python main.py \ --bq_project=<идентификатор_проекта_Google_Cloud> \ --bq_location=US \ --gs_bucket=<имя_бакета_Google_Cloud_Storage> \ --yc_bucket=<имя_бакета_Object_Storage>
Дождитесь окончания миграции данных.
Настройте отображение данных из Yandex Object Storage в кластере Managed Service for ClickHouse®
-
Чтобы создать представление импортированных данных, подключитесь к базе данных кластера Managed Service for ClickHouse® и выполните SQL-запрос:
CREATE view db1.v$google_top_rising_terms on cluster on cluster '{cluster}' AS (SELECT term, score, rank, country_name, country_code, region_name, region_code, week, refresh_date FROM s3Cluster( '<идентификатор_кластера>', 'https://storage.yandexcloud.net/<имя_бакета_Object_Storage>/top_terms-*', 'Parquet', 'rank Int32, country_name String, country_code String, region_name String, week Timestamp, score Nullable(Int32), region_code String, term String, refresh_date Timestamp') )
Где:
db1
— название базы данных в кластере Managed Service for ClickHouse®, в которой требуется создать представление.v$google_top_rising_terms
— название представления для отображения импортированных данных.<идентификатор_кластера>
— идентификатор кластера Managed Service for ClickHouse®. Его можно получить вместе со списком кластеров в каталоге.top_terms-*
— ключевая часть имени объектов бакета Object Storage. Например, если из Google Cloud вы перенесли таблицу, в которой есть строки с именемtop_terms
, то в бакете Object Storage они будут выглядеть как набор объектов с именамиtop_terms-000000000001
,top_terms-000000000002
и т. д. Тогда в SQL-запросе нужно указатьtop_terms-*
, чтобы в представление попали все записи с таким именем из этой таблицы.
-
Чтобы вывести первые 100 записей из созданного представления, выполните SQL-запрос (для примера используется представление
v$google_top_rising_terms
и базе данныхdb1
):SELECT * FROM db1.v$google_top_rising_terms limit 100
Проанализируйте данные с помощью Yandex DataLens
-
Подключите кластер Managed Service for ClickHouse® к DataLens.
-
Создайте датасет из таблицы
db1.v$google_top_rising_terms
. Для поляscore
выберите агрегацию по среднему значению. -
Создайте столбчатую диаграмму:
- В секцию X перетащите поле
country_name
. - В секцию Y перетащите поле
score
. - В секцию Фильтры перетащите поле
term
. В открывшейся форме задайте параметры:- Операция — Принадлежит множеству.
- Доступны — введите термин из списка доступных и нажмите кнопку Применить фильтр.
- В секцию Сортировка перетащите поле
term
.
- В секцию X перетащите поле
Использование заданного запроса в поисковой системе будет проанализировано, результат будет выведен в виде столбчатой диаграммы по странам.
Удалите созданные ресурсы
Удалите ресурсы, которые вы больше не будете использовать, чтобы за них не списывалась плата:
- Удалите кластер Managed Service for ClickHouse®.
- Удалите все объекты бакета Object Storage и затем удалите сам бакет.
- Удалите бакет Google Storage
.
ClickHouse® является зарегистрированным товарным знаком ClickHouse, Inc