Working with Spark connectors
Integrating DataSphere Jobs with Yandex Data Processing through Spark connectors differs from the standard job workflow. Yandex Data Processing clusters have strict environment requirements, which imposes the following restrictions on jobs:
- You cannot update a Docker image for a job that uses a Spark connector. Trying to specify a new image will lead to an error.
- The Python environment for DataSphere Jobs must be consistent with your Yandex Data Processing cluster environment, i.e., the versions of Python and its main libraries must be the same. To work with Yandex Data Processing using jobs, we recommend setting up your environment manually.
- For a job to run correctly, you may need additional packages. The required packages and their versions depend on the Yandex Data Processing cluster version. You can find information on additional packages in the job log.
- To connect to a Yandex Data Processing cluster from a job, you need to use the DataSphere SDK from the datasphere Python package.
Warning
Jobs do not support operations with temporary clusters.
Setting up a job
To work with Spark connectors, add a field with the connector ID to the job configuration file.
spark:
  connector: <connector_ID>
Make sure the Spark connector is available in your project.
Warning
To work with Spark connectors in DataSphere Jobs, you need DataSphere CLI version 0.10.0 or higher.
Connecting to a Yandex Data Processing cluster from the job code
To connect to a Yandex Data Processing cluster from a job, you need the DataSphere SDK from the datasphere Python package. Specify the datasphere package in requirements.txt and connect to the cluster in the code:
from datasphere.sdk import SDK
sdk = SDK()
spark_wrapper = sdk.connect_to_spark() # Configures and creates a Yandex Data Processing cluster connection session
spark_session = spark_wrapper.spark # Spark session
spark_context = spark_wrapper.sc # Spark context
# You can then use `spark_session` and `spark_context` in the same way as with `pyspark`
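
Once the connection is established, spark_session behaves like a regular PySpark SparkSession. Below is a minimal usage sketch that relies only on standard PySpark DataFrame calls; the sample data and column names are illustrative, not part of the SDK:

# A minimal sketch, assuming the `spark_session` object obtained above.
# The data and column names are illustrative.
df = spark_session.createDataFrame(
    [("a", 1), ("b", 2), ("c", 3)],
    ["letter", "number"],
)
df.filter(df.number > 1).show()  # Executes on the connected Yandex Data Processing cluster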
Debugging a job locally
When working with Yandex Data Processing clusters in the DataSphere SDK, you can create a local PySpark session for debugging: in this mode, sdk.connect_to_spark() starts a local session instead of connecting to a cluster.
To start a local session, create a .spark_local_config.json file in the job directory (or provide the path to the file in the JOB_DATAPROC_CONNECTION_CONFIG environment variable) and specify the following parameters in the file:
{
    "master_node_host": "localhost",
    "connection_mode": "SPARK_LOCAL",  // Required for local debugging
    "session_params": {}               // PySpark session parameters
}
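
With this file in place, the job code itself does not change: the same sdk.connect_to_spark() call creates a local PySpark session. The sketch below shows one way to confirm the mode; it assumes the standard SparkContext.master property, and the exact master value of the local session is an assumption:

from datasphere.sdk import SDK

sdk = SDK()
spark_wrapper = sdk.connect_to_spark()  # Starts a local PySpark session when SPARK_LOCAL is configured
print(spark_wrapper.sc.master)          # Expected to report a local master, e.g. local[*] (assumption)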
Example of a job with a connection to a Yandex Data Processing cluster
- Create the config.yaml job configuration file:

  name: job-spark-connector-example
  cmd: python main.py
  env:
    python:
      type: manual
      version: 3.8
      requirements-file: requirements.txt
  spark:
    connector: <connector_ID>
- Create the requirements.txt file with environment parameters:

  datasphere==0.10.0
- Create the main.py job entry point file:

  import random

  from datasphere.sdk import SDK

  sdk = SDK()

  spark_wrapper = sdk.connect_to_spark()
  spark_context = spark_wrapper.sc

  NUM_SAMPLES = 10_000_000

  def inside(*args, **kwargs):
      x, y = random.random(), random.random()
      return x * x + y * y < 1

  count = spark_context.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
  print("Pi:", 4.0 * count / NUM_SAMPLES)
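
This script estimates π with a simple Monte Carlo simulation distributed over the cluster: each sampled point lands inside the unit quarter circle with probability π/4, so four times the observed fraction of such points approximates π.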