Запуск и управление приложениями для Spark и PySpark
Существует несколько способов запустить Spark- и PySpark-задания в кластере Yandex Data Processing:
- Spark Shell (командная оболочка для языков программирования Scala и Python). Расчеты запускаются не с помощью скрипта, а построчно. Подробнее о Spark Shell читайте в документации Spark
. - Скрипт spark-submit. Сохраняет результаты расчета в HDFS. Подробнее о
spark-submit
читайте в документации Spark . - Команды CLI Yandex Cloud. Позволяют сохранить результаты расчета не только в HDFS, но и в бакете Yandex Object Storage.
Ниже рассматривается пример, по которому будет рассчитана статистика по воздушному трафику США за 2018 год по данным с сайта transtats.bts.govyc-mdb-examples
.
Перед началом работы
Подготовьте инфраструктуру:
-
Создайте сеть с именем
data-proc-network
. При создании выключите опцию Создать подсети. -
В сети
data-proc-network
создайте подсеть со следующими параметрами:- Имя —
data-proc-subnet-a
. - Зона —
ru-central1-a
. - CIDR —
192.168.1.0/24
.
- Имя —
-
Создайте NAT-шлюз и таблицу маршрутизации с именем
data-proc-route-table
в сетиdata-proc-network
. Привяжите таблицу к подсетиdata-proc-subnet-a
. -
В сети
data-proc-network
создайте группу безопасности с именемdata-proc-security-group
и следующими правилами:-
По одному правилу для входящего и исходящего служебного трафика:
- Диапазон портов —
0-65535
. - Протокол —
Любой
. - Источник/Назначение —
Группа безопасности
. - Группа безопасности —
Текущая
.
- Диапазон портов —
-
Правило для входящего трафика, чтобы подключаться к хостам подкластеров из интернета:
- Диапазон портов —
22
. - Протокол —
TCP
. - Источник —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
Правило для исходящего HTTPS-трафика:
- Диапазон портов —
443
. - Протокол —
TCP
. - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
Правило для исходящего HTTP-трафика:
- Диапазон портов —
80
. - Протокол —
TCP
. - Назначение —
CIDR
. - CIDR блоки —
0.0.0.0/0
.
- Диапазон портов —
-
-
Создайте сервисный аккаунт
data-proc-sa
с ролями: -
Создайте бакет Yandex Object Storage
data-proc-bucket
с ограниченным доступом. -
Предоставьте сервисному аккаунту
data-proc-sa
разрешениеREAD и WRITE
на бакетdata-proc-bucket
. -
Создайте кластер Yandex Data Processing любой подходящей конфигурации с настройками:
- Сервисный аккаунт —
data-proc-sa
. - Зона доступности —
ru-central1-a
. - Имя бакета —
data-proc-bucket
. - Сеть —
data-proc-network
. - Группы безопасности —
data-proc-security-group
. - Публичный доступ для подкластеров — предоставлен.
- Сервисный аккаунт —
-
Если у вас еще нет Terraform, установите его.
-
Получите данные для аутентификации. Вы можете добавить их в переменные окружения или указать далее в файле с настройками провайдера.
-
Настройте и инициализируйте провайдер. Чтобы не создавать конфигурационный файл с настройками провайдера вручную, скачайте его
. -
Поместите конфигурационный файл в отдельную рабочую директорию и укажите значения параметров. Если данные для аутентификации не были добавлены в переменные окружения, укажите их в конфигурационном файле.
-
Скачайте в ту же рабочую директорию файл конфигурации data-proc-for-spark-jobs.tf
.В этом файле описаны:
- сеть;
- подсеть;
- NAT-шлюз и таблица маршрутизации;
- группы безопасности;
- сервисный аккаунт для работы с ресурсами кластера;
- бакет, в котором будут храниться зависимости заданий и результаты их выполнения;
- кластер Yandex Data Processing.
-
Укажите в файле конфигурации
data-proc-for-spark-jobs.tf
необходимые параметры. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в файлах конфигурации есть ошибки, Terraform на них укажет.
-
Создайте необходимую инфраструктуру:
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
В указанном каталоге будут созданы все требуемые ресурсы. Проверить появление ресурсов и их настройки можно в консоли управления
. -
Использование Spark Shell
-
Подключитесь по SSH к хосту-мастеру кластера Yandex Data Processing.
-
Запустите Spark Shell на хосте-мастере:
/usr/bin/pyspark
Количество ядер и процессов выполнения задач (executor) ограничено только конфигурацией вашего кластера Yandex Data Processing.
-
Построчно введите следующий код:
sql = SQLContext(sc) df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
Последняя строка — чтение данных из публичного бакета с набором данных для примера. После выполнения этой строки в текущей сессии будет доступен организованный набор данных (DataFrame)
df
с прочитанными данными. -
Чтобы увидеть схему полученного DataFrame, выполните команду:
df.printSchema()
В терминале будет выведен список столбцов с их типами.
-
Рассчитайте статистику перелетов по месяцам и найдите первую десятку городов по количеству вылетов:
-
Количество перелетов по месяцам:
df.groupBy("Month").count().orderBy("Month").show()
-
Первая десятка городов по количеству вылетов:
df.groupBy("OriginCityName").count().orderBy("count", ascending=False).show(10)
-
Использование Spark Submit
Spark Submit позволяет запускать заранее написанные приложения через скрипт spark-submit
. Для примера рассчитывается количество перелетов по месяцам.
-
Подключитесь по SSH к хосту-мастеру кластера Yandex Data Processing.
-
На хосте-мастере создайте файл
month_stat.py
со следующим кодом:import sys from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext def main(): conf = SparkConf().setAppName("Month Stat - Python") conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") sc = SparkContext(conf=conf) sql = SQLContext(sc) df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01") defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS") month_stat = df.groupBy("Month").count() month_stat.repartition(1).write.format("csv").save(defaultFS+"/tmp/month_stat") if __name__ == "__main__": main()
-
Запустите приложение:
/usr/bin/spark-submit month_stat.py
-
Результат работы приложения будет выгружен в HDFS. Список получившихся файлов можно вывести командой:
hdfs dfs -ls /tmp/month_stat
В примере рассматривается сборка и запуск приложения на языке программирования Scala
Чтобы создать и запустить Spark-приложение:
-
Подключитесь по SSH к хосту-мастеру кластера Yandex Data Processing.
-
Установите
стандартную утилиту сборки sbt для Scala. Она устанавливается вместе с языком программирования Scala. -
Создайте папку, например
spark-app
. -
В созданную папку добавьте файл с путем
./src/main/scala/app.scala
. -
Скопируйте следующий код в файл
app.scala
:package com.yandex.cloud.dataproc.scala import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext object Main { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Month Stat - Scala App") val sc = new SparkContext(conf) sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01") val month_stat = df.groupBy("Month").count() val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS") month_stat.repartition(1).write.format("csv").save(defaultFS+"/tmp/month_stat") sc.stop() } }
-
Подготовьте данные для сборки приложения:
-
Чтобы узнать установленную версию Scala, выполните команду
scala -version
. -
Чтобы узнать версии
spark-core
иspark-sql
, посмотрите содержимое каталога/usr/lib/spark/jars
:ls /usr/lib/spark/jars
Версии указаны в названии JAR-файлов. Пример:
spark-core_2.12-3.0.3.jar spark-sql_2.12-3.0.3.jar
Нужный номер версии —
3.0.3
. -
В папке
spark-app
создайте файлbuild.sbt
с конфигурацией:scalaVersion := "<версия_Scala>" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "<версия_spark-core>" % "provided", "org.apache.spark" %% "spark-sql" % "<версия_spark-sql>" % "provided" )
Пример:
scalaVersion := "2.12.10" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "3.0.3" % "provided", "org.apache.spark" %% "spark-sql" % "3.0.3" % "provided" )
-
-
Скомпилируйте и соберите JAR-файл:
sbt compile && sbt package
-
Получите название собранного JAR-файла:
ls ~/spark-app/target/scala-<версия_Scala>
Результат —
spark-app_2.12-0.1.0-SNAPSHOT.jar
. -
Запустите получившееся приложение:
/usr/bin/spark-submit --class com.yandex.cloud.dataproc.scala.Main target/scala-<версия_Scala>/<название_собранного_JAR-файла>
Пример:
/usr/bin/spark-submit --class com.yandex.cloud.dataproc.scala.Main target/scala-2.12/spark-app_2.12-0.1.0-SNAPSHOT.jar
-
Результат работы приложения будет выгружен в HDFS. Список получившихся файлов можно вывести командой:
hdfs dfs -ls /tmp/month_stat
Завершение работы приложения
По умолчанию ресурсы запускаемого приложения управляются компонентом YARN. Если приложение необходимо завершить или убрать из очереди, используйте утилиту yarn
:
-
Выведите список приложений:
yarn application -list
-
Завершите ненужное приложение:
yarn application -kill <идентификатор_приложения>
Более подробно с командами YARN можно ознакомиться на странице YARN Commands
Запуск заданий (jobs) с помощью CLI Yandex Cloud
Запуск заданий с помощью CLI Yandex Cloud происходит посредством агента Yandex Data Processing, установленного на хосте-мастере кластера. Параметры заданий передаются агенту через Yandex Data Processing API.
Исполняемый файл и его зависимости должны находиться в хранилище, к которому есть доступ у сервисного аккаунта кластера Yandex Data Processing. У самого запускаемого приложения должен быть доступ к хранилищам, в которых хранятся исходный набор данных и результаты запуска.
Результаты расчета можно сохранить в HDFS в кластере Yandex Data Processing или в бакете data-proc-bucket
, указанном при создании кластера.
Служебная и отладочная информация сохраняется в бакете data-proc-bucket
. Для каждого задания агент Yandex Data Processing создает отдельную папку с путем вида dataproc/clusters/<идентификатор_кластера>/jobs/<идентификатор_задачи>
.
Примечание
Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.
Ниже приведены два варианта приложения — для Python и Scala.
Запуск PySpark-задания
Чтобы запустить PySpark-задание:
Установите дополнительные зависимости
На локальном компьютере выполните действия:
-
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра
--folder-name
или--folder-id
. -
Установите и настройте консольный клиент S3cmd для работы с Yandex Object Storage.
-
Установите Python. Убедитесь, что версия Python совпадает с версией, доступной из образа. Проверить версию можно в разделе Среда исполнения. Для версии образа 2.0 используйте Python 3.8.10:
sudo apt update && sudo apt install python3.8
Подготовьте и запустите PySpark-задание
-
Создайте файл
job.py
со следующим кодом:import sys from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext def main(): if len(sys.argv) != 3: print('Usage job.py <входная_директория> <выходная_директория>') sys.exit(1) in_dir = sys.argv[1] out_dir = sys.argv[2] conf = SparkConf().setAppName('Month Stat - Python') sc = SparkContext(conf=conf) sql = SQLContext(sc) df = sql.read.parquet(in_dir) month_stat = df.groupBy('Month').count() job_id = dict(sc._conf.getAll())['spark.yarn.tags'].replace('dataproc_job_', '') if out_dir.startswith('s3a://'): month_stat.repartition(1).write.format('csv').save(out_dir + job_id) else: default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS') month_stat.repartition(1).write.format('csv').save(default_fs + out_dir + job_id) if __name__ == '__main__': main()
-
Чтобы PySpark имел доступ к вашему коду, загрузите файл
job.py
в бакет Object Storage, к которому есть доступ у сервисного аккаунта кластера Yandex Data Processing:s3cmd put ./job.py s3://data-proc-bucket/bin/
-
Запустите задание.
Команда для запуска зависит от того, где нужно сохранить результаты задания: в Object Storage или HDFS.
Object StorageДиректория HDFSyc dataproc job create-pyspark \ --cluster-id=<идентификатор_кластера> \ --name=<имя_задачи> \ --main-python-file-uri="s3a://data-proc-bucket/bin/job.py" \ --args="s3a://yc-mdb-examples/dataproc/example01/set01" \ --args="s3a://data-proc-bucket/jobs_results/"
В команде укажите:
--cluster-id
— идентификатор кластера. Его можно получить со списком кластеров в каталоге.--name
— произвольное имя Spark-задания.
CSV-файл с результатом сохранится в бакете
data-proc-bucket
.yc dataproc job create-pyspark \ --cluster-id=<идентификатор_кластера> \ --name=<имя_задачи> \ --main-python-file-uri="s3a://data-proc-bucket/bin/job.py" \ --args="s3a://yc-mdb-examples/dataproc/example01/set01" \ --args="tmp/jobs/"
В команде укажите:
--cluster-id
— идентификатор кластера. Его можно получить со списком кластеров в каталоге.--name
— произвольное имя Spark-задания.
CSV-файл с результатом сохранится в папке
/tmp/jobs/<идентификатор_задачи>/
в HDFS. -
(Опционально) Посмотрите логи задачи:
yc dataproc job log <идентификатор_задачи> --cluster-id=<идентификатор_кластера>
Запуск Spark-задания
Чтобы запустить Spark-задание:
- Установите дополнительные зависимости.
- Соберите Scala-приложение.
- Загрузите JAR-файл в Object Storage.
- Запустите Spark-задание в кластере Yandex Data Processing.
Установите дополнительные зависимости
-
Если у вас еще нет интерфейса командной строки Yandex Cloud, установите и инициализируйте его.
По умолчанию используется каталог, указанный в профиле CLI. Вы можете указать другой каталог с помощью параметра
--folder-name
или--folder-id
. -
Подключитесь по SSH к хосту-мастеру кластера Yandex Data Processing.
-
Установите
стандартную утилиту сборкиsbt
для Scala. Она устанавливается вместе с языком программирования Scala. -
Установите и настройте консольный клиент S3cmd для работы с Yandex Object Storage.
Соберите Scala-приложение
Чтобы упростить управление зависимостями, соберите приложение в один JAR-файл (fat JAR) с помощью плагина sbt-assembly
-
Создайте папку
spark-app
, в ней — папкиproject
иsrc/main/scala
. -
Создайте файл
spark-app/project/plugins.sbt
, который описывает подключение плагинаsbt-assembly
для сборки единого JAR-файла:addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "<версия_sbt-assembly>")
Версию плагина
sbt-assembly
см. в его репозитории , в разделе Releases. -
Узнайте установленную версию Scala с помощью команды
scala -version
. -
Создайте файл
spark-app/build.sbt
с описанием зависимостей и стратегии их слияния в одном JAR-файле. В файлеbuild.sbt
укажите версию Scala:scalaVersion := "<версия_Scala>" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.4.4", "org.apache.spark" %% "spark-sql" % "2.4.4", ) assembly / assemblyMergeStrategy := { case PathList("org","aopalliance", xs @ _*) => MergeStrategy.last case PathList("javax", "inject", xs @ _*) => MergeStrategy.last case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last case PathList("javax", "activation", xs @ _*) => MergeStrategy.last case PathList("org", "apache", xs @ _*) => MergeStrategy.last case PathList("com", "google", xs @ _*) => MergeStrategy.last case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last case PathList("com", "codahale", xs @ _*) => MergeStrategy.last case PathList("com", "yammer", xs @ _*) => MergeStrategy.last case "about.html" => MergeStrategy.rename case "overview.html" => MergeStrategy.last case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last case "META-INF/mailcap" => MergeStrategy.last case "META-INF/mimetypes.default" => MergeStrategy.last case "plugin.properties" => MergeStrategy.last case "log4j.properties" => MergeStrategy.last case "git.properties" => MergeStrategy.last case x => val oldStrategy = (assembly / assemblyMergeStrategy).value oldStrategy(x) }
-
Создайте файл
spark-app/src/main/scala/app.scala
с кодом приложения:package com.yandex.cloud.dataproc.scala import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext object Main { def main(args: Array[String]) { if (args.length != 2){ //проверяем аргумент System.err.println("Usage spark-app.jar <входная_директория> <выходная_директория>"); System.exit(-1); } val inDir = args(0); //URI на исходные данные val outDir = args(1); //URI на директорию, куда записать результат val conf = new SparkConf().setAppName("Month Stat - Scala App") val sc = new SparkContext(conf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.parquet(inDir) val monthStat = df.groupBy("Month").count() val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS") //получить эндпоинт сервера HDFS val jobId = conf.get("spark.yarn.tags").replace("dataproc_job_", ""); //получить идентификатор задания if (outDir.toLowerCase().startsWith("s3a://")) { monthStat.repartition(1).write.format("csv").save(outDir + jobId) } else { monthStat.repartition(1).write.format("csv").save(defaultFS + "/" + outDir + jobId) } sc.stop() } }
-
Запустите сборку приложения в папке
spark-app
:sbt clean && sbt compile && sbt assembly
В случае ошибки
Error looking up function 'stat'
Если вы получили ошибку
java.lang.UnsatisfiedLinkError: Error looking up function 'stat': java: undefined symbol: stat
и ОС хоста-мастера — Ubuntu, запустите каждую командуsbt
с флагом-Dsbt.io.jdktimestamps=true
:sbt clean -Dsbt.io.jdktimestamps=true && \ sbt compile -Dsbt.io.jdktimestamps=true && \ sbt assembly -Dsbt.io.jdktimestamps=true
Файл будет доступен по следующему пути: spark-app/target/scala-<версия_Scala>/spark-app-assembly-0.1.0-SNAPSHOT.jar
.
Загрузите JAR-файл в Object Storage
Чтобы Spark имел доступ к собранному JAR-файлу, загрузите файл в бакет data-proc-bucket
. Загрузить файл можно с помощью s3cmd:
s3cmd put ~/spark-app/target/scala-<версия_Scala>/spark-app-assembly-0.1.0-SNAPSHOT.jar s3://data-proc-bucket/bin/
Файл загружается по адресу s3://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar
.
Запустите Spark-задание в кластере Yandex Data Processing
-
Отключитесь от хоста-мастера кластера.
-
Запустите задание.
Команда для запуска зависит от того, где нужно сохранить результаты задания: в Object Storage или HDFS.
Object StorageДиректория HDFSyc dataproc job create-spark \ --cluster-id=<идентификатор_кластера> \ --name=<имя_задачи> \ --main-class="com.yandex.cloud.dataproc.scala.Main" \ --main-jar-file-uri="s3a://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar" \ --args="s3a://yc-mdb-examples/dataproc/example01/set01" \ --args="s3a://data-proc-bucket/jobs_results/"
В команде укажите:
--cluster-id
— идентификатор кластера. Его можно получить со списком кластеров в каталоге.--name
— произвольное имя Spark-задания.
CSV-файл с результатом сохранится в бакете
data-proc-bucket
.yc dataproc job create-spark \ --cluster-id=<идентификатор_кластера> \ --name=<имя_задачи> \ --main-class="com.yandex.cloud.dataproc.scala.Main" \ --main-jar-file-uri="s3a://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar" \ --args="s3a://yc-mdb-examples/dataproc/example01/set01" \ --args="tmp/jobs/"
В команде укажите:
--cluster-id
— идентификатор кластера. Его можно получить со списком кластеров в каталоге.--name
— произвольное имя Spark-задания.
CSV-файл с результатом сохранится в папке
/tmp/jobs/<идентификатор_задачи>/
в HDFS.Пример сообщения об успешном запуске задачи:
done (1m2s) id: {your_job_id} cluster_id: {your_cluster_id} name: test02 status: DONE spark_job: args: - s3a://yc-mdb-examples/dataproc/example01/set01 - s3a://data-proc-bucket/jobs_results/ main_jar_file_uri: s3a://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar main_class: com.yandex.cloud.dataproc.scala.Main
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:
- Удалите кластер Yandex Data Processing.
- Если вы зарезервировали публичные статические IP-адреса, освободите и удалите их.
- Удалите подсеть.
- Удалите таблицу маршрутизации.
- Удалите NAT-шлюз.
- Удалите сеть.
Чтобы удалить инфраструктуру, созданную с помощью Terraform:
-
В терминале перейдите в директорию с планом инфраструктуры.
-
Удалите конфигурационный файл
data-proc-for-spark-jobs.tf
. -
Проверьте корректность файлов конфигурации Terraform с помощью команды:
terraform validate
Если в конфигурационных файлах есть ошибки, Terraform на них укажет.
-
Подтвердите изменение ресурсов.
-
Выполните команду для просмотра планируемых изменений:
terraform plan
Если конфигурации ресурсов описаны верно, в терминале отобразится список изменяемых ресурсов и их параметров. Это проверочный этап: ресурсы не будут изменены.
-
Если вас устраивают планируемые изменения, внесите их:
-
Выполните команду:
terraform apply
-
Подтвердите изменение ресурсов.
-
Дождитесь завершения операции.
-
-
Все ресурсы, которые были описаны в конфигурационном файле, будут удалены.