Working with Spark jobs
Apache Spark
In this section, we provide a simple example that demonstrates how to use the Spark interface for Scala and Java in Yandex Data Processing. In the example, we use Spark to count how many times each word appears in a short text.
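You can reproduce what the job computes without a cluster by running the same steps on a single machine with plain Scala collections: split each line on spaces, pair every word with 1, and sum the pairs for each word. This is a minimal sketch for illustration, not part of the Yandex Data Processing example. The object name LocalWordCount is hypothetical, and an in-memory groupBy stands in for Spark's distributed reduceByKey.

```scala
// Minimal local sketch of the word count logic; no Spark dependency.
// LocalWordCount is a hypothetical name used only for illustration.
object LocalWordCount {
  // Mirrors flatMap(split(" ")) -> map((word, 1)) -> reduceByKey(_ + _)
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))                       // split each line into words
      .map(word => (word, 1))                                 // pair each word with 1
      .groupBy(_._1)                                          // group the pairs by word, in memory
      .map { case (word, pairs) => word -> pairs.map(_._2).sum } // sum the 1s per word

  def main(args: Array[String]): Unit = {
    val counts = count(Seq("she sells sea shells", "on the sea shore"))
    // Print the pairs sorted by word for a stable order
    counts.toSeq.sortBy(_._1).foreach(println)
  }
}
```

Running count on the contents of text.txt should give the same counts as the job output. Both versions split on single spaces and keep the original case, so I and i would be counted as different words.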
Getting started
- Create a service account with the dataproc.agent and dataproc.provisioner roles.
- In Object Storage, create buckets and configure access to them:
  - Create a bucket for the input data and grant the cluster service account READ permission for this bucket.
  - Create a bucket for the processing output and grant the cluster service account READ and WRITE permissions for this bucket.
- Create a Yandex Data Processing cluster with the following settings:
  - Services:
    - HDFS
    - SPARK
    - YARN
  - Service account: Select the service account you previously created.
  - Bucket name: Select a bucket to hold the processing results.
Create a Spark job
- Upload a file for processing:
  - Copy and save the following to a file named text.txt:

    she sells sea shells on the sea shore the shells that she sells are sea shells I am sure so if she sells sea shells on the sea shore I am sure that the shells are sea shore shells

  - Upload the text.txt file to the source data bucket.
- Download the spark-app_2.11-0.1.0-SNAPSHOT.jar file, which contains the word_count.scala analysis program written in Scala, and upload it to the input data bucket. Source code of word_count.scala:

    package com.yandex.cloud.dataproc.scala

    import org.apache.spark.{SparkConf, SparkContext}

    object Main {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) { // check number of args
          System.err.println("Usage: spark-app.jar <input_directory> <output_directory>")
          System.exit(-1)
        }
        val inDir = args(0)  // input URI
        val outDir = args(1) // output URI
        val conf = new SparkConf().setAppName("Word count - Scala App")
        val sc = new SparkContext(conf)
        val text_file = sc.textFile(inDir)
        val counts = text_file.flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
        val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS")
        if (outDir.toLowerCase().startsWith("s3a://")) {
          counts.saveAsTextFile(outDir)
        } else {
          counts.saveAsTextFile(defaultFS + "/" + outDir)
        }
        sc.stop()
      }
    }
For more information about building an application written in Scala for Spark, see Using Spark Submit.
- Create a Spark job with the following parameters:
  - Main jar: s3a://<input_data_bucket_name>/spark-app_2.11-0.1.0-SNAPSHOT.jar
  - Main class: com.yandex.cloud.dataproc.scala.Main
  - Arguments:
    - s3a://<input_data_bucket_name>/text.txt
    - s3a://<output_bucket_name>/<output_directory>
- Wait for the job status to change to Done.
- Download the files with the results from the output bucket and review them:
  part-00000:
    (are,2)
    (am,2)
    (she,3)
    (so,1)
  part-00001:
    (shore,3)
    (if,1)
    (that,2)
    (on,2)
    (shells,6)
    (I,2)
    (sure,2)
    (sea,6)
    (the,4)
    (sells,3)
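Each part-* file holds one partition of the reduced RDD, and saveAsTextFile writes every (word, count) pair on its own line in the tuple's default string form. Here is a minimal sketch for reading such lines back into a single map, for example to check the totals. ResultParser is a hypothetical name, and the sample lines in main are the results listed above.

```scala
// Minimal sketch for reading back the saveAsTextFile output of this job.
// Each line holds one Tuple2 printed as "(word,count)".
// ResultParser is a hypothetical name used only for illustration.
object ResultParser {
  // Parses one "(word,count)" line; splits at the last comma in case a word contains one
  def parseLine(line: String): (String, Int) = {
    val body = line.trim.stripPrefix("(").stripSuffix(")")
    val idx = body.lastIndexOf(',')
    require(idx >= 0, s"unexpected line: $line")
    (body.substring(0, idx), body.substring(idx + 1).toInt)
  }

  // Combines the lines of several part-* files into one map, skipping blank lines
  // and summing counts in case the same word appears in more than one file
  def merge(parts: Seq[Seq[String]]): Map[String, Int] =
    parts.flatten
      .filter(_.trim.nonEmpty)
      .map(parseLine)
      .groupBy(_._1)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val part0 = Seq("(are,2)", "(am,2)", "(she,3)", "(so,1)")
    val part1 = Seq("(shore,3)", "(if,1)", "(that,2)", "(on,2)", "(shells,6)",
      "(I,2)", "(sure,2)", "(sea,6)", "(the,4)", "(sells,3)")
    val counts = merge(Seq(part0, part1))
    // prints: distinct words: 14, total words: 39
    println(s"distinct words: ${counts.size}, total words: ${counts.values.sum}")
  }
}
```

The totals match the sample text: 39 words in total, 14 of them distinct.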
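If you run the job again, use a new <output_directory>. By default, saveAsTextFile fails when the output directory already exists. The sketch below is plain Scala with no Spark dependency. It builds the two arguments that Main expects and reproduces Main's rule for choosing the output location: a value starting with s3a:// is used unchanged, and anything else is appended to fs.defaultFS. The object JobArgs, its methods, the bucket names, the run ID, and the hdfs://namenode:8020 address are all hypothetical placeholders.

```scala
// Hypothetical helpers modeling how the job arguments are built and interpreted.
object JobArgs {
  // Builds the two arguments Main expects: input URI and output URI.
  // runId gives each run its own output directory, because saveAsTextFile
  // refuses to write into a directory that already exists.
  def jobArguments(inputBucket: String, outputBucket: String, runId: String): Seq[String] = {
    require(inputBucket.nonEmpty && outputBucket.nonEmpty, "bucket names must not be empty")
    Seq(s"s3a://$inputBucket/text.txt", s"s3a://$outputBucket/word-count-$runId")
  }

  // Same rule as the end of Main: s3a:// URIs (case-insensitive prefix check) are used
  // unchanged, and any other value is treated as a path on the default file system.
  def resolveOutputPath(outDir: String, defaultFS: String): String =
    if (outDir.toLowerCase.startsWith("s3a://")) outDir
    else defaultFS + "/" + outDir

  def main(args: Array[String]): Unit = {
    // Bucket names and run ID are placeholders
    val jobArgs = jobArguments("my-input-bucket", "my-output-bucket", "run-001")
    jobArgs.foreach(println)
    // "hdfs://namenode:8020" is a placeholder default file system URI
    println(resolveOutputPath(jobArgs(1), "hdfs://namenode:8020"))
    println(resolveOutputPath("word-count-run-001", "hdfs://namenode:8020"))
  }
}
```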
Note
You can view the job logs and search data in them using Yandex Cloud Logging. For more information, see Working with logs.
Delete the resources you created
Some resources are not free of charge. To avoid paying for them, delete the resources you no longer need: