Работа с заданиями PySpark
Apache Spark
В этой статье на простом примере показывается, как в Yandex Data Processing использовать PySpark
Перед началом работы
-
Создайте сервисный аккаунт с ролями
dataproc.agent
иdataproc.provisioner
. -
В Object Storage создайте бакеты и настройте доступ к ним:
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
READ
для этого бакета. - Создайте бакет для результатов обработки и предоставьте сервисному аккаунту кластера разрешение
READ и WRITE
для этого бакета.
- Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение
-
Создайте кластер Yandex Data Processing со следующими настройками:
- Сервисы:
HDFS
SPARK
YARN
- Сервисный аккаунт: выберите созданный ранее сервисный аккаунт.
- Имя бакета: выберите бакет для результатов обработки.
- Сервисы:
Создайте задание PySpark
-
Загрузите файл для обработки:
-
Скопируйте и сохраните в файле
text.txt
:text.txt
she sells sea shells on the sea shore the shells that she sells are sea shells I am sure so if she sells sea shells on the sea shore I am sure that the shells are sea shore shells
-
Загрузите файл
text.txt
в бакет для исходных данных.
-
-
Загрузите файл с кодом программы анализа на языке Python:
-
Скопируйте и сохраните в файле
word_count.py
:word_count.py
import sys from pyspark import SparkConf, SparkContext def main(): if len(sys.argv) != 3: print('Usage job.py <входная_директория> <выходная_директория>') sys.exit(1) in_dir = sys.argv[1] out_dir = sys.argv[2] conf = SparkConf().setAppName("Word count - PySpark") sc = SparkContext(conf=conf) text_file = sc.textFile(in_dir) counts = text_file.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a + b) if out_dir.startswith('s3a://'): counts.saveAsTextFile(out_dir) else: default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS') counts.saveAsTextFile(default_fs + out_dir) if __name__ == "__main__": main()
-
Загрузите файл
word_count.py
в бакет для исходных данных.
-
-
Создайте задание PySpark с параметрами:
-
Main python файл:
s3a://<имя_бакета_для_исходных_данных>/word_count.py
-
Аргументы:
s3a://<имя_бакета_для_исходных_данных>/text.txt
s3a://<имя_бакета_для_результатов_обработки>/<папка_для_результатов>
-
-
Подождите, пока статус задания изменится на
Done
. -
Скачайте из бакета и просмотрите файлы с результатами обработки:
part-00000
('sea', 6) ('are', 2) ('am', 2) ('sure', 2)
part-00001
('she', 3) ('sells', 3) ('shells', 6) ('on', 2) ('the', 4) ('shore', 3) ('that', 2) ('I', 2) ('so', 1) ('if', 1)
Примечание
Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.
Удалите созданные ресурсы
Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать: