Computing on Apache Spark™ clusters in DataSphere
Yandex Data Proc allows you to deploy Apache Spark™ clusters and use them for distributed training and other computations. DataSphere works with these clusters through sessions created by Apache Livy.
Cluster deployment options
There are two ways to deploy a cluster for computations in DataSphere:
- Create a cluster in DataSphere using a special resource called a Yandex Data Proc template.
- Create a cluster in Yandex Data Proc on your own and integrate it into your DataSphere project.
Regardless of the deployment option, all Yandex Data Proc clusters are charged based on the Yandex Data Proc pricing policy. To view all the clusters available in your project, open Project resources ⟶ Data Proc on the project page.
Yandex Data Proc templates
In a Yandex Data Proc template, you select one of the preset cluster configurations. Based on the Yandex Data Proc template activated in the project, DataSphere deploys a temporary cluster using the appropriate project parameters.
DataSphere monitors the state of temporary clusters. If a cluster has not been used for computations for two hours, DataSphere stops it. You can redeploy the cluster in your project as needed. You can also share Yandex Data Proc templates with other users.
You can learn more about Yandex Data Proc templates here.
Integration with Yandex Data Proc
If you have experience using Yandex Data Proc or the standard template configurations do not suit your needs, you can deploy a cluster and use it for computing in DataSphere.
Warning
If you use a cluster deployed in Yandex Data Proc, you need to manage its lifecycle yourself. Even if no computations have run for more than two hours, the cluster keeps running, and you keep paying for it until you stop it.
For proper integration with DataSphere, make sure the image version of the deployed Yandex Data Proc cluster is at least 1.3 and that the LIVY, SPARK, YARN, and HDFS services are enabled.
Note
To get more than 100 MB of data from the Yandex Data Proc cluster, use an S3 connector.
Setting up a DataSphere project to work with Yandex Data Proc clusters
To use Yandex Data Proc clusters, set the following project parameters:
- Default folder to enable integration with other Yandex Cloud services. A Yandex Data Proc cluster will be deployed in this folder based on the current cloud quotas. A fee for using the cluster will be debited from your cloud billing account.
- Service account to be used by DataSphere for creating and managing clusters. The service account needs the following roles:
  - `dataproc.agent`: to use Yandex Data Proc clusters.
  - `dataproc.admin`: to create clusters from Yandex Data Proc templates.
  - `vpc.user`: to use the Yandex Data Proc cluster network.
  - `iam.serviceAccounts.user`: to create resources in the folder on behalf of the service account.
- Subnet for DataSphere to communicate with the Yandex Data Proc cluster. Since the Yandex Data Proc cluster needs to access the internet, make sure to configure a NAT gateway in the subnet.
Note
If you specify a subnet in the project settings, allocating computing resources may take longer.
Computing sessions
In Yandex Data Proc clusters, your code runs in sessions.
Use the following commands to manage sessions:
- `%create_livy_session --cluster <cluster_name> --id <session_ID>`: creates a session.
- `%delete_livy_session --cluster <cluster_name> --id <session_ID>`: deletes a session.
For example, the command below creates a session named `ses1` in `my-new-cluster` that allows each process to use a maximum of 4 CPU cores per cluster and 4 GB of RAM (for more information, see the Spark documentation):
%create_livy_session --cluster my-new-cluster --id ses1 --conf spark.cores.max=4 --conf spark.executor.memory=4g
Dynamic resource allocation is enabled in sessions by default. To limit session resources, set `--conf spark.dynamicAllocation.enabled` to `false`.
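For instance, a minimal sketch of creating a session with dynamic allocation disabled and a fixed number of executors might look like this (the cluster and session names, as well as the values, are placeholders):

%create_livy_session --cluster my-new-cluster --id ses2 --conf spark.dynamicAllocation.enabled=false --conf spark.executor.instances=2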
Livy session parameters
Complete list of parameters for the `%create_livy_session` command:
| Parameter | Type | Description |
|---|---|---|
| `--cluster` | string | Yandex Data Proc cluster ID or name |
| `--id` | string | Session ID, an arbitrary string. If omitted, it is generated automatically. |
| `--conf` | string | Spark configuration properties |
| `--proxyUser` | string | Yandex Data Proc cluster OS username on behalf of which the job will be run. The default value is `spark`. |
| `--jars` | string | Java libraries to be used in the session |
| `--files` | string | Files to be used in the session |
| `--pyFiles` | string | Python files to be used in the session |
| `--driverMemory` | string | Driver memory capacity |
| `--driverCores` | int | Number of driver cores |
| `--executorMemory` | string | Worker memory capacity |
| `--executorCores` | int | Number of worker cores |
| `--numExecutors` | int | Number of workers |
| `--archives` | string | Archives to be used in the session |
| `--queue` | string | YARN queue name |
| `--variables` | string | Variables to be used in the session |
| `--return_variables` | string | Variables to be returned by the session |
| `--heartbeatTimeoutInSecond` | int | Timeout before the session ends |
| `--ttl` | string | Inactive session timeout |
For more information about Livy session parameters, see the official documentation.
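As an illustration, the command below sketches a session with explicitly pinned driver and executor resources; the cluster name, session ID, and values are placeholders, not recommendations:

%create_livy_session --cluster my-new-cluster --id ses-tuned --driverMemory 2g --driverCores 2 --executorMemory 4g --executorCores 2 --numExecutors 3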
Yandex Data Proc session restrictions
DataSphere uses system variables to run Yandex Data Proc clusters. Do not override the values of the following variables:
- `sc`
- `spark`
- `HiveContext`
- `StreamingContext`
- `SqlContext`
The following global Spark configurations are overridden by the parameters required to run Livy jobs:
- `spark.jars`
- `spark.submit.deployMode`
- `spark.yarn.dist.archives`
- `spark.submit.pyFiles`
- `spark.yarn.maxAppAttempts`
- `spark.yarn.submit.waitAppCompletion`
To specify additional libraries for a Spark session, use the `spark.driver.extraClassPath` and `spark.executor.extraClassPath` parameters and place the libraries on all nodes when creating a Yandex Data Proc cluster with initialization scripts. Make sure the paths to the libraries you use are the same on all cluster nodes.
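As a minimal sketch, assuming an initialization script has already copied the required JAR files into a hypothetical /opt/libs/ directory on every cluster node, a session could reference them like this:

%create_livy_session --cluster my-new-cluster --id ses-libs --conf spark.driver.extraClassPath=/opt/libs/* --conf spark.executor.extraClassPath=/opt/libs/*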
Running Python code in a cluster
The code runs in cells with the following header:
#!spark [--cluster <cluster>] [--session <session>] [--variables <input_variable>] [--return_variables <returned_variable>]
Where:
- `--cluster`: Yandex Data Proc cluster to perform computations on. This can be:
  - The name of a cluster created through the notebook interface.
  - An HTTP link to the internal IP address of the `masternode` host, such as `http://10.0.0.8:8998/`.
- `--session`: Computing session ID. If this parameter is not specified, the default Yandex Data Proc cluster session is used.
- `--variables`: Variables imported into the Yandex Data Proc cluster from DataSphere (see the sketch after this list). Supported types include `bool`, `int`, `float`, `str`, and `pandas.DataFrame` (converted to a Spark DataFrame in the cluster).
- `--return_variables`: Variables to be exported from the Yandex Data Proc cluster to DataSphere. Supported types include `bool`, `int`, `float`, `str`, and `pandas.DataFrame` (converted to a Spark DataFrame).
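Below is a hypothetical sketch of passing a small pandas DataFrame into the cluster and getting a scalar result back; `df_in`, `row_count`, and the data values are assumptions made for this example.

First, prepare the data in a regular DataSphere notebook cell:

```python
# Regular DataSphere cell: prepare a small pandas DataFrame to send to the cluster.
import pandas as pd

df_in = pd.DataFrame({"name": ["cat", "dog", "hen"], "legs": [4, 4, 2]})
```

Then run a cell in the cluster that receives the variable and returns a result:

```python
#!spark --cluster my-new-cluster --session ses1 --variables df_in --return_variables row_count
# Inside the cluster, df_in arrives as a Spark DataFrame; count its rows
# and return the integer result to the DataSphere notebook.
row_count = df_in.count()
```

After the second cell finishes, `row_count` is available in the DataSphere notebook as a regular Python variable.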
Example of using computing sessions with user-defined parameters
To run computations in a session with defined settings, first create this session and then provide the code in a cell with the `#!spark` header:
- Create a session and set its parameters:

  %create_livy_session --cluster my-new-cluster --id ses1 --conf spark.cores.max=4 --conf spark.executor.memory=4g

- In the next cell, run computations:

  #!spark --cluster my-new-cluster --session ses1
  import random

  def inside(p):
      x, y = random.random(), random.random()
      return x*x + y*y < 1

  NUM_SAMPLES = 1_000_000
  count = sc.parallelize(range(0, NUM_SAMPLES)) \
      .filter(inside).count()
  print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

- If you no longer need the session, delete it:

  %delete_livy_session --cluster my-new-cluster --id ses1
Working with the Spark SQL library
DataSphere supports using the Spark SQL library. For example, the query below will return all records in the `animals` table created in the `test-dataproc-cluster` cluster:
#!spark --cluster test-dataproc-cluster --return_variables df
df = spark.sql("SELECT * FROM animals;")
df
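If the `animals` table does not exist yet, a hypothetical way to create it in the same cluster is sketched below; the column names and values are invented for illustration.

```python
#!spark --cluster test-dataproc-cluster
# Build a tiny Spark DataFrame and save it as the "animals" table
# so that the SELECT query above has data to return.
from pyspark.sql import Row

rows = [Row(name="cat", legs=4), Row(name="dog", legs=4), Row(name="hen", legs=2)]
spark.createDataFrame(rows).write.mode("overwrite").saveAsTable("animals")
```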
For more information about the SQL query syntax and how to use the Spark SQL library, see the official documentation.