Совместная работа с таблицами с использованием Metastore
Вы можете сохранять данные из кластера Yandex Data Processing в бакет Yandex Object Storage, используя отдельный кластер Hive Metastore для хранения метаданных таблиц. Это позволит затем работать с сохраненными данными другому кластеру Yandex Data Processing, имеющему доступ к бакету и подключенному к тому же кластеру Metastore.
Чтобы настроить совместное использование таблиц двумя кластерами Yandex Data Processing с помощью Metastore:
- Подключите Yandex Data Processing к Metastore.
- Создайте тестовую таблицу.
- Получите данные во втором кластере.
Если созданные ресурсы вам больше не нужны, удалите их.
Если в кластере Yandex Data Processing есть таблицы, которые должны быть доступны в другом кластере Yandex Data Processing, перенесите таблицы в нужный кластер с помощью Metastore.
Перед началом работы
Подготовьте инфраструктуру:
-
Создайте сервисный аккаунт с именем
dataproc-s3-sa
и назначьте ему ролиdataproc.agent
иdataproc.provisioner
. -
В Object Storage создайте бакеты и настройте доступ к ним:
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
READ
для этого бакета. - Создайте бакет для результатов обработки и предоставьте сервисному аккаунту кластера разрешение
READ и WRITE
для этого бакета.
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
-
Создайте облачную сеть с именем
dataproc-network
. -
В сети
dataproc-network
создайте подсеть в любой зоне доступности. -
Настройте NAT-шлюз для созданной подсети.
-
Создайте два кластера Yandex Data Processing с именами
dataproc-source
иdataproc-target
, с любой подходящей конфигурацией хостов и следующими настройками:- Сервисы:
SPARK
;YARN
.
- Сервисный аккаунт —
dataproc-sa
. - Свойства —
spark:spark.sql.hive.metastore.sharedPrefixes
со значениемcom.amazonaws,ru.yandex.cloud
. Нужно для выполнения заданий PySpark и для интеграции с Metastore. - Имя бакета — бакет, который вы создали для выходных данных.
- Сеть —
dataproc-network
.
- Сервисы:
-
Если в облачной сети используются группы безопасности, добавьте в группу безопасности кластеров Yandex Data Processing следующее правило для исходящего трафика:
- Диапазон портов —
9083
. - Протокол —
Любой
(Any
). - Источник —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации dataproc-to-dataproc.tf
.В этом файле описаны:
- сеть;
- подсеть;
- NAT-шлюз и таблица маршрутизации, необходимые для работы Yandex Data Processing;
- группы безопасности, необходимые для кластеров Yandex Data Processing;
- сервисный аккаунт, необходимый для работы кластера Yandex Data Processing;
- сервисный аккаунт, необходимый для создания бакетов в Object Storage;
- бакеты для входных и выходных данных;
- два кластера Yandex Data Processing.
-
Укажите в файле
dataproc-to-dataproc.tf
:folder_id
— идентификатор облачного каталога, такой же как в настройках провайдера.input-bucket
— имя бакета для входных данных.output-bucket
— имя бакета для выходных данных.dp_ssh_key
— абсолютный путь к публичному ключу для кластеров Yandex Data Processing. Подробнее см. в разделе SSH-подключение к хосту Yandex Data Processing.
-
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
Важно
Не назначайте на бакет политику доступа, иначе кластер Metastore не сможет записывать данные в бакет.
Подключите Yandex Data Processing к Metastore
-
Создайте кластер Metastore в сети
dataproc-network
. -
Добавьте в настройки кластеров Yandex Data Processing свойство
spark:spark.hive.metastore.uris
со значениемthrift://<IP-адрес_кластера_Metastore>:9083
.Чтобы узнать IP-адрес кластера Metastore, в консоли управления
выберите сервис Yandex Data Processing и на левой панели выберите страницу Metastore-сервер. IP-адрес кластера указан в блоке Общая информация.
Создайте тестовую таблицу
В кластере dataproc-source
создайте тестовую таблицу countries
и загрузите ее в Object Storage:
-
Подготовьте файл скрипта:
-
Создайте локально файл с именем
create-table.py
и скопируйте в него следующий скрипт:create-table.py
from pyspark.sql.types import * from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder \ .appName("create-table") \ .enableHiveSupport() \ .getOrCreate() # Создание схемы данных schema = StructType([StructField('Name', StringType(), True), StructField('Capital', StringType(), True), StructField('Area', IntegerType(), True), StructField('Population', IntegerType(), True)]) # Создание датафрейма df = spark.createDataFrame([('Австралия', 'Канберра', 7686850, 19731984), ('Австрия', 'Вена', 83855, 7700000)], schema) # Запись датафрейма в бакет в виде таблицы countries df.write.mode("overwrite").option("path","s3a://<имя_выходного_бакета>/countries").saveAsTable("countries")
-
Укажите в скрипте имя выходного бакета, в который будет сохранен файл с таблицей
countries
. -
Создайте в бакете для входных данных папку
scripts
и загрузите в нее файлcreate-table.py
.
-
-
Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта:
s3a://<имя_входного_бакета>/scripts/create-table.py
. -
Дождитесь завершения задания и проверьте, что в выходном бакете в папке
countries
появился файлpart-00000-...
.
Теперь данные из созданной таблицы хранятся в бакете Object Storage, а метаинформация о ней — в кластере Metastore. Кластер dataproc-source
можно удалить.
Получите данные во втором кластере
Загрузите метаинформацию о таблице countries
в кластер dataproc-target
и убедитесь, что таблица стала доступна в кластере для дальнейшей работы:
-
Подготовьте файл скрипта:
-
Создайте локально файл с именем
obtain-table.py
и скопируйте в него следующий скрипт:obtain-table.py
from pyspark.sql import SparkSession # Создание Spark-сессии spark = SparkSession.builder \ .appName("obtain-table") \ .enableHiveSupport() \ .getOrCreate() spark.catalog.listDatabases() # Получение информации о таблице countries из Metastore df = spark.sql("describe extended countries") # Запрос данных из таблицы countries df = spark.sql("select * from countries") # Перенос таблицы в бакет для проверки df.repartition(1).write.csv(path='s3a://<имя_выходного_бакета>/csv', header=True, sep=',')
-
Укажите в скрипте имя выходного бакета, в который будет сохранен CSV-файл с таблицей
countries
. -
Загрузите файл
obtain-table.py
в бакет для входных данных в папкуscripts
.
-
-
Создайте задание PySpark, указав в поле Main python файл путь к файлу скрипта:
s3a://<имя_входного_бакета>/scripts/obtain-table.py
. -
Дождитесь выполнения задания и убедитесь, что в выходном бакете появилась папка
csv
с таблицей в формате CSV.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них:
-
Удалите остальные ресурсы в зависимости от способа их создания:
ВручнуюTerraform-
Удалите объекты из бакетов.
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
dataproc-to-dataproc.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
Все ресурсы, которые были описаны в конфигурационном файле
dataproc-to-dataproc.tf
, будут удалены. -
-