Working with PySpark jobs
In this section, we provide a simple example that demonstrates how to use PySpark to run Apache Spark jobs in a Yandex Data Processing cluster.
Getting started
- Create a service account with the `dataproc.agent` and `dataproc.provisioner` roles.
- In Object Storage, create buckets and configure access to them:
  - Create a bucket for the input data and grant the cluster service account `READ` permissions for this bucket.
  - Create a bucket for the processing output and grant the cluster service account `READ and WRITE` permissions for this bucket.
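  Bucket creation can also be scripted. Here is a minimal sketch that calls the S3-compatible Object Storage endpoint via `boto3`; the bucket names are the placeholders used throughout this tutorial, and it assumes you have issued a static access key for the service account:

  ```python
  # Minimal sketch: create the input and output buckets through the
  # S3-compatible Object Storage endpoint. Bucket names and the static
  # access key are placeholders, not values fixed by this tutorial.
  import boto3

  s3 = boto3.client(
      "s3",
      endpoint_url="https://storage.yandexcloud.net",
      aws_access_key_id="<static_key_id>",
      aws_secret_access_key="<static_key_secret>",
  )

  # One bucket for the input data, one for the processing output.
  for bucket in ("<input_data_bucket_name>", "<output_bucket_name>"):
      s3.create_bucket(Bucket=bucket)
  ```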
- Create a Yandex Data Processing cluster with the following settings:
  - Services:
    - HDFS
    - SPARK
    - YARN
  - Service account: Select the service account you previously created.
  - Bucket name: Select a bucket to hold the processing results.
Create a PySpark job
- Upload a file for processing:

  - Copy and save the following to a file named `text.txt`:

    ```text
    she sells sea shells on the sea shore the shells that she sells are sea shells I am sure so if she sells sea shells on the sea shore I am sure that the shells are sea shore shells
    ```

  - Upload the `text.txt` file to the source data bucket.
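  You can also script the upload with `boto3` through the S3-compatible endpoint; a minimal sketch, assuming a static access key and with the bucket name as a placeholder (the same call uploads `word_count.py` in the next step):

  ```python
  # Minimal sketch: upload text.txt to the input bucket through the
  # S3-compatible Object Storage endpoint. Credentials and the bucket
  # name are placeholders.
  import boto3

  s3 = boto3.client(
      "s3",
      endpoint_url="https://storage.yandexcloud.net",
      aws_access_key_id="<static_key_id>",
      aws_secret_access_key="<static_key_secret>",
  )

  s3.upload_file("text.txt", "<input_data_bucket_name>", "text.txt")
  ```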
- Upload a file with the Python code of the analysis program:

  - Copy and save the following code to a file named `word_count.py`:

    ```python
    import sys

    from pyspark import SparkConf, SparkContext


    def main():
        # The job expects exactly two arguments: the input and output locations.
        if len(sys.argv) != 3:
            print('Usage: job.py <input_directory> <output_directory>')
            sys.exit(1)

        in_dir = sys.argv[1]
        out_dir = sys.argv[2]

        conf = SparkConf().setAppName("Word count - PySpark")
        sc = SparkContext(conf=conf)

        # Classic word count: split each line into words, map every word
        # to (word, 1), then sum the counts per word.
        text_file = sc.textFile(in_dir)
        counts = text_file.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)

        # Write directly to Object Storage if an s3a:// path is given;
        # otherwise resolve the path against the cluster's default FS (HDFS).
        if out_dir.startswith('s3a://'):
            counts.saveAsTextFile(out_dir)
        else:
            default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS')
            counts.saveAsTextFile(default_fs + out_dir)


    if __name__ == "__main__":
        main()
    ```

  - Upload the `word_count.py` file to the source data bucket.
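  Before submitting the job to the cluster, you can sanity-check the same flatMap/map/reduceByKey pipeline in Spark local mode; a minimal sketch, assuming `pyspark` is installed locally (e.g. via `pip install pyspark`) and using a tiny in-memory dataset instead of a bucket:

  ```python
  # Local-mode sanity check of the word-count pipeline. This is only a
  # smoke test of the logic, not part of the cluster workflow.
  from pyspark import SparkConf, SparkContext

  conf = SparkConf().setAppName("word-count-local-test").setMaster("local[2]")
  sc = SparkContext(conf=conf)

  lines = sc.parallelize(["she sells sea shells", "sea shells she sells"])
  counts = (lines.flatMap(lambda line: line.split(" "))
                 .map(lambda word: (word, 1))
                 .reduceByKey(lambda a, b: a + b))

  print(sorted(counts.collect()))
  # [('sea', 2), ('sells', 2), ('she', 2), ('shells', 2)]
  sc.stop()
  ```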
Create a PySpark job with the following parameters:
-
Main python file:
s3a://<input_data_bucket_name>/word_count.py
-
Arguments:
s3a://<input_data_bucket_name>/text.txt
s3a://<output_bucket_name>/<output_directory>
-
-
- Wait for the job status to change to **Done**.
- Download the result files from the output bucket and review them:

  `part-00000`:

  ```text
  ('sea', 6)
  ('are', 2)
  ('am', 2)
  ('sure', 2)
  ```

  `part-00001`:

  ```text
  ('she', 3)
  ('sells', 3)
  ('shells', 6)
  ('on', 2)
  ('the', 4)
  ('shore', 3)
  ('that', 2)
  ('I', 2)
  ('so', 1)
  ('if', 1)
  ```
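The download can also be scripted; a minimal `boto3` sketch that lists and prints the `part-*` result files (bucket and directory names are placeholders):

```python
# Minimal sketch: list and print the part-* result files from the
# output bucket via the S3-compatible endpoint. Bucket, directory,
# and credential values are placeholders.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",
    aws_access_key_id="<static_key_id>",
    aws_secret_access_key="<static_key_secret>",
)

resp = s3.list_objects_v2(Bucket="<output_bucket_name>",
                          Prefix="<output_directory>/part-")
for obj in resp.get("Contents", []):
    body = s3.get_object(Bucket="<output_bucket_name>", Key=obj["Key"])["Body"]
    print(obj["Key"])
    print(body.read().decode("utf-8"))
```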
Note
You can view the job logs and search them using Yandex Cloud Logging. For more information, see Working with logs.
Delete the resources you created
Some resources are not free of charge. To avoid paying for them, delete the resources you no longer need: the Yandex Data Processing cluster, the Object Storage buckets, and the service account.