Working with Yandex Object Storage from a PySpark job using a local Hive catalog
In Yandex Object Storage buckets, you can store both files required to run jobs in a Yandex Managed Service for Apache Spark™ cluster and the results of those jobs.
Yandex Managed Service for Apache Spark™ clusters are pre-configured to work with S3-compatible storage. To use an Object Storage bucket in a PySpark job:
- Grant the Yandex Managed Service for Apache Spark™ cluster service account permission to access the Object Storage bucket.
- In the job's arguments, specify the path to the folder in the Object Storage bucket.
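The path passed in the job's arguments can be checked before a Spark session is created. The following is a minimal sketch and not part of the job used in this guide: the helper name split_s3a_path is hypothetical, and it assumes the argument has the form s3a://<bucket>/<folder>.

```python
from urllib.parse import urlparse


def split_s3a_path(path):
    """Split an s3a:// URI into (bucket, folder); raise ValueError otherwise."""
    parsed = urlparse(path)
    if parsed.scheme != "s3a" or not parsed.netloc:
        raise ValueError(f"Expected s3a://<bucket>/<folder>, got: {path!r}")
    # Strip slashes so that 's3a://bucket/warehouse/' yields 'warehouse'
    return parsed.netloc, parsed.path.strip("/")
```

A job could call this helper on sys.argv[1] and exit with a usage message if ValueError is raised.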
This guide gives an example of how to use a table in an Object Storage bucket from a PySpark job using a built-in local Hive catalog.
To implement the above example:
- Set up your infrastructure.
- Prepare a PySpark job.
- Check the result.

If you no longer need the resources you created, delete them.
Note
With a local Hive catalog, you can access tables by name without specifying a path and use basic Hive features without connecting an Apache Hive™ Metastore cluster. The metadata is stored in a local folder and remains inaccessible to other clusters. For an example of using a global folder, see Working with Yandex Object Storage from a PySpark job using an Apache Hive™ Metastore cluster.
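As an illustration of access by name, a job whose session uses the built-in catalog could read a table back without giving its storage path. This is a sketch under assumptions: the function name count_rows is hypothetical, the defaults database_1 and table_1 are the names used later in this guide, and spark is expected to be a session created with spark.sql.catalogImplementation=hive in the same job that wrote the table.

```python
def count_rows(spark, database="database_1", table="table_1"):
    """Read a table by its catalog name (no path needed) and return its row count."""
    # SparkSession.table() resolves '<database>.<table>' through the catalog
    return spark.table(f"{database}.{table}").count()
```

The function only needs an object that provides table(), so it can be tried with a stub before it is run on a cluster.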
Required paid resources
The support cost for this solution includes:
- Object Storage bucket fee for using storage and performing data operations (see Object Storage pricing).
- Cloud Logging fee for the amount of data written and the time of its retention (see Cloud Logging pricing).
Set up your infrastructure
- Create a service account named spark-agent for the Yandex Managed Service for Apache Spark™ cluster and assign it the managed-spark.integrationProvider role to enable the cluster to interact with other resources.
- Create Object Storage buckets:
  - <bucket_for_PySpark_job_source_code>
  - <bucket_for_PySpark_job_output_data>
- Grant permissions to the spark-agent service account for the created buckets:
  - <bucket_for_PySpark_job_source_code>: READ permission.
  - <bucket_for_PySpark_job_output_data>: READ and WRITE permission.
- Create a cloud network named spark-network.
  This will automatically create three subnets in different availability zones.
- Create a Yandex Managed Service for Apache Spark™ cluster with the following parameters:
  - Service account: spark-agent
  - Network: spark-network
  - Subnet: spark-network-ru-central1-a
Prepare a PySpark job
For a PySpark job, we use a Python script that creates a table named table_1 in database_1. The PySpark job's arguments give the path to the folder the database will be created in. To connect the built-in Hive catalog, your script should specify this Spark session configuration parameter: spark.sql.catalogImplementation=hive. The script will be stored in the Object Storage bucket.
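Once a session exists, one way to confirm that the built-in Hive catalog is active is to read the parameter back from the session's runtime configuration. This is a minimal sketch; the function name uses_hive_catalog is hypothetical and the check is not part of the job used in this guide.

```python
def uses_hive_catalog(spark):
    """Return True if the session was started with the Hive catalog implementation."""
    # spark.conf is the session's runtime configuration
    value = spark.conf.get("spark.sql.catalogImplementation")
    return value == "hive"
```

A job could log a warning or exit early if this returns False, since tables created with a different catalog implementation would not be registered in the Hive catalog.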
Prepare a script file:
- Create a local file named job_save_table.py and paste the following script into it:

  job_save_table.py

      import random
      import sys

      from pyspark.sql import SparkSession


      def prepare_table(spark, database, table):
          create_database_sql = "create database if not exists {database}"
          create_table_sql = """
          create table if not exists {database}.{table} (
              id int,
              value double
          )
          """
          truncate_table_sql = "truncate table {database}.{table}"

          spark.sql(create_database_sql.format(database=database))
          spark.sql(create_table_sql.format(database=database, table=table))
          spark.sql(truncate_table_sql.format(database=database, table=table))


      def write_data(spark, database, table):
          data = [(i, random.random()) for i in range(100_000)]
          # Creating a dataframe
          df = spark.createDataFrame(data, schema=['id', 'value'])
          table_full_name = "{database}.{table}".format(database=database, table=table)
          df.write.mode('overwrite').format('json').saveAsTable(table_full_name)


      def main():
          # Creating a Spark session
          spark = (
              SparkSession
              .builder
              .appName('job_save_table')
              .config('spark.executor.instances', 1)
              .config('spark.sql.warehouse.dir', sys.argv[1])
              .config('spark.sql.catalogImplementation', 'hive')
              .getOrCreate()
          )
          database, table = 'database_1', 'table_1'
          prepare_table(spark, database, table)
          write_data(spark, database, table)


      if __name__ == '__main__':
          if len(sys.argv) != 2:
              print("Usage: job-save-table s3a://<bucket>/<folder>", file=sys.stderr)
              sys.exit(-1)
          main()

- In <bucket_for_PySpark_job_source_code>, create a folder named scripts and upload the job_save_table.py file to it.
- Create a job with the following settings:
  - Job type: PySpark
  - Main python file: s3a://<bucket_for_PySpark_job_source_code>/scripts/job_save_table.py
  - Arguments: s3a://<bucket_for_PySpark_job_output_data>/warehouse
Check the result
- Navigate to the folder dashboard and select Managed Service for Apache Spark™.
- Click the name of your cluster and select the Jobs tab.
- Wait for the PySpark job you created to change its status to Done.
- Make sure the warehouse folder in <bucket_for_PySpark_job_output_data> now contains database_1. The data from the new DB is now stored in the Object Storage bucket in JSON format.
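The last check can also be scripted. The sketch below is an assumption-laden illustration rather than part of this guide: it expects an S3-compatible client object, such as one created with boto3, which is not used elsewhere here, and the function name list_keys is hypothetical. It collects every object key under a prefix, following pagination.

```python
def list_keys(s3_client, bucket, prefix):
    """Return all object keys in `bucket` that start with `prefix`."""
    keys = []
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = s3_client.list_objects_v2(**kwargs)
        keys.extend(obj["Key"] for obj in response.get("Contents", []))
        if not response.get("IsTruncated"):
            return keys
        # Continue from where the previous page ended
        kwargs["ContinuationToken"] = response["NextContinuationToken"]
```

With boto3, the client could be created as boto3.client('s3', endpoint_url='https://storage.yandexcloud.net'), assuming credentials with read access to the bucket are configured, and the function called with the output bucket name and the prefix warehouse/. Keys that mention database_1 indicate that the job wrote its data.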
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them: