Working with PySpark jobs
In this section, we provide a simple example that demonstrates how to use PySpark, the Python API for Apache Spark, in a Yandex Data Processing cluster.
Getting started
- Create a service account with the dataproc.agent and dataproc.provisioner roles.
- In Object Storage, create buckets and configure access to them:
  - Create a bucket for the input data and grant the cluster service account the READ permission for this bucket.
  - Create a bucket for the processing output and grant the cluster service account READ and WRITE permissions for this bucket.
- Create a Yandex Data Processing cluster with the following settings:
  - Environment: PRODUCTION
  - Services: HDFS, SPARK, YARN
  - Service account: Select the service account you created earlier.
  - Bucket name: Select the bucket for the processing results.
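If you prefer to script the bucket setup, a minimal sketch using boto3 against the S3-compatible Object Storage API might look like the one below. The bucket names and the static access key placeholders are our own assumptions, and granting the service account READ/WRITE access is still configured separately (for example, in the management console):

```python
import boto3

# Object Storage exposes an S3-compatible API at this endpoint.
s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",
    aws_access_key_id="<static_key_id>",          # assumption: a static access key you created
    aws_secret_access_key="<static_key_secret>",
)

# Hypothetical bucket names; bucket names must be globally unique.
s3.create_bucket(Bucket="pyspark-input-data")
s3.create_bucket(Bucket="pyspark-output-data")
```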
Create a PySpark job
- Upload a file for processing:

  - Copy and save the following to a file named text.txt:

    ```text
    she sells sea shells on the sea shore the shells that she sells are sea shells I am sure so if she sells sea shells on the sea shore I am sure that the shells are sea shore shells
    ```

  - Upload the text.txt file to the input data bucket.
- Upload a file with the analysis program code in Python:

  - Copy and save the following to a file named word_count.py:

    ```python
    import sys

    from pyspark import SparkConf, SparkContext


    def main():
        if len(sys.argv) != 3:
            print('Usage job.py <input_directory> <output_directory>')
            sys.exit(1)

        in_dir = sys.argv[1]
        out_dir = sys.argv[2]

        conf = SparkConf().setAppName("Word count - PySpark")
        sc = SparkContext(conf=conf)

        # Split each line into words and count the occurrences of each word.
        text_file = sc.textFile(in_dir)
        counts = text_file.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)

        # Write the result to Object Storage if an s3a:// URI is given,
        # otherwise to the cluster's default file system (HDFS).
        if out_dir.startswith('s3a://'):
            counts.saveAsTextFile(out_dir)
        else:
            default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS')
            counts.saveAsTextFile(default_fs + out_dir)


    if __name__ == "__main__":
        main()
    ```

  - Upload the word_count.py file to the input data bucket (a scripted upload sketch follows this step).
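As with the bucket setup, the uploads can be scripted. A minimal boto3 sketch, reusing the same endpoint and the hypothetical input bucket name from above:

```python
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",
    aws_access_key_id="<static_key_id>",
    aws_secret_access_key="<static_key_secret>",
)

# Upload both input files into the (hypothetical) input data bucket.
s3.upload_file("text.txt", "pyspark-input-data", "text.txt")
s3.upload_file("word_count.py", "pyspark-input-data", "word_count.py")
```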
- Create a PySpark job with the following parameters:

  - Main python file: s3a://<input_data_bucket_name>/word_count.py
  - Arguments:
    - s3a://<input_data_bucket_name>/text.txt
    - s3a://<output_bucket_name>/<output_directory>
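The job can also be created programmatically. The sketch below assumes the yandexcloud Python SDK and its generated Data Processing gRPC stubs; the module paths and field names follow the public API spec, so verify them against the SDK reference before use:

```python
import yandexcloud
from yandex.cloud.dataproc.v1.job_pb2 import PysparkJob
from yandex.cloud.dataproc.v1.job_service_pb2 import CreateJobRequest
from yandex.cloud.dataproc.v1.job_service_pb2_grpc import JobServiceStub

# Assumption: authentication with an OAuth token; IAM tokens and key files also work.
sdk = yandexcloud.SDK(token="<OAuth_token>")
job_service = sdk.client(JobServiceStub)

# Submit a PySpark job with the same main file and arguments as above.
job_service.Create(CreateJobRequest(
    cluster_id="<cluster_id>",
    name="word-count",
    pyspark_job=PysparkJob(
        main_python_file_uri="s3a://<input_data_bucket_name>/word_count.py",
        args=[
            "s3a://<input_data_bucket_name>/text.txt",
            "s3a://<output_bucket_name>/<output_directory>",
        ],
    ),
))
```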
Wait for the job status to change to
Done. -
Download the files with the results from the bucket and review them:
part-00000
('sea', 6) ('are', 2) ('am', 2) ('sure', 2)part-00001
('she', 3) ('sells', 3) ('shells', 6) ('on', 2) ('the', 4) ('shore', 3) ('that', 2) ('I', 2) ('so', 1) ('if', 1)
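To fetch and print the result files without leaving Python, a minimal boto3 sketch, with the same endpoint and hypothetical output bucket name as before (<output_directory> stays a placeholder):

```python
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",
    aws_access_key_id="<static_key_id>",
    aws_secret_access_key="<static_key_secret>",
)

# List every object the job wrote under the output prefix and print its contents.
resp = s3.list_objects_v2(Bucket="pyspark-output-data", Prefix="<output_directory>/")
for obj in resp.get("Contents", []):
    body = s3.get_object(Bucket="pyspark-output-data", Key=obj["Key"])["Body"]
    print(obj["Key"])
    print(body.read().decode("utf-8"))
```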
Note
You can view the job logs and search through them using Yandex Cloud Logging. For more information, see Working with logs.
Delete the resources you created
Some resources are not free of charge. To avoid paying for them, delete the resources you no longer need: