© 2025 Direct Cursus Technology L.L.C.
Working with an Object Storage table from a PySpark job using Apache Hive™ Metastore and Apache Iceberg™

Written by
Yandex Cloud
Updated at November 6, 2025
  • Required paid resources
  • Set up the infrastructure
  • Prepare and run a PySpark job
  • Check the result
  • Delete the resources you created

Integrating Yandex Managed Service for Apache Spark™ with Apache Hive™ Metastore and Apache Iceberg™ lets you manage your data in Object Storage as SQL tables.

Apache Hive™ Metastore provides:

  • Centralized storage of metadata for databases, tables, and partitions.
  • Access to your data without manually specifying paths and schemas.
  • Storage of table and column statistics for query optimization.

Apache Iceberg™ provides:

  • Data versioning and snapshot storage.
  • ACID transactions that support UPDATE, DELETE, and MERGE operations, as well as evolution of the table schema and partitioning method.
  • Scalability while maintaining high operational performance.

In this tutorial, you will learn how to use the following Apache Hive™ Metastore and Apache Iceberg™ features when working with S3 storage from a PySpark job:

  • Accessing a table by name.

    Apache Hive™ Metastore keeps a single metadata catalog shared by all clusters. Any app on any Apache Spark™ cluster connected to that Apache Hive™ Metastore cluster can use the stored metadata.

  • Creating and reading metadata snapshots.

    Apache Iceberg™ commits each write to the table as a new metadata snapshot. Then, you can access these snapshots by specifying a time point or snapshot ID.
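
The snapshot behavior described above can be pictured with a toy model. The sketch below is plain Python, not Apache Iceberg™ code: the `ToyTable` class and its methods are hypothetical names used only for illustration, and the sequential snapshot IDs are a simplification. Each append records a new snapshot, and a read can target either the latest state or an earlier snapshot ID.

```python
import itertools


class ToyTable:
    """Toy append-only table that records a snapshot after every write."""

    def __init__(self):
        self._rows = []
        self._snapshots = {}  # snapshot ID -> number of rows visible in it
        self._ids = itertools.count(1)  # simplification: sequential IDs
        self.current_snapshot_id = None

    def append(self, rows):
        """Add rows and return the ID of the snapshot created by this write."""
        self._rows.extend(rows)
        snapshot_id = next(self._ids)
        self._snapshots[snapshot_id] = len(self._rows)
        self.current_snapshot_id = snapshot_id
        return snapshot_id

    def read(self, snapshot_id=None):
        """Return rows as of the given snapshot, or the latest state."""
        if snapshot_id is None:
            return list(self._rows)
        return self._rows[: self._snapshots[snapshot_id]]


table = ToyTable()
first = table.append([(i, i * 0.5) for i in range(10)])
table.append([(i, i * 0.5) for i in range(10, 20)])
print(len(table.read()), len(table.read(first)))  # 20 10
```

The model mirrors the flow of this tutorial: two writes of 10 rows each, with the ID saved after the first write so that the earlier state can still be read afterwards.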

To implement this scenario, do the following:

  1. Set up your infrastructure.
  2. Prepare and run a PySpark job.
  3. Check the result.

If you no longer need the resources you created, delete them.

Required paid resources

The cost of supporting this solution includes:

  • Object Storage bucket fee: Covers data storage and bucket operations (see Object Storage pricing).
  • Fee for collecting and storing logs (see Cloud Logging pricing).
  • Fee for the computing resources of Yandex Managed Service for Apache Spark™ cluster components (see Yandex Managed Service for Apache Spark™ pricing).
  • Fee for the computing resources of Apache Hive™ Metastore cluster components (see Yandex MetaData Hub pricing).

Set up the infrastructure

Set up the infrastructure:

  1. Create a service account named spark-agent and assign it the managed-spark.integrationProvider role.

  2. Create a service account named metastore-agent and assign it the managed-metastore.integrationProvider role to enable your Apache Hive™ Metastore cluster to interact with other resources.

  3. Create buckets:

    • One for the PySpark job source code.
    • One for output data.
  4. Grant permissions to the spark-agent service account for the created buckets:

    • Bucket for the PySpark job source code: READ.
    • Bucket for output data: READ and WRITE.
  5. Grant metastore-agent the READ and WRITE permissions for the output bucket.

  6. Create a cloud network named integration-network.

    This will automatically create three subnets in different availability zones.

  7. For the Apache Spark™ cluster, create a security group named spark-sg in integration-network. Add the following rule to it:

    • For outgoing traffic, to allow Apache Spark™ cluster connections to Apache Hive™ Metastore:

      • Port range: 9083
      • Protocol: Any
      • Destination: CIDR
      • CIDR blocks: 0.0.0.0/0
  8. For the Apache Hive™ Metastore cluster, create a security group named metastore-sg in integration-network. Add the following rules to it:

    • For incoming client traffic:

      • Port range: 30000-32767
      • Protocol: Any
      • Source: CIDR
      • CIDR blocks: 0.0.0.0/0
    • For incoming load balancer traffic:

      • Port range: 10256
      • Protocol: Any
      • Source: Load balancer health checks
  9. Create an Apache Hive™ Metastore cluster with the following parameters:

    • Service account: metastore-agent.
    • Network: integration-network.
    • Subnet: integration-network-ru-central1-a.
    • Security groups: metastore-sg.
  10. Create an Apache Spark™ cluster with the following parameters:

    • Service account: spark-agent.
    • Network: integration-network.
    • Subnet: integration-network-ru-central1-a.
    • Security groups: spark-sg.
    • Metastore: Apache Hive™ Metastore cluster you created earlier.
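
Once both clusters are running, a quick TCP check can help confirm that the security group rules let traffic reach the Apache Hive™ Metastore port (9083 in the spark-sg rule above). The following is a minimal sketch using only the Python standard library. The `can_connect` helper and the `<metastore_host>` placeholder are assumptions, and the check is only meaningful when run from a host inside integration-network.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


# Hypothetical usage; replace the placeholder with your cluster's address:
# can_connect("<metastore_host>", 9083)
```

A successful connection only shows that the port is reachable; it does not verify that the Metastore service itself is healthy.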

Prepare and run a PySpark job

For a PySpark job, we will use a Python script that:

  1. Creates a database and table in Apache Iceberg™ format in the bucket.
  2. Writes 10 rows of data to the table.
  3. Stores the ID of the current table snapshot.
  4. Writes 10 more rows of data to the table.
  5. Displays the number of rows in the current table state.
  6. Displays the number of rows in the table state at the time of the snapshot.

Prepare a script file:

  1. Create the ice_min_demo.py file and paste the following code into it:

    ice_min_demo.py
    import random
    from pyspark.sql import SparkSession
    
    spark = (
       SparkSession.builder
       .appName("ice_min_demo")
       .enableHiveSupport()
       .getOrCreate()
    )
    
    # Creating a database and table in Apache Iceberg™ format
    # Apache Hive™ Metastore captures metadata, allowing you to access the table by the `db.tbl` name
    # from any Spark apps associated with this Apache Hive™ Metastore cluster.
    db, tbl = "demo_db", "demo_events"
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {db}.{tbl} (
       id BIGINT,
       value DOUBLE
    ) USING iceberg
    """)
    
    # Writing the first piece of data to the table
    df1 = spark.createDataFrame([(i, random.random()) for i in range(10)], ["id","value"])
    df1.writeTo(f"{db}.{tbl}").append()
    
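    # Fetching the ID of the current snapshot from the `.snapshots` metadata table.
    # Snapshot IDs are not guaranteed to grow over time, so we pick the most recently committed one.
    snap_before = spark.sql(f"SELECT snapshot_id FROM {db}.{tbl}.snapshots ORDER BY committed_at DESC LIMIT 1").collect()[0][0]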
    
    # Writing the second piece of data to the table
    df2 = spark.createDataFrame([(i, random.random()) for i in range(10, 20)], ["id","value"])
    df2.writeTo(f"{db}.{tbl}").append()
    
    # Counting and displaying the number of rows in the table's current (20) and previous (10) state
    cnt_now = spark.table(f"{db}.{tbl}").count()
    cnt_past = spark.sql(f"SELECT COUNT(*) FROM {db}.{tbl} VERSION AS OF {snap_before}").collect()[0][0]
    print(f"now_count: {cnt_now} | past_count: {cnt_past}", flush=True)
    
    spark.stop()
    
  2. In the source code bucket, create a folder named scripts and upload the ice_min_demo.py file to this folder.

  3. Create a job with the following settings:

    • Job type: PySpark.
    • Main python file: s3a://<source_code_bucket>/scripts/ice_min_demo.py.
    • Arguments: spark.sql.warehouse.dir – s3a://<output_data_bucket>/warehouse/.
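
The script reads the earlier table state by snapshot ID. Spark SQL also accepts a timestamp in the same position, which corresponds to the "time point" option mentioned at the start of this tutorial. The helper below is a hypothetical sketch that only builds the query text, assuming a Spark version that supports both the `VERSION AS OF` and `TIMESTAMP AS OF` clauses; in a job, you would pass the result to `spark.sql()`.

```python
def time_travel_query(table, snapshot_id=None, timestamp=None):
    """Build a row-count query for a table state picked by snapshot ID or timestamp."""
    if (snapshot_id is None) == (timestamp is None):
        raise ValueError("Specify exactly one of snapshot_id or timestamp")
    if snapshot_id is not None:
        return f"SELECT COUNT(*) FROM {table} VERSION AS OF {int(snapshot_id)}"
    return f"SELECT COUNT(*) FROM {table} TIMESTAMP AS OF '{timestamp}'"


# The snapshot ID and timestamp here are illustrative values, not real ones.
print(time_travel_query("demo_db.demo_events", snapshot_id=123))
print(time_travel_query("demo_db.demo_events", timestamp="2025-11-06 12:00:00"))
```

Reading by timestamp is convenient when you did not save a snapshot ID in advance, while reading by ID pins the query to one exact commit.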

Check the result

  1. Wait for the PySpark job you created to change its status to Done.
  2. Open the job execution logs.
  3. In the logs, find this line: now_count: 20 | past_count: 10.
  4. Make sure the warehouse/demo_db folder appears in your output data bucket. The data of the new demo_db database is now stored in the Object Storage bucket, and its metadata is stored in the Apache Hive™ Metastore cluster.
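
If you prefer to check the log output programmatically, the expected line can be parsed with a regular expression. This is a sketch under the assumption that you already have the log text as a string; the `parse_counts` helper is hypothetical, and downloading the logs is left out.

```python
import re

# Matches the line printed by ice_min_demo.py.
COUNTS_RE = re.compile(r"now_count:\s*(\d+)\s*\|\s*past_count:\s*(\d+)")


def parse_counts(log_text):
    """Return (now_count, past_count) from the job log, or None if the line is absent."""
    match = COUNTS_RE.search(log_text)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


print(parse_counts("now_count: 20 | past_count: 10"))  # (20, 10)
```

On a first run against an empty table, the result should be `(20, 10)`. Other values suggest the table already held data from an earlier run.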

Delete the resources you created

Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them:

  1. Object Storage buckets. Before deleting a bucket, make sure you have deleted all objects from it.
  2. Apache Hive™ Metastore cluster.
  3. Apache Spark™ cluster.
