Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Data Processing
  • Начало работы
    • Все руководства
      • Обзор
      • Работа с заданиями Hive
      • Работа с заданиями MapReduce
      • Работа с заданиями PySpark
      • Работа с заданиями Spark
      • Запуск заданий Apache Hive
      • Запуск Spark-приложений
      • Запуск заданий с удаленного хоста
  • Управление доступом
  • Правила тарификации
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Публичные материалы
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • Перед началом работы
  • Создайте задание PySpark
  • Удалите созданные ресурсы
  1. Практические руководства
  2. Работа с заданиями
  3. Работа с заданиями PySpark

Работа с заданиями PySpark

Статья создана
Yandex Cloud
Обновлена 28 апреля 2025 г.
  • Перед началом работы
  • Создайте задание PySpark
  • Удалите созданные ресурсы

Apache Spark — это фреймворк для реализации распределенной обработки неструктурированных и слабоструктурированных данных, входящий в экосистему проектов Hadoop.

В этой статье на простом примере показывается, как в Yandex Data Processing использовать PySpark — интерфейс Spark для языка Python. При помощи PySpark в приведенном примере подсчитывается число случаев употребления каждого из слов, которые встречаются в коротком образце текста.

Перед началом работыПеред началом работы

  1. Создайте сервисный аккаунт с ролями dataproc.agent и dataproc.provisioner.

  2. В Object Storage создайте бакеты и настройте доступ к ним:

    1. Создайте бакет для исходных данных и предоставьте сервисному аккаунту кластера разрешение READ для этого бакета.
    2. Создайте бакет для результатов обработки и предоставьте сервисному аккаунту кластера разрешение READ и WRITE для этого бакета.
  3. Создайте кластер Yandex Data Processing со следующими настройками:

    • Окружение — PRODUCTION.
    • Сервисы:
      • HDFS
      • SPARK
      • YARN
    • Сервисный аккаунт: выберите созданный ранее сервисный аккаунт.
    • Имя бакета: выберите бакет для результатов обработки.

Создайте задание PySparkСоздайте задание PySpark

  1. Загрузите файл для обработки:

    1. Скопируйте и сохраните в файле text.txt:

      text.txt
      she sells sea shells on the sea shore
      the shells that she sells are sea shells I am sure
      so if she sells sea shells on the sea shore
      I am sure that the shells are sea shore shells
      
    2. Загрузите файл text.txt в бакет для исходных данных.

  2. Загрузите файл с кодом программы анализа на языке Python:

    1. Скопируйте и сохраните в файле word_count.py:

      word_count.py
      import sys
      from pyspark import SparkConf, SparkContext
      
      
      def main():
      
          if len(sys.argv) != 3:
              print('Usage job.py <входная_директория> <выходная_директория>')
              sys.exit(1)
      
          in_dir = sys.argv[1]
          out_dir = sys.argv[2]
      
          conf = SparkConf().setAppName("Word count - PySpark")
          sc = SparkContext(conf=conf)
      
          text_file = sc.textFile(in_dir)
          counts = text_file.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
      
          if out_dir.startswith('s3a://'):
              counts.saveAsTextFile(out_dir) 
          else:
              default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS')
              counts.saveAsTextFile(default_fs + out_dir)
      
      
      if __name__ == "__main__":
          main()
      
    2. Загрузите файл word_count.py в бакет для исходных данных.

  3. Создайте задание PySpark с параметрами:

    • Main python файл: s3a://<имя_бакета_для_исходных_данных>/word_count.py

    • Аргументы:

      • s3a://<имя_бакета_для_исходных_данных>/text.txt
      • s3a://<имя_бакета_для_результатов_обработки>/<папка_для_результатов>
  4. Подождите, пока статус задания изменится на Done.

  5. Скачайте из бакета и просмотрите файлы с результатами обработки:

    part-00000
    ('sea', 6)
    ('are', 2)
    ('am', 2)
    ('sure', 2)
    
    part-00001
    ('she', 3)
    ('sells', 3)
    ('shells', 6)
    ('on', 2)
    ('the', 4)
    ('shore', 3)
    ('that', 2)
    ('I', 2)
    ('so', 1)
    ('if', 1)
    

Примечание

Вы можете просматривать логи выполнения заданий и искать в них информацию с помощью сервиса Yandex Cloud Logging. Подробнее см. в разделе Работа с логами.

Удалите созданные ресурсыУдалите созданные ресурсы

Некоторые ресурсы платные. Чтобы за них не списывалась плата, удалите ресурсы, которые вы больше не будете использовать:

  1. Удалите кластер.
  2. Удалите бакеты.
  3. Удалите сервисный аккаунт.

Была ли статья полезна?

Предыдущая
Работа с заданиями MapReduce
Следующая
Работа с заданиями Spark
Проект Яндекса
© 2025 ООО «Яндекс.Облако»