Вычисления на кластерах Apache Spark™
Для работы с кластерами Yandex Data Processing требуется предварительная настройка проекта, см. подробнее в концепции.
Все кластеры Yandex Data Processing вне зависимости от варианта развертывания тарифицируются по правилам сервиса Yandex Data Processing. Все кластеры, доступные в проекте, можно посмотреть в разделе Ресурсы проекта ⟶ Yandex Data Processing на странице проекта.
Важно
Используя кластер, развернутый в сервисе Yandex Data Processing, вы управляете его жизненным циклом самостоятельно. Даже если на нем нет вычислений, кластер не будет удален и продолжит тарифицироваться.
DataSphere поддерживает работу с кластерами Yandex Data Processing при помощи:
Коннекторы Spark
Коннектор Spark — это специальный ресурс, который хранит настройки подключения к кластерам Yandex Data Processing. Настройки подключения к кластеру задаются при создании коннектора Spark. Выбранные кластеры подключаются или создаются при запуске вычислений в ячейке.
Коннектор Spark может быть опубликован в сообществе для использования в других проектах. Изменение настроек коннектора Spark применится для всех проектов, в которых он используется.
Для корректной интеграции с DataSphere через коннектор Spark развернутый кластер Yandex Data Processing должен иметь версию образа не ниже 2.0 с включенными сервисами LIVY, SPARK и YARN.
Подробнее о работе с коннекторами Spark см. в инструкции.
Запуск Python-кода в кластере
-
Введите в ячейку Python-код, например:
import random def inside(p): x, y = random.random(), random.random() return x*x + y*y < 1 NUM_SAMPLES = 1_000_000_0 count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count() print("Pi is roughly %f" % (4.0 * count / NUM_SAMPL)) -
Выберите ячейку с кодом и запустите вычисления.
-
В открывшемся окне Конфигурации ВМ ноутбука перейдите на вкладку Для кластера Yandex Data Processing.
-
Выберите конфигурацию вычислительных ресурсов и коннектор Spark.
-
Нажмите кнопку Выбрать.
Коннектор и конфигурация окружения задаются вручную только при первом запуске вычисления. Все последующие вычисления в этом ноутбуке будут производиться на созданной при первом запуске ВМ. Кластер Yandex Data Processing подключается к ней при помощи коннектора Spark.
Синхронизация окружения Python с кластером
При работе с Python Spark через DataSphere нет необходимости вручную переносить виртуальное окружение. В кластере Yandex Data Processing есть возможность изменить базовый состав PyPI пакетов с помощью виртуального окружения:
-
Установите библиотеку
catboost:%pip install catboost -
После завершения установки на верхней панели выберите Kernel ⟶ Restart kernel.... Если установка прошла без ошибок, виртуальное окружение будет автоматически создано и доступно в Spark-сессии c помощью переменной
spark.
Для синхронизации окружения в настройках коннектора Spark в блоке Настройки S3 должны быть указаны идентификатор статического ключа доступа для бакета и секрет, содержащий сам статический ключ доступа.
Важно
Синхронизация окружения Python работает в тестовом режиме. Чтобы разрешить синхронизацию окружения, в настройках коннектора Spark в блоке Настройки Spark укажите параметр .options = venv.
Livy-сессии
Для корректной интеграции с DataSphere через Livy-сессии развернутый кластер Yandex Data Processing должен иметь версию образа не ниже 2.0 с включенными сервисами LIVY, SPARK, YARN и HDFS.
Примечание
Чтобы получить из кластера Yandex Data Processing данные больше 100 МБ, используйте коннектор S3.
Вычислительные сессии
В кластере Yandex Data Processing ваш код выполняется в сессиях
Для управления сессиями используйте следующие команды:
%create_livy_session --cluster <имя_кластера> --id <идентификатор_сессии>— создание сессии;%delete_livy_session --cluster <имя_кластера> --id <идентификатор_сессии>— удаление сессии.
Например, следующая команда создаст в кластере my-new-cluster сессию ses1, которая позволит каждому процессу использовать максимум 4 ядра CPU в кластере и 4 ГБ RAM (подробнее см. в документации Spark
%create_livy_session --cluster my-new-cluster --id ses1 --conf spark.cores.max=4 --conf spark.executor.memory=4g
По умолчанию в сессиях включено динамическое выделение ресурсов. Чтобы ограничить ресурсы сессии, используйте параметр --conf spark.dynamicAllocation.enabled=false.
Параметры Livy-сессии
Полный список параметров для команды %create_livy_session:
| Параметр | Тип | Описание |
|---|---|---|
--cluster |
string |
Идентификатор или имя кластера Yandex Data Processing |
--id |
string |
Идентификатор сессии, произвольная строка. Если не указан, то формируется автоматически |
--conf |
string |
Свойства конфигурации Spark |
--proxyUser |
string |
Логин пользователя операционной системы кластера Yandex Data Processing, от имени которого будет выполняться задание. По умолчанию spark |
--jars |
string |
Библиотеки Java для использования в сессии |
--files |
string |
Файлы для использования в сессии |
--pyFiles |
string |
Python-файлы для использования в сессии |
--driverMemory |
string |
Объем памяти драйвера |
--driverCores |
int |
Количество ядер для драйвера |
--executorMemory |
string |
Объем памяти процесса-исполнителя |
--executorCores |
int |
Количество ядер для процесса-процесса |
--numExecutors |
int |
Количество процессов-исполнителей |
--archives |
string |
Архивы для использования в сессии |
--queue |
string |
Название очереди YARN |
--variables |
string |
Переменные для использования в сессии |
--return_variables |
string |
Переменные, которые передадутся из сессии |
--heartbeatTimeoutInSecond |
int |
Время бездействия до завершения сеанса |
--ttl |
string |
Время ожидания неактивного сеанса |
Подробнее о параметрах livy-сессии см. в официальной документации
Ограничения сессий Yandex Data Processing
DataSphere использует системные переменные для работы с кластером Yandex Data Processing. Не переопределяйте значения следующих переменных:
scsparkHiveContextStreamingContextSqlContext
Следующие глобальные конфигурации Spark переопределяются параметрами, необходимыми для выполнения заданий Livy:
spark.jarsspark.submit.deployModespark.yarn.dist.archivesspark.submit.pyFilesspark.yarn.maxAppAttemptsspark.yarn.submit.waitAppCompletion
Чтобы задать дополнительные библиотеки для сессии Spark, используйте параметры spark.driver.extraClassPath и spark.executor.extraClassPath, а сами библиотеки разместите на всех узлах во время создания кластера Yandex Data Processing с помощью скриптов инициализации. Пути к используемым библиотекам должны совпадать на всех узлах кластера.
Запуск Python-кода в кластере
Код запускается в ячейках с заголовком:
#!spark [--cluster <кластер>] [--session <сессия>] [--variables <входящая_переменная>] [--return_variables <возвращаемая_переменная>]
Где:
--cluster— кластер Yandex Data Processing, на котором будут производиться вычисления. Может быть:- Именем кластера, созданного через интерфейс ноутбука.
- HTTP-ссылкой на внутренний IP-адрес хоста
masternode, напримерhttp://10.0.0.8:8998/.
--session— идентификатор вычислительной сессии. Если параметр пропущен, используется сессия кластера Yandex Data Processing по умолчанию.--variables— переменная, импортированная из DataSphere в кластер Yandex Data Processing. Поддерживаемые типы:bool,int,float,str,pandas.DataFrame(преобразовывается в Spark DataFrame в кластере).--return_variables— переменная, которая будет экспортирована из кластера Yandex Data Processing в DataSphere. Поддерживаемые типы:bool,int,float,str,pandas.DataFrame(преобразованный Spark DataFrame).
Пример использования вычислительных сессий с пользовательскими параметрами
Чтобы запустить вычисления в сессии с заданными настройками, сначала создайте сессию, а затем передайте код в ячейке с заголовком #!spark:
-
Создайте сессию и определите ее параметры:
%create_livy_session --cluster my-new-cluster --id ses1 --conf spark.cores.max=4 --conf spark.executor.memory=4g -
В следующей ячейке запустите вычисления:
#!spark --cluster my-new-cluster --session ses1 import random def inside(p): x, y = random.random(), random.random() return x*x + y*y < 1 NUM_SAMPLES = 1_000_000 count = sc.parallelize(range(0, NUM_SAMPLES)) \ .filter(inside).count() print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)) -
Если сессия вам больше не нужна, удалите ее:
%delete_livy_session --cluster my-new-cluster --id ses1
Работа с библиотекой Spark SQL
DataSphere может работать с библиотекой Spark SQL. Например, следующий запрос вернет все записи в таблице animals, созданной в кластере cluster test-dataproc-cluster:
#!spark --cluster test-dataproc-cluster --return_variables df
df = spark.sql("SELECT * FROM animals;")
df
Подробнее о синтаксисе SQL-запросов и работе с библиотекой Spark SQL см. в официальной документации