Using Yandex Object Storage in Managed Service for Apache Spark™
In Yandex Object Storage buckets, you can store both files required to run jobs in a Yandex Managed Service for Apache Spark™ cluster and the results of those jobs.
To use Object Storage in Managed Service for Apache Spark™:
If you no longer need the resources you created, delete them.
Required paid resources
The support cost for this solution includes:
- Object Storage bucket fee for using storage and performing data operations (see Object Storage pricing).
- Cloud Logging fee for the amount of data written and the time of its retention (see Cloud Logging pricing).
Set up the infrastructure
-
Create a service account named
spark-agentfor the Apache Spark™ cluster and assign it the managed-spark.integrationProvider role to enable the Apache Spark™ cluster to interact with other resources. -
<bucket_for_PySpark_job_source_code>.<bucket_for_PySpark_job_output_data>.
-
Grant permissions to the
spark-agentservice account for the created buckets:<bucket_for_PySpark_job_source_code>:READpermission.<bucket_for_PySpark_job_output_data>:READ and WRITEpermission.
-
Create a cloud network named
spark-network.This will automatically create three subnets in different availability zones.
-
Create a Managed Service for Apache Spark™ cluster with the following parameters:
- Service account:
spark-agent - Network:
spark-network - Subnet:
spark-network-ru-central1-a
- Service account:
Prepare a PySpark job
For a PySpark job, we will use a Python script that is stored in the Object Storage bucket and creates a table named table_1 in database_1. Prepare a script file:
-
Create a local file named
job_save_table.pyand paste the following script to it:job_save_table.py
import random import sys from pyspark.sql import SparkSession def prepare_table(spark, database, table): create_database_sql = "create database if not exists {database}" create_table_sql = """ create table if not exists {database}.{table} ( id int, value double ) """ truncate_table_sql = "truncate table {database}.{table}" spark.sql(create_database_sql.format(database=database)) spark.sql(create_table_sql.format(database=database, table=table)) spark.sql(truncate_table_sql.format(database=database, table=table)) def write_data(spark, database, table): data = [(i, random.random()) for i in range(100_000)] # Creating a dataframe df = spark.createDataFrame(data, schema=['id', 'value']) table_full_name = "{database}.{table}".format(database=database, table=table) df.write.mode('overwrite').format('json').saveAsTable(table_full_name) def main(): # Creating a Spark session spark = ( SparkSession .builder .appName('job_save_table') .config('spark.executor.instances', 1) .config('spark.sql.warehouse.dir', sys.argv[1]) .config('spark.sql.catalogImplementation', 'hive') .getOrCreate() ) database, table = 'database_1', 'table_1' prepare_table(spark, database, table) write_data(spark, database, table) if __name__ == '__main__': if len(sys.argv) != 2: print("Usage: job-save-table s3a://<bucket>/<folder>", file=sys.stderr) sys.exit(-1) main() -
In
<bucket_for_PySpark_job_source_code>, create a folder namedscriptsand upload thejob_save_table.pyfile to it. -
Create a job with the following settings:
- Job type: PySpark
- Main python file:
s3a://<bucket_for_PySpark_job_source_code>/scripts/job_save_table.py - Arguments:
s3a://<bucket_for_PySpark_job_output_data>/warehouse
Check the result
- Navigate to the folder dashboard
and select Managed Service for Apache Spark™. - Click the name of your cluster and open the Jobs tab.
- Wait for the PySpark job you created to change its status to Done.
- Make sure the
warehousefolder in<bucket_for_PySpark_job_output_data>now containsdatabase_1. The data from the new DB is now stored in the Object Storage bucket in JSON format.
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them: