Launching and managing applications for Spark and PySpark
There are multiple ways to run a Spark or PySpark job in a Yandex Data Processing cluster:
- Spark Shell, a command shell for the Scala and Python programming languages. This method runs calculations line by line rather than from a script. For more information about Spark Shell, see the Spark documentation.
- The `spark-submit` script. It saves the calculation results to HDFS. For more information about `spark-submit`, see the Spark documentation.
- Yandex Cloud CLI commands. These allow you to save the calculation results not only to HDFS but also to a Yandex Object Storage bucket.
Below is an example demonstrating the calculation of 2018 US air traffic statistics based on data from transtats.bts.gov. The prepared dataset is stored in the yc-mdb-examples public bucket.
Getting started
Prepare the infrastructure:
- Create a network named `data-proc-network`. Disable Create subnets when creating it.
- In `data-proc-network`, create a subnet with the following parameters:
  - Name: `data-proc-subnet-a`
  - Zone: `ru-central1-a`
  - CIDR: `192.168.1.0/24`
- Create a NAT gateway and a routing table named `data-proc-route-table` in `data-proc-network`. Associate the routing table with the `data-proc-subnet-a` subnet.
- In `data-proc-network`, create a security group named `data-proc-security-group` with the following rules:
  - One rule for inbound and another one for outbound service traffic:
    - Port range: `0-65535`
    - Protocol: `Any`
    - Source/Destination name: `Security group`
    - Security group: `Current`
  - A rule for incoming traffic (for online access to subcluster hosts):
    - Port range: `22`
    - Protocol: `TCP`
    - Source: `CIDR`
    - CIDR blocks: `0.0.0.0/0`
  - A rule for outgoing HTTPS traffic:
    - Port range: `443`
    - Protocol: `TCP`
    - Destination name: `CIDR`
    - CIDR blocks: `0.0.0.0/0`
  - A rule for outgoing HTTP traffic:
    - Port range: `80`
    - Protocol: `TCP`
    - Destination name: `CIDR`
    - CIDR blocks: `0.0.0.0/0`
- Create a service account named `data-proc-sa` with the following roles:
- Create a Yandex Object Storage bucket named `data-proc-bucket` with restricted access.
- Grant the `data-proc-sa` service account `READ and WRITE` permissions for `data-proc-bucket`.
- Create a Yandex Data Processing cluster in any suitable configuration with the following settings:
  - Service account: `data-proc-sa`
  - Availability zone: `ru-central1-a`
  - Bucket name: `data-proc-bucket`
  - Network: `data-proc-network`
  - Security groups: `data-proc-security-group`
  - Public access for subclusters: Provided
Alternatively, you can deploy the same infrastructure with Terraform:
- If you do not have Terraform yet, install it.
- Get the authentication credentials. You can add them to environment variables or specify them later in the provider configuration file.
- Configure and initialize a provider. There is no need to create a provider configuration file manually: you can download it.
- Place the configuration file in a separate working directory and specify the parameter values. If you did not add the authentication credentials to environment variables, specify them in the configuration file.
- Download the `data-proc-for-spark-jobs.tf` configuration file to the same working directory. This file describes:
  - Network.
  - Subnet.
  - NAT gateway and route table.
  - Security groups.
  - Service account to work with cloud resources.
  - Bucket to store job dependencies and results.
  - Yandex Data Processing cluster.
- In the `data-proc-for-spark-jobs.tf` configuration file, specify all the required parameters.
- Make sure the Terraform configuration files are correct using this command:
  terraform validate
  If there are any errors in the configuration files, Terraform will point them out.
- Create the required infrastructure:
  - Run the command to view the planned changes:
    terraform plan
    If the resource configuration descriptions are correct, the terminal will display a list of the resources to modify and their parameters. This is a test step; no resources are updated.
  - If you are happy with the planned changes, apply them:
    - Run the command:
      terraform apply
    - Confirm the update of resources.
    - Wait for the operation to complete.
  All the required resources will be created in the specified folder. You can check resource availability and their settings in the management console.
Using Spark Shell
- Use SSH to connect to the Yandex Data Processing cluster's master host.
- Run Spark Shell on the master host:
  /usr/bin/pyspark
  The number of cores and executors is only limited by your Yandex Data Processing configuration.
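  If you want to set the resources explicitly instead of relying on the defaults, you can pass standard Spark options when starting the shell. This is a minimal sketch; the values are illustrative and must fit your cluster's configuration:

  ```bash
  # Start PySpark on YARN with explicit executor settings (illustrative values)
  /usr/bin/pyspark \
      --master yarn \
      --num-executors 2 \
      --executor-cores 1 \
      --executor-memory 2g
  ```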
- Enter the following code line by line:

  ```python
  sql = SQLContext(sc)
  df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
  ```

  The last line reads the data from the public bucket containing the sample data set. After this line is executed, an organized data set `df` (DataFrame) containing the data read becomes available for the current session.
- To see the schema of the resulting DataFrame, run the command:
  df.printSchema()
  The terminal displays a list of columns with their types.
- Calculate flight statistics by month and find the top ten cities by the number of departures:
  - Number of flights by month:
    df.groupBy("Month").count().orderBy("Month").show()
  - Top ten cities by number of departures:
    df.groupBy("OriginCityName").count().orderBy("count", ascending=False).show(10)
Using Spark Submit
Spark Submit allows you to run pre-written applications using the `spark-submit` script. In this example, we will calculate the number of flights by month.
- Use SSH to connect to the Yandex Data Processing cluster's master host.
- On the master host, create a file named `month_stat.py` with the following code:

  ```python
  import sys

  from pyspark import SparkContext, SparkConf
  from pyspark.sql import SQLContext

  def main():
      conf = SparkConf().setAppName("Month Stat - Python")
      conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
      sc = SparkContext(conf=conf)

      sql = SQLContext(sc)
      df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
      defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS")
      month_stat = df.groupBy("Month").count()
      month_stat.repartition(1).write.format("csv").save(defaultFS + "/tmp/month_stat")

  if __name__ == "__main__":
      main()
  ```
- Run the application:
  /usr/bin/spark-submit month_stat.py
- The result of running the application is exported to HDFS. You can list the resulting files using the command:
  hdfs dfs -ls /tmp/month_stat
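  To print the aggregated results right in the terminal, you can concatenate the part files Spark wrote. A small sketch, assuming the default part-file naming of Spark's CSV writer:

  ```bash
  # Print the monthly flight counts written by month_stat.py
  hdfs dfs -cat /tmp/month_stat/part-*
  ```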
This example describes how to build and run an application using the Scala programming language.
To create and launch a Spark application:
- Use SSH to connect to the Yandex Data Processing cluster's master host.
- Install `sbt`, the standard build utility for Scala. It comes bundled with the Scala programming language.
- Create a folder, such as `spark-app`.
- Add a file to the created folder at the `./src/main/scala/app.scala` path.
- Paste the following code into the `app.scala` file:

  ```scala
  package com.yandex.cloud.dataproc.scala

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  object Main {
      def main(args: Array[String]) {
          val conf = new SparkConf().setAppName("Month Stat - Scala App")
          val sc = new SparkContext(conf)
          sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          val df = sqlContext.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
          val month_stat = df.groupBy("Month").count()
          val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS")
          month_stat.repartition(1).write.format("csv").save(defaultFS + "/tmp/month_stat")

          sc.stop()
      }
  }
  ```
- Prepare the data for building your application:
  - To find out the version of Scala installed on your system, run the `scala -version` command.
  - To find out your `spark-core` and `spark-sql` versions, check the contents of the `/usr/lib/spark/jars` folder:
    ls /usr/lib/spark/jars
    The versions are specified in the names of the JAR files. Example:
    spark-core_2.12-3.0.3.jar
    spark-sql_2.12-3.0.3.jar
    The version you need is `3.0.3`.
  - In the `spark-app` folder, create a file named `build.sbt` with the following configuration:

    ```scala
    scalaVersion := "<scala_version>"

    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "<spark_core_version>" % "provided",
        "org.apache.spark" %% "spark-sql" % "<spark_sql_version>" % "provided"
    )
    ```

    Example:

    ```scala
    scalaVersion := "2.12.10"

    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "3.0.3" % "provided",
        "org.apache.spark" %% "spark-sql" % "3.0.3" % "provided"
    )
    ```
- Compile and build your JAR file:
  sbt compile && sbt package
- Get the name of the JAR file you built:
  ls ~/spark-app/target/scala-<scala_version>
  The result: `spark-app_2.12-0.1.0-SNAPSHOT.jar`.
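  Optionally, check that the compiled entry point is inside the archive. A sketch, assuming the JDK's jar tool is on the PATH and Scala 2.12 was used:

  ```bash
  # List the archive contents and look for the compiled Main class
  jar tf ~/spark-app/target/scala-2.12/spark-app_2.12-0.1.0-SNAPSHOT.jar | grep Main
  ```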
- Launch the resulting application:
  /usr/bin/spark-submit --class com.yandex.cloud.dataproc.scala.Main target/scala-<scala_version>/<built_JAR_file_name>
  Example:
  /usr/bin/spark-submit --class com.yandex.cloud.dataproc.scala.Main target/scala-2.12/spark-app_2.12-0.1.0-SNAPSHOT.jar
- The result of running the application is exported to HDFS. You can list the resulting files using the command:
  hdfs dfs -ls /tmp/month_stat
Terminating the application
By default, the resources of the running application are managed by the YARN component. If you need to terminate or remove the application from the queue, use the `yarn` utility:
- List the applications:
  yarn application -list
- Terminate the application you no longer need:
  yarn application -kill <app_ID>
For more information about YARN commands, see YARN Commands.
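Before killing an application, you can also check its current state with the standard YARN CLI:

```bash
# Show the state, final status, progress, and tracking URL of an application
yarn application -status <app_ID>
```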
Running jobs using the Yandex Cloud CLI
Jobs are run using the Yandex Cloud CLI through the Yandex Data Processing agent installed on the cluster master host. The agent gets job parameters through the Yandex Data Processing API.
The executable file and its dependencies must be located in a storage accessible to the Yandex Data Processing cluster service account. The executed application itself must have access to the storages in which the source data set and execution results are saved.
You can save the calculation results to HDFS on the Yandex Data Processing cluster or to the `data-proc-bucket` bucket you specified when creating the cluster.
All service and debugging information will be saved to `data-proc-bucket`. For each job, the Yandex Data Processing agent creates a separate folder with a path such as `dataproc/clusters/<cluster_ID>/jobs/<job_ID>`.
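For example, once a job has finished, you can browse its service output in the bucket with s3cmd. A sketch; substitute the actual cluster and job IDs:

```bash
# List the per-job folder created by the Yandex Data Processing agent
s3cmd ls s3://data-proc-bucket/dataproc/clusters/<cluster_ID>/jobs/<job_ID>/
```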
Note
You can view the job logs and search data in them using Yandex Cloud Logging. For more information, see Working with logs.
Below are the two application versions, one for Python and one for Scala.
Running a PySpark job
To run a PySpark job:
- Install additional dependencies.
- Prepare and run a PySpark job.
Install additional dependencies
On a local computer:
- If you do not have the Yandex Cloud command line interface yet, install and initialize it.
  The folder specified in the CLI profile is used by default. You can specify a different folder using the `--folder-name` or `--folder-id` parameter.
- Install and configure the S3cmd console client to work with Yandex Object Storage.
- Install Python. Make sure the Python version matches the version available from the image. You can check the version under Runtime environment. For image version 2.0, use Python 3.8.10:
  sudo apt update && sudo apt install python3.8
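  You can confirm the installed version afterwards:

  ```bash
  # The interpreter version should match the one provided by the cluster image
  python3.8 --version
  ```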
Prepare and run a PySpark job
- Create a file named `job.py` with the following code:

  ```python
  import sys

  from pyspark import SparkContext, SparkConf
  from pyspark.sql import SQLContext

  def main():
      if len(sys.argv) != 3:
          print('Usage job.py <input_directory> <output_directory>')
          sys.exit(1)

      in_dir = sys.argv[1]
      out_dir = sys.argv[2]

      conf = SparkConf().setAppName('Month Stat - Python')
      sc = SparkContext(conf=conf)
      sql = SQLContext(sc)
      df = sql.read.parquet(in_dir)
      month_stat = df.groupBy('Month').count()
      job_id = dict(sc._conf.getAll())['spark.yarn.tags'].replace('dataproc_job_', '')
      if out_dir.startswith('s3a://'):
          month_stat.repartition(1).write.format('csv').save(out_dir + job_id)
      else:
          default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS')
          month_stat.repartition(1).write.format('csv').save(default_fs + out_dir + job_id)

  if __name__ == '__main__':
      main()
  ```
- To make sure PySpark has access to your code, upload the `job.py` file to the Object Storage bucket that the Yandex Data Processing cluster service account has access to:
  s3cmd put ./job.py s3://data-proc-bucket/bin/
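  You can verify the upload by listing the bucket prefix with the same client:

  ```bash
  # Check that job.py is now present under the bin/ prefix
  s3cmd ls s3://data-proc-bucket/bin/
  ```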
- Run the job.
  The run command varies depending on whether you want to save the job results to Object Storage or to HDFS.
  Object Storage:

  ```bash
  yc dataproc job create-pyspark \
     --cluster-id=<cluster_ID> \
     --name=<job_name> \
     --main-python-file-uri="s3a://data-proc-bucket/bin/job.py" \
     --args="s3a://yc-mdb-examples/dataproc/example01/set01" \
     --args="s3a://data-proc-bucket/jobs_results/"
  ```

  In the command, specify the following:
  - `--cluster-id`: Cluster ID. You can get it with the list of clusters in the folder.
  - `--name`: Arbitrary Spark job name.
  A CSV file with the execution results is saved to `data-proc-bucket`.

  HDFS directory:

  ```bash
  yc dataproc job create-pyspark \
     --cluster-id=<cluster_ID> \
     --name=<job_name> \
     --main-python-file-uri="s3a://data-proc-bucket/bin/job.py" \
     --args="s3a://yc-mdb-examples/dataproc/example01/set01" \
     --args="tmp/jobs/"
  ```

  In the command, specify the following:
  - `--cluster-id`: Cluster ID. You can get it with the list of clusters in the folder.
  - `--name`: Arbitrary Spark job name.
  A CSV file with the execution result is saved to the `/tmp/jobs/<job_ID>/` folder in HDFS.
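  Both variants need the cluster ID. One way to look it up is with the CLI:

  ```bash
  # Print the clusters in the current folder together with their IDs
  yc dataproc cluster list
  ```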
- (Optional) View the job logs:
  yc dataproc job log <job_ID> --cluster-id=<cluster_ID>
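  If you do not remember the job ID, you can list the cluster's jobs first. A sketch, assuming this subcommand is available in your CLI version:

  ```bash
  # List jobs submitted to the cluster, with their IDs and statuses
  yc dataproc job list --cluster-id=<cluster_ID>
  ```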
Running Spark jobs
To run a Spark job:
- Install additional dependencies.
- Build a Scala application.
- Upload the JAR file to Object Storage.
- Run the Spark job in the Yandex Data Processing cluster.
Install additional dependencies
- If you do not have the Yandex Cloud command line interface yet, install and initialize it.
  The folder specified in the CLI profile is used by default. You can specify a different folder using the `--folder-name` or `--folder-id` parameter.
- Use SSH to connect to the Yandex Data Processing cluster's master host.
- Install `sbt`, the standard build utility for Scala. It comes bundled with the Scala programming language.
- Install and configure the S3cmd console client to work with Yandex Object Storage.
Build a Scala application
To simplify dependency management, build the application into a single JAR file (fat JAR) using the sbt-assembly plugin.
- Create a folder named `spark-app` with the `project` and `src/main/scala` subdirectories.
- Create a file named `spark-app/project/plugins.sbt` that connects the `sbt-assembly` plugin used to build the single JAR file:

  ```scala
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "<sbt_assembly_version>")
  ```

  You can check the version of the `sbt-assembly` plugin in its repository, under Releases.
- Run the `scala -version` command to find out the version of Scala installed on your system.
- Create a file named `spark-app/build.sbt` with a description of the dependencies and the strategy for merging them into a single JAR file. Specify the Scala version in the `build.sbt` file:

  ```scala
  scalaVersion := "<Scala_version>"

  libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.4.4",
      "org.apache.spark" %% "spark-sql" % "2.4.4",
  )

  assembly / assemblyMergeStrategy := {
    case PathList("org", "aopalliance", xs @ _*) => MergeStrategy.last
    case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "overview.html" => MergeStrategy.last
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case "git.properties" => MergeStrategy.last
    case x =>
      val oldStrategy = (assembly / assemblyMergeStrategy).value
      oldStrategy(x)
  }
  ```
- Create a file named `spark-app/src/main/scala/app.scala` with the application code:

  ```scala
  package com.yandex.cloud.dataproc.scala

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  object Main {
      def main(args: Array[String]) {
          if (args.length != 2) { // check the arguments
              System.err.println("Usage spark-app.jar <input_directory> <output_directory>");
              System.exit(-1);
          }
          val inDir = args(0) // URI of the source data
          val outDir = args(1) // URI of the output directory
          val conf = new SparkConf().setAppName("Month Stat - Scala App")
          val sc = new SparkContext(conf)
          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          val df = sqlContext.read.parquet(inDir)
          val monthStat = df.groupBy("Month").count()
          val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS") // get the HDFS server endpoint
          val jobId = conf.get("spark.yarn.tags").replace("dataproc_job_", "") // get the job ID
          if (outDir.toLowerCase().startsWith("s3a://")) {
              monthStat.repartition(1).write.format("csv").save(outDir + jobId)
          } else {
              monthStat.repartition(1).write.format("csv").save(defaultFS + "/" + outDir + jobId)
          }

          sc.stop()
      }
  }
  ```
- Run the application build in the `spark-app` folder:
  sbt clean && sbt compile && sbt assembly
  If you get the `java.lang.UnsatisfiedLinkError: Error looking up function 'stat': java: undefined symbol: stat` error and your master host OS is Ubuntu, run each `sbt` command with the `-Dsbt.io.jdktimestamps=true` flag:

  ```bash
  sbt clean -Dsbt.io.jdktimestamps=true && \
  sbt compile -Dsbt.io.jdktimestamps=true && \
  sbt assembly -Dsbt.io.jdktimestamps=true
  ```

  The file will be available at the following path: `spark-app/target/scala-<Scala_version>/spark-app-assembly-0.1.0-SNAPSHOT.jar`.
Upload the JAR file to Object Storage
For Spark to have access to the assembled JAR file, upload the file to `data-proc-bucket`. You can upload the file using s3cmd:
s3cmd put ~/spark-app/target/scala-<Scala_version>/spark-app-assembly-0.1.0-SNAPSHOT.jar s3://data-proc-bucket/bin/
The file is uploaded to `s3://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar`.
Run the Spark job in the Yandex Data Processing cluster
- Disconnect from the cluster master host.
- Run the job.
  The run command varies depending on whether you want to save the job results to Object Storage or to HDFS.
  Object Storage:

  ```bash
  yc dataproc job create-spark \
     --cluster-id=<cluster_ID> \
     --name=<job_name> \
     --main-class="com.yandex.cloud.dataproc.scala.Main" \
     --main-jar-file-uri="s3a://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar" \
     --args="s3a://yc-mdb-examples/dataproc/example01/set01" \
     --args="s3a://data-proc-bucket/jobs_results/"
  ```

  In the command, specify the following:
  - `--cluster-id`: Cluster ID. You can get it with the list of clusters in the folder.
  - `--name`: Arbitrary Spark job name.
  A CSV file with the execution results is saved to `data-proc-bucket`.

  HDFS directory:

  ```bash
  yc dataproc job create-spark \
     --cluster-id=<cluster_ID> \
     --name=<job_name> \
     --main-class="com.yandex.cloud.dataproc.scala.Main" \
     --main-jar-file-uri="s3a://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar" \
     --args="s3a://yc-mdb-examples/dataproc/example01/set01" \
     --args="tmp/jobs/"
  ```

  In the command, specify the following:
  - `--cluster-id`: Cluster ID. You can get it with the list of clusters in the folder.
  - `--name`: Arbitrary Spark job name.
  A CSV file with the execution result is saved to the `/tmp/jobs/<job_ID>/` folder in HDFS.

  Example of a message saying that the job was run successfully:

  ```text
  done (1m2s)
  id: {your_job_id}
  cluster_id: {your_cluster_id}
  name: test02
  status: DONE
  spark_job:
    args:
    - s3a://yc-mdb-examples/dataproc/example01/set01
    - s3a://data-proc-bucket/jobs_results/
    main_jar_file_uri: s3a://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar
    main_class: com.yandex.cloud.dataproc.scala.Main
  ```
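  If you saved the results to the bucket, you can check the output objects with s3cmd; the application appends the job ID to the output prefix:

  ```bash
  # List the job results written under jobs_results/ in the bucket
  s3cmd ls s3://data-proc-bucket/jobs_results/
  ```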
Delete the resources you created
Some resources are not free of charge. To avoid paying for them, delete the resources you no longer need:
- Delete the Yandex Data Processing cluster.
- If you reserved public static IP addresses for the clusters, release and delete them.
- Delete the subnet.
- Delete the route table.
- Delete the NAT gateway.
- Delete the network.
To delete the infrastructure created with Terraform:
- In the terminal window, go to the directory containing the infrastructure plan.
- Delete the `data-proc-for-spark-jobs.tf` configuration file.
- Make sure the Terraform configuration files are correct using this command:
  terraform validate
  If there are errors in the configuration files, Terraform will point them out.
- Confirm updating the resources:
  - Run the command to view the planned changes:
    terraform plan
    If the resource configuration descriptions are correct, the terminal will display a list of the resources to modify and their parameters. This is a test step; no resources are updated.
  - If you are happy with the planned changes, apply them:
    - Run the command:
      terraform apply
    - Confirm the update of resources.
    - Wait for the operation to complete.
  All the resources described in the configuration file will be deleted.