Использование Yandex Object Storage в сервисе Yandex Managed Service for Apache Spark™
В бакетах Yandex Object Storage можно хранить как файлы, необходимые для выполнения заданий в кластере Yandex Managed Service for Apache Spark™, так и результаты выполнения заданий.
Для использования Object Storage в сервисе Managed Service for Apache Spark™:
Если созданные ресурсы вам больше не нужны, удалите их.
Необходимые платные ресурсы
В стоимость поддержки описываемого решения входят:
- Плата за бакеты Object Storage: использование хранилища и выполнение операций с данными (см. тарифы Object Storage).
- Плата за сервис Cloud Logging: объем записываемых данных и время их хранения (см. тарифы Cloud Logging).
Подготовьте инфраструктуру
-
Создайте сервисный аккаунт
spark-agentдля кластера Apache Spark™ с ролью managed-spark.integrationProvider — чтобы кластер Apache Spark™ мог взаимодействовать с другими ресурсами. -
<бакет_для_исходного_кода_PySpark_задания>.<бакет_для_выходных_данных_PySpark_задания>.
-
Предоставьте разрешения для сервисного аккаунта
spark-agentна созданные бакеты:<бакет_для_исходного_кода_PySpark_задания>— разрешениеREAD.<бакет_для_выходных_данных_PySpark_задания>— разрешениеREAD и WRITE.
-
Создайте облачную сеть с именем
spark-network.Вместе с ней автоматически будут созданы три подсети в разных зонах доступности.
-
Создайте кластер Managed Service for Apache Spark™ с параметрами:
- Сервисный аккаунт —
spark-agent. - Сеть —
spark-network. - Подсеть —
spark-network-ru-central1-a.
- Сервисный аккаунт —
Подготовьте PySpark-задание
Для PySpark-задания будет использован Python-скрипт, который хранится в бакете Object Storage и создает таблицу table_1 в БД database_1. Подготовьте файл скрипта:
-
Создайте локально файл с именем
job_save_table.pyи скопируйте в него скрипт:job_save_table.py
import random import sys from pyspark.sql import SparkSession def prepare_table(spark, database, table): create_database_sql = "create database if not exists {database}" create_table_sql = """ create table if not exists {database}.{table} ( id int, value double ) """ truncate_table_sql = "truncate table {database}.{table}" spark.sql(create_database_sql.format(database=database)) spark.sql(create_table_sql.format(database=database, table=table)) spark.sql(truncate_table_sql.format(database=database, table=table)) def write_data(spark, database, table): data = [(i, random.random()) for i in range(100_000)] # Создание датафрейма df = spark.createDataFrame(data, schema=['id', 'value']) table_full_name = "{database}.{table}".format(database=database, table=table) df.write.mode('overwrite').format('json').saveAsTable(table_full_name) def main(): # Создание Spark-сессии spark = ( SparkSession .builder .appName('job_save_table') .config('spark.executor.instances', 1) .config('spark.sql.warehouse.dir', sys.argv[1]) .config('spark.sql.catalogImplementation', 'hive') .getOrCreate() ) database, table = 'database_1', 'table_1' prepare_table(spark, database, table) write_data(spark, database, table) if __name__ == '__main__': if len(sys.argv) != 2: print("Usage: job-save-table s3a://<bucket>/<folder>", file=sys.stderr) sys.exit(-1) main() -
Создайте в бакете
<бакет_для_исходного_кода_PySpark_задания>папкуscriptsи загрузите в нее файлjob_save_table.py. -
Создайте задание с параметрами:
- Тип задания — PySpark.
- Main python файл –
s3a://<бакет_для_исходного_кода_PySpark_задания>/scripts/job_save_table.py. - Аргументы —
s3a://<бакет_для_выходных_данных_PySpark_задания>/warehouse
Проверьте результат
- Перейдите на страницу каталога
и выберите сервис Managed Service for Apache Spark. - Нажмите на имя нужного кластера и выберите вкладку Задания.
- Дождитесь, когда созданное PySpark-задание перейдет в статус Done.
- Убедитесь, что в бакете
<бакет_для_выходных_данных_PySpark_задания>в папкеwarehouseпоявилась БДdatabase_1. Теперь данные из созданной БД хранятся в бакете Object Storage в формате JSON.
Удалите созданные ресурсы
Некоторые ресурсы платные. Удалите ресурсы, которые вы больше не будете использовать, чтобы не платить за них: