Работа с коннекторами Spark
Интеграция DataSphere Jobs с Yandex Data Processing через коннекторы Spark отличается от стандартной работы заданий. Кластеры Yandex Data Processing имеют строгие требования к окружению, поэтому для заданий есть ряд ограничений:
- Для заданий, использующих коннектор Spark, нельзя изменить Docker-образ. Попытка указать новый образ приведет к ошибке.
- Python-окружение DataSphere Jobs должно соответствовать окружению кластера Yandex Data Processing, версии Python и основных библиотек должны совпадать. Для работы с Yandex Data Processing из заданий предпочтительнее использовать ручное задание.
- Для корректного исполнения задания могут потребоваться дополнительные пакеты. Список пакетов и их версии зависят от версии кластера Yandex Data Processing. Информация о дополнительных пакетах отражается в логе задания.
- Для подключения к кластеру Yandex Data Processing из задания необходимо использовать DataSphere SDK из Python-пакета
datasphere
.
Важно
Задания не поддерживают работу с временными кластерами.
Настройка задания
Для работы с коннекторами Spark необходимо добавить в файл конфигурации задания поле c идентификатором коннектора.
spark:
connector: <идентификатор_коннектора>
Убедитесь, что коннектор Spark доступен в проекте.
Важно
Для работы с коннекторами Spark в DataSphere Jobs требуется DataSphere CLI версии 0.10.0
или выше.
Подключение к кластеру Yandex Data Processing из кода задания
Для подключения к кластеру Yandex Data Processing из задания используется DataSphere SDK из Python-пакета datasphere
. Укажите пакет datasphere
в requirements.txt
и подключитесь к кластеру в коде.
from datasphere.sdk import SDK
sdk = SDK()
spark_wrapper = sdk.connect_to_spark() # Конфигурирует и создает сессию подключения к кластеру Yandex Data Processing
spark_session = spark_wrapper.spark # Сессия Spark
spark_context = spark_wrapper.sc # Контекст Spark
# Далее можно использовать spark_session и spark_context так же, как при работе c pyspark
Локальная отладка задания
При работе с кластерами Yandex Data Processing в DataSphere SDK предусмотрена возможность создания локальной сессии PySparksdk.connect_to_spark()
.
Для запуска локальной сессии создайте файл в директории задания с именем .spark_local_config.json
(или укажите путь до файла через переменную окружения JOB_DATAPROC_CONNECTION_CONFIG
) и укажите в нем следующие параметры:
{
"master_node_host": "localhost",
"connection_mode": "SPARK_LOCAL", // Обязательно для локальной отладки
"session_params": {} // Параметры сессии PySpark
}
Пример задания с подключением к кластеру Yandex Data Processing
-
Создайте файл конфигурации задания
config.yaml
:name: job-spark-connector-example cmd: python main.py env: python: type: manual version: 3.8 requirements-file: requirements.txt spark: connector: <идентификатор_коннектора>
-
Создайте файл с параметрами окружения
requirements.txt
:datasphere==0.10.0
-
Создайте файл точки входа в задание
main.py
:import random from datasphere.sdk import SDK sdk = SDK() spark_wrapper = sdk.connect_to_spark() spark_context = spark_wrapper.sc NUM_SAMPLES = 10_000_000 def inside(*args, **kwargs): x, y = random.random(), random.random() return x * x + y * y < 1 count = spark_context.parallelize(range(0, NUM_SAMPLES)).filter(inside).count() print("Pi:", 4.0 * count / NUM_SAMPLES)