Вычисления на кластерах Apache Spark™
Для работы с кластерами Yandex Data Processing требуется предварительная настройка проекта, см. подробнее в концепции.
Все кластеры Yandex Data Processing вне зависимости от варианта развертывания тарифицируются по правилам сервиса Yandex Data Processing. Все кластеры, доступные в проекте, можно посмотреть в разделе Ресурсы проекта ⟶ Yandex Data Processing на странице проекта.
Важно
Используя кластер, развернутый в сервисе Yandex Data Processing, вы управляете его жизненным циклом самостоятельно. Даже если на нем нет вычислений, кластер не будет удален и продолжит тарифицироваться.
DataSphere поддерживает работу с кластерами Yandex Data Processing при помощи:
Коннекторы Spark
Коннектор Spark — это специальный ресурс, который хранит настройки подключения к кластерам Yandex Data Processing. Настройки подключения к кластеру задаются при создании коннектора Spark. Выбранные кластеры подключаются или создаются при запуске вычислений в ячейке.
Коннектор Spark может быть опубликован в сообществе для использования в других проектах. Изменение настроек коннектора Spark применится для всех проектов, в которых он используется.
Для корректной интеграции с DataSphere через коннектор Spark развернутый кластер Yandex Data Processing должен иметь версию образа не ниже 2.0
с включенными сервисами LIVY
, SPARK
и YARN
.
Подробнее о работе с коннекторами Spark см. в инструкции.
Запуск Python-кода в кластере
-
Введите в ячейку Python-код, например:
import random def inside(p): x, y = random.random(), random.random() return x*x + y*y < 1 NUM_SAMPLES = 1_000_000_0 count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count() print("Pi is roughly %f" % (4.0 * count / NUM_SAMPL))
-
Выберите ячейку с кодом и запустите вычисления.
-
В открывшемся окне Конфигурации ВМ ноутбука перейдите на вкладку Для кластера Yandex Data Processing.
-
Выберите конфигурацию вычислительных ресурсов и коннектор Spark.
-
Нажмите кнопку Выбрать.
Коннектор и конфигурация окружения задаются вручную только при первом запуске вычисления. Все последующие вычисления в этом ноутбуке будут производиться на созданной при первом запуске ВМ. Кластер Yandex Data Processing подключается к ней при помощи коннектора Spark.
Синхронизация окружения Python с кластером
При работе с Python Spark через DataSphere нет необходимости вручную переносить виртуальное окружение. В кластере Yandex Data Processing есть возможность изменить базовый состав PyPI пакетов с помощью виртуального окружения:
-
Установите библиотеку
catboost
:%pip install catboost
-
После завершения установки на верхней панели выберите Kernel ⟶ Restart kernel.... Если установка прошла без ошибок, виртуальное окружение будет автоматически создано и доступно в Spark-сессии c помощью переменной
spark
.
Для синхронизации окружения в настройках коннектора Spark в блоке Настройки S3 должны быть указаны идентификатор статического ключа доступа для бакета и секрет, содержащий сам статический ключ доступа.
Важно
Синхронизация окружения Python работает в тестовом режиме. Чтобы разрешить синхронизацию окружения, в настройках коннектора Spark в блоке Настройки Spark укажите параметр .options
= venv
.
Livy-сессии
Для корректной интеграции с DataSphere через Livy-сессии развернутый кластер Yandex Data Processing должен иметь версию образа не ниже 2.0
с включенными сервисами LIVY
, SPARK
, YARN
и HDFS
.
Примечание
Чтобы получить из кластера Yandex Data Processing данные больше 100 МБ, используйте коннектор S3.
Вычислительные сессии
В кластере Yandex Data Processing ваш код выполняется в сессиях
Для управления сессиями используйте следующие команды:
%create_livy_session --cluster <имя_кластера> --id <идентификатор_сессии>
— создание сессии;%delete_livy_session --cluster <имя_кластера> --id <идентификатор_сессии>
— удаление сессии.
Например, следующая команда создаст в кластере my-new-cluster
сессию ses1
, которая позволит каждому процессу использовать максимум 4 ядра CPU в кластере и 4 ГБ RAM (подробнее см. в документации Spark
%create_livy_session --cluster my-new-cluster --id ses1 --conf spark.cores.max=4 --conf spark.executor.memory=4g
По умолчанию в сессиях включено динамическое выделение ресурсов. Чтобы ограничить ресурсы сессии, используйте параметр --conf spark.dynamicAllocation.enabled=false
.
Параметры Livy-сессии
Полный список параметров для команды %create_livy_session
:
Параметр | Тип | Описание |
---|---|---|
--cluster |
string |
Идентификатор или имя кластера Yandex Data Processing |
--id |
string |
Идентификатор сессии, произвольная строка. Если не указан, то формируется автоматически |
--conf |
string |
Свойства конфигурации Spark |
--proxyUser |
string |
Логин пользователя операционной системы кластера Yandex Data Processing, от имени которого будет выполняться задание. По умолчанию spark |
--jars |
string |
Библиотеки Java для использования в сессии |
--files |
string |
Файлы для использования в сессии |
--pyFiles |
string |
Python-файлы для использования в сессии |
--driverMemory |
string |
Объем памяти драйвера |
--driverCores |
int |
Количество ядер для драйвера |
--executorMemory |
string |
Объем памяти процесса-исполнителя |
--executorCores |
int |
Количество ядер для процесса-процесса |
--numExecutors |
int |
Количество процессов-исполнителей |
--archives |
string |
Архивы для использования в сессии |
--queue |
string |
Название очереди YARN |
--variables |
string |
Переменные для использования в сессии |
--return_variables |
string |
Переменные, которые передадутся из сессии |
--heartbeatTimeoutInSecond |
int |
Время бездействия до завершения сеанса |
--ttl |
string |
Время ожидания неактивного сеанса |
Подробнее о параметрах livy-сессии см. в официальной документации
Ограничения сессий Yandex Data Processing
DataSphere использует системные переменные для работы с кластером Yandex Data Processing. Не переопределяйте значения следующих переменных:
sc
spark
HiveContext
StreamingContext
SqlContext
Следующие глобальные конфигурации Spark переопределяются параметрами, необходимыми для выполнения заданий Livy:
spark.jars
spark.submit.deployMode
spark.yarn.dist.archives
spark.submit.pyFiles
spark.yarn.maxAppAttempts
spark.yarn.submit.waitAppCompletion
Чтобы задать дополнительные библиотеки для сессии Spark, используйте параметры spark.driver.extraClassPath
и spark.executor.extraClassPath
, а сами библиотеки разместите на всех узлах во время создания кластера Yandex Data Processing с помощью скриптов инициализации. Пути к используемым библиотекам должны совпадать на всех узлах кластера.
Запуск Python-кода в кластере
Код запускается в ячейках с заголовком:
#!spark [--cluster <кластер>] [--session <сессия>] [--variables <входящая_переменная>] [--return_variables <возвращаемая_переменная>]
Где:
--cluster
— кластер Yandex Data Processing, на котором будут производиться вычисления. Может быть:- Именем кластера, созданного через интерфейс ноутбука.
- HTTP-ссылкой на внутренний IP-адрес хоста
masternode
, напримерhttp://10.0.0.8:8998/
.
--session
— идентификатор вычислительной сессии. Если параметр пропущен, используется сессия кластера Yandex Data Processing по умолчанию.--variables
— переменная, импортированная из DataSphere в кластер Yandex Data Processing. Поддерживаемые типы:bool
,int
,float
,str
,pandas.DataFrame
(преобразовывается в Spark DataFrame в кластере).--return_variables
— переменная, которая будет экспортирована из кластера Yandex Data Processing в DataSphere. Поддерживаемые типы:bool
,int
,float
,str
,pandas.DataFrame
(преобразованный Spark DataFrame).
Пример использования вычислительных сессий с пользовательскими параметрами
Чтобы запустить вычисления в сессии с заданными настройками, сначала создайте сессию, а затем передайте код в ячейке с заголовком #!spark
:
-
Создайте сессию и определите ее параметры:
%create_livy_session --cluster my-new-cluster --id ses1 --conf spark.cores.max=4 --conf spark.executor.memory=4g
-
В следующей ячейке запустите вычисления:
#!spark --cluster my-new-cluster --session ses1 import random def inside(p): x, y = random.random(), random.random() return x*x + y*y < 1 NUM_SAMPLES = 1_000_000 count = sc.parallelize(range(0, NUM_SAMPLES)) \ .filter(inside).count() print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
-
Если сессия вам больше не нужна, удалите ее:
%delete_livy_session --cluster my-new-cluster --id ses1
Работа с библиотекой Spark SQL
DataSphere может работать с библиотекой Spark SQL. Например, следующий запрос вернет все записи в таблице animals
, созданной в кластере cluster test-dataproc-cluster
:
#!spark --cluster test-dataproc-cluster --return_variables df
df = spark.sql("SELECT * FROM animals;")
df
Подробнее о синтаксисе SQL-запросов и работе с библиотекой Spark SQL см. в официальной документации