Yandex Managed Service for Apache Spark™ integration with Apache Hive™ Metastore
You can connect an Apache Hive™ Metastore cluster to an Apache Spark™ cluster. In this case, metadata produced by job runs is uploaded to the Apache Hive™ Metastore cluster, and another Apache Spark™ cluster can then use the saved metadata.
Below, we walk you through an example of using a PySpark job to create a database and a table within it and then load the data from the new database into a Yandex Object Storage bucket. Database metadata is stored in an Apache Hive™ Metastore cluster connected to an Apache Spark™ cluster.
To implement the above example:
- Set up the infrastructure.
- Prepare a PySpark job.
- Check the result.

If you no longer need the resources you created, delete them.
Required paid resources
The support cost for this solution includes:
- Object Storage bucket fee for storage and data operations (see Object Storage pricing).
- Yandex Cloud Logging fee for the amount of data written and the time of its retention (see Cloud Logging pricing).
- Fee for the computing resources of Managed Service for Apache Spark™ cluster components (see Managed Service for Apache Spark™ pricing).
- Fee for the computing resources of Apache Hive™ Metastore cluster components (see Yandex MetaData Hub pricing).
Set up the infrastructure
- Create a service account named spark-agent for the Apache Spark™ cluster and assign it the managed-spark.integrationProvider role to enable the Apache Spark™ cluster to interact with other resources.
- Create a service account named metastore-agent and assign it the managed-metastore.integrationProvider and storage.uploader roles to enable your Apache Hive™ Metastore cluster to interact with other resources and export metadata to the Object Storage bucket.
- Create two Object Storage buckets (see the sketch after this list):
  - One for the PySpark job source code.
  - One for output data.
- Grant permissions to the spark-agent service account for the created buckets:
  - Bucket for the PySpark job source code: READ.
  - Bucket for output data: READ and WRITE.
- Grant the READ and WRITE permissions for the output bucket to the metastore-agent service account.
- Create a cloud network named integration-network. This will automatically create three subnets in different availability zones.
- For the Apache Spark™ cluster, create a security group named spark-sg in integration-network. Add the following rule to it:
  - For outgoing traffic, to allow Apache Spark™ cluster connections to Apache Hive™ Metastore:
    - Port range: 9083
    - Protocol: Any
    - Destination: CIDR
    - CIDR blocks: 0.0.0.0/0
- For the Apache Hive™ Metastore cluster, create a security group named metastore-sg in integration-network. Add the following rules to it:
  - For incoming client traffic:
    - Port range: 30000-32767
    - Protocol: Any
    - Source: CIDR
    - CIDR blocks: 0.0.0.0/0
  - For incoming load balancer traffic:
    - Port range: 10256
    - Protocol: Any
    - Source: Load balancer health checks
- Create an Apache Hive™ Metastore cluster with the following parameters:
  - Service account: metastore-agent.
  - Network: integration-network.
  - Subnet: integration-network-ru-central1-a.
  - Security groups: metastore-sg.
- Create a Managed Service for Apache Spark™ cluster with the following parameters:
  - Service account: spark-agent.
  - Network: integration-network.
  - Subnet: integration-network-ru-central1-a.
  - Security groups: spark-sg.
  - Metastore: the Apache Hive™ Metastore cluster you created earlier.
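If you prefer to script the bucket setup from the list above, Object Storage is S3-compatible, so the buckets can be created with any S3 client. Below is a minimal sketch using boto3; the bucket names are placeholders matching the ones used later in the job settings, and the client is assumed to be configured with a static access key that is allowed to create buckets.

```python
# A minimal sketch: creating the source code and output buckets through
# the S3-compatible Object Storage API. The bucket names are placeholders;
# credentials are read from the standard AWS environment variables or config
# and must belong to an account allowed to create buckets.
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='https://storage.yandexcloud.net',  # Object Storage endpoint
)

for bucket in ('<source_code_bucket>', '<bucket_for_output_data>'):
    s3.create_bucket(Bucket=bucket)
    print('Created bucket:', bucket)
```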
Prepare a PySpark job
For the PySpark job, we will use a Python script that creates a database named database_1 and a table named table_1. The script will be stored in the Object Storage bucket for source code.
Prepare a script file:
- Create a local file named job-create-table.py and paste the following script into it:

  ```python
  import random
  import sys

  from pyspark.sql import SparkSession


  def prepare_table(spark, database, table):
      create_database_sql = "create database if not exists {database}"
      create_table_sql = """
          create table if not exists {database}.{table} (
              id int,
              value double
          )
      """
      truncate_table_sql = "truncate table {database}.{table}"

      spark.sql(create_database_sql.format(database=database))
      spark.sql(create_table_sql.format(database=database, table=table))
      spark.sql(truncate_table_sql.format(database=database, table=table))


  def write_data(spark, database, table):
      data = [(i, random.random()) for i in range(100_000)]

      # Creating a dataframe
      df = spark.createDataFrame(data, schema=['id', 'value'])

      table_full_name = "{database}.{table}".format(database=database, table=table)
      df.write.mode('overwrite').format('json').saveAsTable(table_full_name)


  def main():
      # Creating a Spark session
      spark = (
          SparkSession
          .builder
          .appName('job-create-table')
          .enableHiveSupport()
          .config('spark.sql.warehouse.dir', sys.argv[1])
          .getOrCreate()
      )

      database, table = 'database_1', 'table_1'

      prepare_table(spark, database, table)
      write_data(spark, database, table)


  if __name__ == '__main__':
      if len(sys.argv) != 2:
          print("Usage: job-create-table s3a://<bucket>/<folder>", file=sys.stderr)
          sys.exit(-1)
      main()
  ```
- In the source code bucket, create a folder named scripts and upload the job-create-table.py file to this folder (see the upload sketch after this list).
- In the output bucket, create a folder named warehouse to load the data from database_1 into.
- Create a job with the following settings:
  - Job type: PySpark
  - Main python file: s3a://<source_code_bucket>/scripts/job-create-table.py
  - Arguments: s3a://<bucket_for_output_data>/warehouse
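The upload step above can also be done from code rather than in the console. Here is a minimal sketch, again assuming boto3 and a static access key with write access; <source_code_bucket> is the same placeholder as in the job settings.

```python
# A minimal sketch: uploading job-create-table.py to the scripts/ folder of
# the source code bucket. '<source_code_bucket>' is a placeholder.
import boto3

s3 = boto3.client('s3', endpoint_url='https://storage.yandexcloud.net')

s3.upload_file(
    Filename='job-create-table.py',        # local script file
    Bucket='<source_code_bucket>',         # source code bucket
    Key='scripts/job-create-table.py',     # path inside the bucket
)
```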
Check the result
- Navigate to the folder dashboard and select Managed Service for Apache Spark™.
- Click the name of your cluster and open the Jobs tab.
- Wait for the PySpark job you created to change its status to Done.
- Make sure the file with data from database_1 appears in the warehouse folder in your output data bucket (see the listing sketch after this list).
- Make sure the Apache Hive™ Metastore cluster has the metadata on database_1:
  - Export the metadata from the Apache Hive™ Metastore cluster to the output bucket.
  - Download the metadata file and make sure it mentions database_1.
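You can also check the output bucket from code rather than in the console. The sketch below lists the objects the job wrote under the warehouse/ prefix; it assumes boto3 and a static access key with read access to the output bucket, and <bucket_for_output_data> is the same placeholder as above.

```python
# A minimal sketch: listing the files the job wrote under warehouse/.
# '<bucket_for_output_data>' is a placeholder for the output bucket name.
import boto3

s3 = boto3.client('s3', endpoint_url='https://storage.yandexcloud.net')

response = s3.list_objects_v2(
    Bucket='<bucket_for_output_data>',
    Prefix='warehouse/',
)
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])
```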
Delete the resources you created
Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them:
- Apache Hive™ Metastore cluster.
- Apache Spark™ cluster.
- Object Storage buckets. Before deleting a bucket, delete all objects in it (a sketch for emptying a bucket follows below).
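A bucket that still contains objects cannot be deleted. One way to empty it first is with an S3-compatible client; below is a minimal boto3 sketch where <bucket_name> is a placeholder for each bucket you plan to delete.

```python
# A minimal sketch: deleting every object in a bucket so the bucket itself
# can then be removed. '<bucket_name>' is a placeholder; run this for each
# bucket you plan to delete.
import boto3

s3 = boto3.client('s3', endpoint_url='https://storage.yandexcloud.net')

paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='<bucket_name>'):
    keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
    if keys:
        s3.delete_objects(Bucket='<bucket_name>', Delete={'Objects': keys})
```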