Computing on Apache Spark™ clusters
To use Yandex Data Processing clusters, pre-configure a project. To learn more, see the concept.
Regardless of the deployment option, all Yandex Data Processing clusters are charged based on the Yandex Data Processing pricing policy. To view all the clusters available in your project, navigate to Project resources ⟶ Yandex Data Processing on the project page.
Warning
As a user of a cluster deployed in Yandex Data Processing, you manage its lifecycle yourself. Even with no computations ongoing, the cluster will not be deleted and will remain billable.
DataSphere supports using Yandex Data Processing clusters via:
Spark connectors
A Spark connector is a special resource that stores connection settings for Yandex Data Processing clusters. Cluster connection settings are specified when creating a Spark connector. The clusters you select are either connected or created when you run computations in a cell.
You can publish a Spark connector in a community, thus making it available for other projects. Changes to a Spark connector's settings will apply to all the projects the connector is used in.
For correct integration with DataSphere via a Spark connector, make sure that the image version of the deployed Yandex Data Processing cluster is at least 2.0
, with LIVY
, SPARK
, and YARN
enabled.
To learn more about using Spark connectors, see this guide.
Running Python code in a cluster
-
Enter Python code in the cell, for example:
import random def inside(p): x, y = random.random(), random.random() return x*x + y*y < 1 NUM_SAMPLES = 1_000_000_0 count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count() print("Pi is roughly %f" % (4.0 * count / NUM_SAMPL))
-
Select the cell with the code and run your computations.
-
In the Notebook VM configurations window that opens, go to the With Yandex Data Processing cluster tab.
-
Select a computing resource configuration and a Spark connector.
-
Click Select.
The connector and environment configuration are specified manually when you run a computation for the first time. All subsequent computations in this notebook will be performed on the VM created during the first run. A Yandex Data Processing cluster connects to it using the Spark connector.
Syncing Python environment with a cluster
When working with Python Spark via DataSphere, there is no need to manually migrate the virtual environment. In a Yandex Data Processing cluster, you can change the basic composition of PyPI packages using a virtual environment:
-
Install the
catboost
library:%pip install catboost
-
Once the installation is complete, from the top panel, select Kernel ⟶ Restart kernel.... If the installation completes without errors, the virtual environment will be automatically created and available in the Spark session using the
spark
variable.
To synchronize the environment, the Spark connector settings in the S3 settings section must specify the static access key ID for the bucket and the secret containing the static access key.
Warning
Python environment synchronization is running in test mode. To enable environment synchronization, in the Spark connector settings, under Spark settings, specify the .options
= venv
parameter.
Livy sessions
For correct integration with DataSphere via Livy sessions, make sure the image version of the deployed Yandex Data Processing cluster is at least 2.0
, with LIVY
, SPARK
, YARN
, and HDFS
enabled.
Note
To get more than 100 MB of the Yandex Data Processing cluster data, use an S3 connector.
Computing sessions
In Yandex Data Processing clusters, your code is running in sessions
Use the following commands to manage sessions:
%create_livy_session --cluster <cluster_name> --id <session_ID>
to create a session.%delete_livy_session --cluster <cluster_name> --id <session_ID>
to delete a session.
For example, the command below creates a session named ses1
in my-new-cluster
, which allows each process to use a maximum of 4 CPUs per cluster and 4 GB of RAM (for more information, see the Spark documentation
%create_livy_session --cluster my-new-cluster --id ses1 --conf spark.cores.max=4 --conf spark.executor.memory=4g
Dynamic resource allocation is enabled in sessions by default. To limit session resources, set --conf spark.dynamicAllocation.enabled
to false
.
Livy session parameters
Complete list of parameters for the %create_livy_session
command:
Parameter | Type | Description |
---|---|---|
--cluster |
string |
Yandex Data Processing cluster ID or name |
--id |
string |
Session ID, arbitrary string. If not specified, it is generated automatically. |
--conf |
string |
Spark configuration properties |
--proxyUser |
string |
Yandex Data Processing cluster OS username on behalf of which the job will be run. The default value is spark . |
--jars |
string |
Java libraries to be used in the session |
--files |
string |
Files to be used in the session |
--pyFiles |
string |
Python files to be used in the session |
--driverMemory |
string |
Driver memory capacity |
--driverCores |
int |
Number of driver cores |
--executorMemory |
string |
Worker memory capacity |
--executorCores |
int |
Number of worker cores |
--numExecutors |
int |
Number of workers |
--archives |
string |
Archives to be used in the session |
--queue |
string |
YARN queue name |
--variables |
string |
Variables to be used in the session |
--return_variables |
string |
Variables to be returned by the session |
--heartbeatTimeoutInSecond |
int |
Timeout before the session ends |
--ttl |
string |
Inactive session timeout |
For more information about livy session parameters, see the official documentation
Yandex Data Processing session restrictions
DataSphere uses system variables to run Yandex Data Processing clusters. Do not override the values of the following variables:
sc
spark
HiveContext
StreamingContext
SqlContext
The following global Spark configurations are overridden by the parameters required to run Livy jobs:
spark.jars
spark.submit.deployMode
spark.yarn.dist.archives
spark.submit.pyFiles
spark.yarn.maxAppAttempts
spark.yarn.submit.waitAppCompletion
To specify additional libraries for a Spark session, use the spark.driver.extraClassPath
and spark.executor.extraClassPath
parameters and place the libraries on all nodes when creating a Yandex Data Processing cluster with initialization scripts. Make sure the paths to the libraries you use are the same on all cluster nodes.
Running Python code in a cluster
The code is run in the cells with the header:
#!spark [--cluster <cluster>] [--session <session>] [--variables <input_variable>] [--return_variables <returned_variable>]
Where:
--cluster
: Yandex Data Processing cluster to perform computations on. This can be:- Name of the cluster created through the notebook interface.
- HTTP link to the internal IP address of the
masternode
host, such ashttp://10.0.0.8:8998/
.
--session
: Computing session ID. If this parameter is not specified, the default Yandex Data Processing cluster session is used.--variables
: Variable imported to the DataSphere cluster from Yandex Data Processing. Supported types includebool
,int
,float
,str
, andpandas.DataFrame
(converted to Spark DataFrame in a cluster).--return_variables
: Variable to be exported from the Yandex Data Processing cluster to DataSphere. Supported type:bool
,int
,float
,str
,pandas.DataFrame
(transformed Spark DataFrame).
Example of using computing sessions with user-defined parameters
To run computations in a session with defined settings, first create this session and then provide the code in the cell with the #!spark
header:
-
Create a session and set its parameters:
%create_livy_session --cluster my-new-cluster --id ses1 --conf spark.cores.max=4 --conf spark.executor.memory=4g
-
In the next cell, run your computations:
#!spark --cluster my-new-cluster --session ses1 import random def inside(p): x, y = random.random(), random.random() return x*x + y*y < 1 NUM_SAMPLES = 1_000_000 count = sc.parallelize(range(0, NUM_SAMPLES)) \ .filter(inside).count() print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
-
If you no longer need the session, delete it:
%delete_livy_session --cluster my-new-cluster --id ses1
Working with the Spark SQL library
DataSphere supports using the Spark SQL library. For example, the query below will return all records in the animals
table created in cluster test-dataproc-cluster
:
#!spark --cluster test-dataproc-cluster --return_variables df
df = spark.sql("SELECT * FROM animals;")
df
For more information about the SQL query syntax and how to use the Spark SQL library, see the official documentation