Working with an Object Storage table from a PySpark job using Apache Hive™ Metastore and Apache Iceberg™
Integrating Yandex Managed Service for Apache Spark™ with Apache Hive™ Metastore and Apache Iceberg™ lets you manage your data in Object Storage via SQL tables.
Apache Hive™ Metastore provides:
- Centralized storage of metadata on databases, tables, and partitions.
- Access to your data without manually specifying paths and schemas.
- Convenient storage of table and column statistics for query optimization.
Apache Iceberg™ provides:
- Data versioning and snapshot storage.
- ACID transactions with support for `UPDATE`, `DELETE`, and `MERGE` operations, as well as evolution of table schemas and partitioning schemes (see the sketch after this list).
- Scalability with high operational performance.
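Once a table is registered in the catalog, these operations are plain SQL. Below is a minimal sketch of Iceberg ACID operations in Spark SQL; it assumes a Spark session with the Iceberg SQL extensions enabled and an existing `demo_db.demo_events (id BIGINT, value DOUBLE)` table like the one created later in this tutorial.

```python
from pyspark.sql import SparkSession

# Assumption: the session (or cluster) is already configured with the Iceberg
# SQL extensions and an Apache Hive™ Metastore catalog; demo_db.demo_events exists.
spark = SparkSession.builder.appName("iceberg_acid_sketch").enableHiveSupport().getOrCreate()

# Row-level ACID operations on an Iceberg table:
spark.sql("UPDATE demo_db.demo_events SET value = 0.0 WHERE id = 1")
spark.sql("DELETE FROM demo_db.demo_events WHERE id = 2")
spark.sql("""
    MERGE INTO demo_db.demo_events AS t
    USING (SELECT 1 AS id, 0.5 AS value) AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET t.value = s.value
    WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
""")

# Schema evolution is a metadata-only change:
spark.sql("ALTER TABLE demo_db.demo_events ADD COLUMN category STRING")

spark.stop()
```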
In this tutorial, you will learn how to use the following Apache Hive™ Metastore and Apache Iceberg™ features when working with S3 storage from a PySpark job:
- Accessing a table by name.

  Apache Hive™ Metastore keeps a global metadata catalog shared by all clusters. The stored metadata can be used by any app from any Apache Spark™ cluster connected to that Apache Hive™ Metastore cluster.

- Creating and reading metadata snapshots.

  Apache Iceberg™ commits each write to the table as a new metadata snapshot. You can then access these snapshots by specifying a point in time or a snapshot ID.
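For reference, here is a minimal sketch of how such snapshot reads look in PySpark. It assumes an existing `demo_db.demo_events` Iceberg table registered in the attached Apache Hive™ Metastore; the snapshot ID and timestamp shown are placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("snapshot_reads_sketch").enableHiveSupport().getOrCreate()

# Each write to an Iceberg table creates a snapshot; the .snapshots metadata
# table lists them together with their commit timestamps.
spark.sql("SELECT snapshot_id, committed_at FROM demo_db.demo_events.snapshots").show()

# Time travel by snapshot ID (placeholder value)...
spark.sql("SELECT COUNT(*) FROM demo_db.demo_events VERSION AS OF 1234567890123456789").show()

# ...or by point in time (placeholder timestamp).
spark.sql("SELECT COUNT(*) FROM demo_db.demo_events TIMESTAMP AS OF '2024-01-01 00:00:00'").show()

spark.stop()
```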
To implement the above example, do the following:

- Set up the infrastructure.
- Prepare and run a PySpark job.
- Check the result.

If you no longer need the resources you created, delete them.
Required paid resources
The support cost for this solution includes:
- Object Storage bucket fee: Covers data storage and bucket operations (see Object Storage pricing).
- Fee for collecting and storing logs (see Cloud Logging pricing).
- Fee for the computing resources of Yandex Managed Service for Apache Spark™ cluster components (see Yandex Managed Service for Apache Spark™ pricing).
- Fee for the computing resources of Apache Hive™ Metastore cluster components (see Yandex MetaData Hub pricing).
Set up the infrastructure
Set up the infrastructure:
- Create a service account named `spark-agent` and assign it the managed-spark.integrationProvider role.

- Create a service account named `metastore-agent` and assign it the managed-metastore.integrationProvider role to enable your Apache Hive™ Metastore cluster to interact with other resources.

- Create two Object Storage buckets:

  - One for the PySpark job source code.
  - One for output data.
- Grant the `spark-agent` service account permissions for the created buckets:

  - Bucket for the PySpark job source code: `READ`.
  - Bucket for output data: `READ and WRITE`.
- Grant the `metastore-agent` service account the `READ and WRITE` permission for the output data bucket.

- Create a cloud network named `integration-network`.

  This will automatically create three subnets in different availability zones.
- For the Apache Spark™ cluster, create a security group named `spark-sg` in `integration-network`. Add the following rule to it:

  - For outgoing traffic, to allow Apache Spark™ cluster connections to Apache Hive™ Metastore:

    - Port range: `9083`
    - Protocol: `Any`
    - Destination: `CIDR`
    - CIDR blocks: `0.0.0.0/0`
- For the Apache Hive™ Metastore cluster, create a security group named `metastore-sg` in `integration-network`. Add the following rules to it:

  - For incoming client traffic:

    - Port range: `30000-32767`
    - Protocol: `Any`
    - Source: `CIDR`
    - CIDR blocks: `0.0.0.0/0`

  - For incoming load balancer traffic:

    - Port range: `10256`
    - Protocol: `Any`
    - Source: `Load balancer health checks`
- Create an Apache Hive™ Metastore cluster with the following parameters:

  - Service account: `metastore-agent`.
  - Network: `integration-network`.
  - Subnet: `integration-network-ru-central1-a`.
  - Security groups: `metastore-sg`.
- Create an Apache Spark™ cluster with the following parameters:

  - Service account: `spark-agent`.
  - Network: `integration-network`.
  - Subnet: `integration-network-ru-central1-a`.
  - Security groups: `spark-sg`.
  - Metastore: the Apache Hive™ Metastore cluster you created earlier.
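Once both clusters are up, you can optionally sanity-check the link between them: with Hive support enabled, a Spark session on the cluster reads its catalog from the attached Apache Hive™ Metastore. A minimal PySpark sketch (run as a job on the Apache Spark™ cluster) might look like this; the list of databases it prints depends on what is already stored in your Metastore.

```python
from pyspark.sql import SparkSession

# Minimal connectivity check: with Hive support enabled, SHOW DATABASES is
# answered from the Apache Hive™ Metastore catalog attached to the cluster.
spark = SparkSession.builder.appName("metastore_check").enableHiveSupport().getOrCreate()
spark.sql("SHOW DATABASES").show()
spark.stop()
```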
Prepare and run a PySpark job
For the PySpark job, we will use a Python script that:
- Creates a database and table in Apache Iceberg™ format in the bucket.
- Writes 10 rows of data to the table.
- Stores the ID of the current table snapshot.
- Writes 10 more rows of data to the table.
- Displays the number of rows in the current table state.
- Displays the number of rows in the table state at the time of the snapshot.
Prepare a script file:
- Create the `ice_min_demo.py` file and paste the following code into it:

  ```python
  import random

  from pyspark.sql import SparkSession

  spark = (
      SparkSession.builder
      .appName("ice_min_demo")
      .enableHiveSupport()
      .getOrCreate()
  )

  # Creating a database and table in Apache Iceberg™ format.
  # Apache Hive™ Metastore captures metadata, allowing you to access the table by the `db.tbl` name
  # from any Spark app associated with this Apache Hive™ Metastore cluster.
  db, tbl = "demo_db", "demo_events"
  spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
  spark.sql(f"""
      CREATE TABLE IF NOT EXISTS {db}.{tbl} (
          id BIGINT,
          value DOUBLE
      )
      USING iceberg
  """)

  # Writing the first piece of data to the table
  df1 = spark.createDataFrame([(i, random.random()) for i in range(10)], ["id", "value"])
  df1.writeTo(f"{db}.{tbl}").append()

  # Fetching the ID of the current snapshot from the `.snapshots` housekeeping table
  snap_before = spark.sql(f"SELECT max(snapshot_id) AS s FROM {db}.{tbl}.snapshots").collect()[0][0]

  # Writing the second piece of data to the table
  df2 = spark.createDataFrame([(i, random.random()) for i in range(10, 20)], ["id", "value"])
  df2.writeTo(f"{db}.{tbl}").append()

  # Counting and displaying the number of rows in the table's current (20) and previous (10) state
  cnt_now = spark.table(f"{db}.{tbl}").count()
  cnt_past = spark.sql(f"SELECT COUNT(*) FROM {db}.{tbl} VERSION AS OF {snap_before}").collect()[0][0]
  print(f"now_count: {cnt_now} | past_count: {cnt_past}", flush=True)

  spark.stop()
  ```

- In the source code bucket, create a folder named `scripts` and upload the `ice_min_demo.py` file to this folder.
- Create a job with the following settings:

  - Job type: PySpark.
  - Main python file: `s3a://<source_code_bucket>/scripts/ice_min_demo.py`.
  - Arguments: `spark.sql.warehouse.dir` – `s3a://<output_data_bucket>/warehouse/` (see the configuration sketch below).
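The `spark.sql.warehouse.dir` argument tells Spark where the warehouse, and therefore the Iceberg table data for `demo_db`, should be materialized, which is why the data later appears under `warehouse/` in the output bucket. For reference only, a roughly equivalent self-managed SparkSession configuration is sketched below; the Metastore host is a placeholder, and on the managed cluster with an attached Metastore you rely on the job settings above rather than on this code.

```python
from pyspark.sql import SparkSession

# A rough self-managed equivalent of the job settings above (all values are placeholders).
# On the managed Apache Spark™ cluster with an attached Metastore you do not need to
# write this wiring yourself; the sketch only shows which knobs are involved.
spark = (
    SparkSession.builder
    .appName("ice_min_demo")
    # Where table data is materialized (matches the job argument above):
    .config("spark.sql.warehouse.dir", "s3a://<output_data_bucket>/warehouse/")
    # Apache Hive™ Metastore endpoint; port 9083 matches the security group rule:
    .config("hive.metastore.uris", "thrift://<metastore_host>:9083")
    # Iceberg support in the Spark session catalog:
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .enableHiveSupport()
    .getOrCreate()
)
```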
Check the result
- Wait for the PySpark job you created to change its status to Done.
- Open the job execution logs.
- In the logs, find the following string: `now_count: 20 | past_count: 10`.
- Make sure the `warehouse/demo_db` folder appears in your output data bucket. The data from the new database, `demo_db`, is now stored in the Object Storage bucket, while the database metadata is stored in the Apache Hive™ Metastore cluster.
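If you prefer to check the bucket contents programmatically rather than in the console, a minimal boto3 sketch like the following can list the `warehouse/demo_db` prefix. The bucket name and static access key are placeholders, and the key is assumed to belong to an account with at least `READ` access to the bucket.

```python
import boto3

# List objects under the warehouse/demo_db prefix in the output bucket.
# <output_data_bucket>, <access_key_id>, and <secret_access_key> are placeholders.
s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",
    aws_access_key_id="<access_key_id>",
    aws_secret_access_key="<secret_access_key>",
)
response = s3.list_objects_v2(Bucket="<output_data_bucket>", Prefix="warehouse/demo_db")
for obj in response.get("Contents", []):
    print(obj["Key"])
```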
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them:
- Object Storage buckets. Before deleting a bucket, make sure you have deleted all objects from it.
- Apache Hive™ Metastore cluster.
- Apache Spark™ cluster.