Работа с заданиями Spark
Apache Spark
В этой статье на простом примере показывается, как в Yandex Data Processing использовать интерфейс Spark для языков Scala и Java. При помощи Spark в приведенном примере подсчитывается число случаев употребления каждого из слов, которые встречаются в коротком образце текста.
Перед началом работы
-
Создайте сервисный аккаунт с ролями
dataproc.agent
иdataproc.provisioner
. -
В Object Storage создайте бакеты и настройте доступ к ним:
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
READ
для этого бакета. - Создайте бакет для результатов обработки и предоставьте сервисному аккаунту кластера разрешение
READ и WRITE
для этого бакета.
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
-
Создайте кластер Yandex Data Processing со следующими настройками:
- Сервисы:
HDFS
SPARK
YARN
- Сервисный аккаунт: выберите созданный ранее сервисный аккаунт.
- Имя бакета: выберите бакет для результатов обработки.
- Сервисы:
Создайте задание Spark
-
Загрузите файл для обработки:
-
Скопируйте и сохраните в файле
text.txt
:text.txt
she sells sea shells on the sea shore the shells that she sells are sea shells I am sure so if she sells sea shells on the sea shore I am sure that the shells are sea shore shells
-
Загрузите файл
text.txt
в бакет для исходных данных.
-
-
Скачайте и загрузите в бакет для исходных данных jar-файл
spark-app_2.11-0.1.0-SNAPSHOT.jar
, собранный из исходного текста программы анализа word_count.scala на языке Scala:word_count.scala
package com.yandex.cloud.dataproc.scala import org.apache.spark.{SparkConf, SparkContext} object Main { def main(args: Array[String]) { if (args.length != 2){ // check number of args System.err.println("Usage spark-app.jar <входная_директория> <выходная_директория>"); System.exit(-1); } val inDir = args(0); //input URI val outDir = args(1); //output URI val conf = new SparkConf().setAppName("Word count - Scala App") val sc = new SparkContext(conf) val text_file = sc.textFile(inDir) val counts = text_file.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS") if (outDir.toLowerCase().startsWith("s3a://")) { counts.saveAsTextFile(outDir) } else { counts.saveAsTextFile(defaultFS + "/" + outDir) } sc.stop() } }
Подробнее о том, как собрать приложение для Spark, написанное на языке Scala, см. в разделе Использование Spark Submit.
-
Создайте задание Spark с параметрами:
- Основной JAR файл:
s3a://<имя_бакета_для_исходных_данных>/spark-app_2.11-0.1.0-SNAPSHOT.jar
- Основной класс:
com.yandex.cloud.dataproc.scala.Main
- Аргументы:
s3a://<имя_бакета_для_исходных_данных>/text.txt
s3a://<имя_бакета_для_результатов_обработки>/<папка_для_результатов>
- Основной JAR файл:
-
Подождите, пока статус задания изменится на
Done
. -
Скачайте из бакета и просмотрите файлы с результатами обработки:
part-00000
(are,2) (am,2) (she,3) (so,1)
part-00001
(shore,3) (if,1) (that,2) (on,2) (shells,6) (I,2) (sure,2) (sea,6) (the,4) (sells,3)
Примечание
Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать: