Spark jobs
Yandex Data Processing supports Apache Spark.
Application management
A single cluster may run multiple applications concurrently. A running application is orchestrated by a special driver program. To learn how Spark drivers work, see the Resource allocation section.
An application can either be in standby mode or run jobs. By default, jobs within an application run on a first-in, first-out (FIFO) basis. This method does not require any additional setup.
To enable concurrent jobs, switch the scheduler to the FAIR mode and additionally configure the system and the specific jobs. To learn how to do this, see the Apache Spark documentation.
Tip
To run concurrent jobs without FAIR scheduling, you can run them in different Spark applications in the FIFO mode.
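As a sketch of the FAIR setup described above: `spark.scheduler.mode`, `spark.scheduler.allocation.file`, and `spark.scheduler.pool` are standard Spark properties, while the pool name, the file path, and `my_job.py` below are placeholders.

```bash
# A minimal sketch of switching an application to the FAIR scheduler.
# The pool name, file path, and my_job.py are placeholders.
cat >/tmp/fairscheduler.xml <<'EOF'
<?xml version="1.0"?>
<allocations>
  <pool name="interactive">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>1</minShare>
  </pool>
</allocations>
EOF

spark-submit \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.scheduler.allocation.file=/tmp/fairscheduler.xml \
  my_job.py

# Inside the application, assign jobs to a pool, e.g.:
# sc.setLocalProperty("spark.scheduler.pool", "interactive")
```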
Task management
Each Spark job consists of multiple tasks run by specific processes called executors. Each executor is run on a single cluster host and consumes a certain amount of computing resources, such as CPU and RAM.
Depending on the execution plan, tasks can run either one by one or concurrently. Concurrent tasks are grouped into stages. The number of tasks that can run concurrently depends on the requested and the available cluster resources.
If you use standard Yandex Data Processing settings, computing resources required to run Spark jobs are allocated dynamically.
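Dynamic allocation in Spark is controlled by the standard `spark.dynamicAllocation.*` properties. If you prefer to fix executor resources for a particular run, you can override them; the values and `my_job.py` below are illustrative.

```bash
# A sketch: fix the number and size of executors instead of relying on
# dynamic allocation. The values and my_job.py are placeholders.
spark-submit \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.instances=4 \
  --conf spark.executor.cores=2 \
  --conf spark.executor.memory=4g \
  my_job.py
```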
Resource allocation
A Spark application running on a Yandex Data Processing cluster includes a number of processes running on the cluster nodes. Which processes make up the application, where they are located, and how much computing resources are reserved for them depend on the Spark component properties.
Spark applications managed by the YARN resource manager support two deploy modes, set by the `spark:spark.submit.deployMode` property:

- In the `deployMode=cluster` mode, the driver runs in the application's main process managed by YARN (the YARN Application Master process), so the client can finish operation once the application launches successfully.
- In the `deployMode=client` mode, the driver runs directly in the client process, while the YARN Application Master is used only for working with YARN (e.g., to get resources).

In both modes, the YARN Application Master runs on a compute host within a compute or data subcluster.

If the cluster meets the requirements for lightweight clusters, the `deployMode=client` mode is used by default. Otherwise, the default mode is `deployMode=cluster`.
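When you submit an application with `spark-submit` directly, the same choice is expressed through the standard `--deploy-mode` flag (or the `spark.submit.deployMode` property); `my_job.py` below is a placeholder.

```bash
# Run the driver inside the YARN Application Master (cluster mode).
spark-submit --master yarn --deploy-mode cluster my_job.py

# Keep the driver in the client process; the YARN Application Master only
# negotiates resources with YARN (client mode).
spark-submit --master yarn --deploy-mode client my_job.py
```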
Resources are allocated between the driver and the executors based on the Spark component properties. The tables below list the key properties that determine resource allocation in different deploy modes.
These tables use the following abbreviations:
- `allCPU`: Number of host cores. It is determined by the host class selected when creating the subcluster.
- `nmMem`: Amount of the host RAM available to YARN NodeManager. It is calculated using the following formula:

  `nmMem` = total host RAM × fraction of RAM allocated for YARN NodeManager

  - The total host RAM is determined by the host class selected when creating the subcluster.
  - The fraction of RAM allocated for YARN NodeManager can be set in the `dataproc:nodemanager_available_memory_ratio` property. By default, it is set to `0.8`. The remaining RAM is reserved for auxiliary load (sending logs and metrics, file system cache, etc.).
In the tables, results of arithmetic operations are rounded:
- For CPU, results are rounded down to the nearest integer. If the result of a division is less than 1, it is rounded up to 1.
- For RAM, results are rounded down to the nearest multiple of 1 GB.
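For example, for a hypothetical host class with 16 vCPUs and 64 GB of RAM and the default `dataproc:nodemanager_available_memory_ratio` of `0.8`:

```bash
# Worked example for a hypothetical host: 16 vCPUs, 64 GB of RAM, ratio 0.8.
allCPU=16                                    # taken directly from the host class
TOTAL_RAM_GB=64
RATIO_PCT=80                                 # 0.8 expressed as a percentage
nmMem=$(( TOTAL_RAM_GB * RATIO_PCT / 100 ))  # 64 × 0.8 = 51.2, rounded down to 51 GB
echo "allCPU=${allCPU}, nmMem=${nmMem} GB"
```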
In the `deployMode=client` mode, the driver runs on the cluster's master host separately from the YARN Application Master and can access all resources of the master host. The YARN Application Master runs in a separate YARN container on a compute host, but only the small amount of resources required for its operation is reserved.
| Parameter (abbreviation) | Description | Default value |
|---|---|---|
|  | Maximum number of containers per compute host |  |
| (`yamCPU`) | Number of processor cores allocated for the YARN Application Master |  |
| (`yamMem`) | Amount of RAM (MB) allocated for the YARN Application Master |  |
|  | Number of processor cores allocated to each executor |  |
|  | Amount of RAM (MB) allocated to each executor |  |
Since `yamCPU` and `yamMem` are subtracted from the total CPU and RAM, respectively, the YARN Application Master consumes fewer resources than a standard container, and the amount of resources available to each executor increases.
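If you need to adjust the Application Master footprint in this mode, Spark's standard client-mode properties `spark.yarn.am.cores` and `spark.yarn.am.memory` correspond to `yamCPU` and `yamMem`; the values and `my_job.py` below are illustrative.

```bash
# A sketch: size the YARN Application Master container in client mode.
# spark.yarn.am.cores and spark.yarn.am.memory apply in client mode only;
# the values and my_job.py are placeholders.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --conf spark.yarn.am.cores=1 \
  --conf spark.yarn.am.memory=1g \
  my_job.py
```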
The `deployMode=cluster` mode assumes that a resource-intensive program, e.g., HDFS, is running on the cluster's master host. Consequently, the drivers run on compute hosts inside the YARN Application Master and are allocated a substantial amount of resources.
| Property (abbreviation) | Description | Default value |
|---|---|---|
|  | Fraction of compute host RAM reserved for the driver |  |
|  | Maximum number of containers per compute host |  |
|  | Number of processor cores allocated to each executor |  |
|  | Amount of RAM (MB) allocated to each executor |  |
|  | Number of processor cores allocated for the YARN Application Master |  |
|  | Amount of RAM (MB) allocated for the YARN Application Master |  |
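In this mode the driver lives inside the YARN Application Master, so its container is sized with the standard `spark.driver.cores` and `spark.driver.memory` properties rather than the `spark.yarn.am.*` ones; the values and `my_job.py` below are illustrative.

```bash
# A sketch: size the driver (running inside the YARN Application Master)
# and the executors in cluster mode. Values and my_job.py are placeholders.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.driver.cores=2 \
  --conf spark.driver.memory=8g \
  --conf spark.executor.cores=2 \
  --conf spark.executor.memory=8g \
  my_job.py
```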
The default values set in the service are optimal for running a single app. To optimize resource allocation for your needs, change the driver deployment mode and other component properties:
- For all new jobs in a cluster:
  - When creating a cluster.
  - When modifying a cluster.
- For an individual job, during its creation.
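For instance, a component property such as `spark:spark.submit.deployMode` can be supplied for the whole cluster or for a single job through the YC CLI. The sketch below is an assumption about the command and flag names; check `yc dataproc cluster create --help` and `yc dataproc job create-spark --help` for the exact syntax.

```bash
# Assumption-level sketch; verify flag names with the CLI help.
# For all new jobs: set the property when creating (or modifying) the cluster.
yc dataproc cluster create my-cluster \
  --property spark:spark.submit.deployMode=cluster \
  ...

# For an individual job: set the property when creating the job.
yc dataproc job create-spark \
  --cluster-name my-cluster \
  --name my-job \
  --main-jar-file-uri s3a://my-bucket/my-job.jar \
  --properties spark.submit.deployMode=client
```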
Resource allocation examples
With `deployMode=client`, a single app runs on a cluster with default settings and two compute hosts. In this case:

- The driver can take up all the master host's resources.
- The amount of resources available for executors on all compute hosts will be reduced by the amount reserved for the YARN Application Master.
- The resources reserved for the YARN Application Master on the second host will remain unused.

With `deployMode=cluster`, a single app runs on a cluster with default settings and two compute hosts. In this case:

- The master host may be running HDFS or another resource-intensive program.
- The YARN Application Master and the driver will take up a substantial portion of the resources on one of the compute hosts, but no more than the size of the container for the executors. Because of this, some resources may remain unused.
- On the second compute host, both containers will be allocated to the executors.
Automatic scaling
Automatic scaling in Yandex Data Processing clusters lets you manage the cluster's computing resources automatically: nodes are added when there are pending operations in the execution queue and released (the corresponding VMs are deleted) when the cluster load decreases.
When the load drops below a specified threshold, the automatic scaling function deletes one or multiple cluster nodes. These nodes may host processes of Spark jobs. For stable operation of Apache Spark jobs in clusters with automatic scaling, you must meet the following conditions:
- Repeated executions must be permitted for operations within a Spark job.
- The YARN Application Master processes and Spark job drivers must be located on hosts without automatic scaling (i.e., hosts that are not decommissioned automatically).
To configure repeated execution of operations, set the `spark:spark.task.maxFailures` parameter at the job level or for the entire cluster. This parameter defines how many failures of a single operation cause the entire job to terminate abnormally. We recommend keeping the default value of `4`: it allows up to three retries when errors occur consecutively on the same operation within a job.
Forced termination of YARN Application Master processes and Spark job driver processes results in immediate abnormal termination of the entire job, so you should locate these processes on hosts without automatic scaling. HDFS nodes (a subcluster of nodes with the Data Node role) or an additional subcluster of compute nodes (with the Compute Node role) with disabled automatic scaling can serve as such nodes.
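Putting both conditions together, a job submitted to an autoscaling cluster can keep the recommended retry count and pin its YARN Application Master (and, in cluster mode, the driver running inside it) to labeled hosts that are not scaled automatically. `spark.task.maxFailures` and `spark.yarn.am.nodeLabelExpression` are standard Spark properties; the `SPARKAM` label is created by the script in the next section, and `my_job.py` is a placeholder.

```bash
# A sketch: make a job tolerant of node removal in an autoscaling cluster.
# SPARKAM is the node label created by the initialization script below.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.task.maxFailures=4 \
  --conf spark.yarn.am.nodeLabelExpression=SPARKAM \
  my_job.py
```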
YARN settings for automatic scaling
You can use node labels, set with the YARN resource manager tools, together with the Hadoop Capacity Scheduler to control where the YARN Application Master containers of Spark jobs are placed. To do this, the `spark.yarn.am.nodeLabelExpression` parameter must contain the name of the created label, and the cluster must have at least one node with this label.
Below is an example of an initialization script that checks the subcluster name and sets a YARN label on the nodes of that subcluster. The script accepts one command-line argument: the name of the subcluster whose nodes should be labeled.
```bash
#!/bin/sh
set -e
set +u
NAME_TO_LABEL="$1"
# Loading the jq and yq utilities to analyze the VM metadata.
wget -q -O - https://mycop1.website.yandexcloud.net/utils/jq.gz | gzip -dc >/tmp/jq
wget -q -O - https://mycop1.website.yandexcloud.net/utils/yq.gz | gzip -dc >/tmp/yq
mv /tmp/jq /usr/local/bin/
mv /tmp/yq /usr/local/bin/
chown root:bin /usr/local/bin/jq /usr/local/bin/yq
chmod 555 /usr/local/bin/jq /usr/local/bin/yq
# Checking whether this node needs to be labeled.
MUSTLABEL=N
if [ ! -z "$NAME_TO_LABEL" ]; then
  # Getting the subcluster name from the VM metadata.
  curl -H Metadata-Flavor:Google 'http://169.254.169.254/computeMetadata/v1/instance/?alt=json&recursive=true' -o /tmp/host-meta.json
  jq -r .attributes.'"user-data"' /tmp/host-meta.json >/tmp/host-userdata.yaml
  subcid=`jq -r '.vendor.labels.subcluster_id' /tmp/host-meta.json`
  subname=`yq -r ".data.topology.subclusters.${subcid}.name" /tmp/host-userdata.yaml`
  if [ "${subname}" = "${NAME_TO_LABEL}" ]; then
    # The subcluster name matches the specified one.
    MUSTLABEL=Y
  fi
fi
if [ "${MUSTLABEL}" = "Y" ]; then
  MYHOST=`hostname -f`
  while true; do
    # Checking whether the label exists in the cluster.
    foundya=`sudo -u yarn yarn cluster --list-node-labels 2>/dev/null | grep -E '^Node Labels' | grep '<SPARKAM:' | wc -l | (read x && echo $x)`
    if [ $foundya -gt 0 ]; then
      break
    fi
    # If the label does not exist, creating it at the cluster level.
    sudo -u yarn yarn rmadmin -addToClusterNodeLabels 'SPARKAM' || true
    sleep 2
  done
  # Setting the label on the current node.
  sudo -u yarn yarn rmadmin -replaceLabelsOnNode "${MYHOST}=SPARKAM"
fi
```
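After the script has run on the hosts of the target subcluster, you can verify the result from the master host with the same YARN tools the script uses (the `-showDetails` option of `yarn node -list` is available in recent Hadoop versions).

```bash
# Check that the SPARKAM label exists at the cluster level.
sudo -u yarn yarn cluster --list-node-labels

# List nodes with details, including the labels assigned to each node.
sudo -u yarn yarn node -list -showDetails
```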
Spark settings for automatic scaling
The table below provides the typical Spark parameters for a Yandex Data Processing cluster with automatic scaling.
| Parameter=value | Usage description |
|---|---|
|  | Number of possible consecutive errors occurring on a certain step of the Spark job |
|  | Name of the label for selecting nodes to launch AM containers of Spark jobs |
|  | Default mode of running Spark jobs that is used when the respective setting is missing at the job level or for a specific service |
|  | Spark job running mode used in Apache Livy sessions |
|  | Enabling support for node labels in YARN |
|  | Folder storing node labels in the file system of the cluster master node |
|  | Mode of managing YARN node labels |
|  | Maximum share of resources (from …) |
|  | Allowing jobs in the … |
|  | Setting the allowed usage percentage for nodes with the … label |
|  | Setting the allowed usage percentage for nodes with the … label |
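The parameter names themselves are not shown in the table above. As an assumption, the standard Spark, Livy, YARN, and Capacity Scheduler properties that match these descriptions can be passed as cluster properties roughly as follows; the `SPARKAM` label, the `default` queue, the values, and the CLI flag names are illustrative and should be verified for your cluster image.

```bash
# Assumption-level sketch: standard property names matching the descriptions
# above. Values, the SPARKAM label, the default queue, and CLI flags are
# illustrative; verify them against your cluster image and the CLI help.
yc dataproc cluster create my-autoscaling-cluster \
  --property spark:spark.task.maxFailures=4 \
  --property spark:spark.yarn.am.nodeLabelExpression=SPARKAM \
  --property spark:spark.submit.deployMode=cluster \
  --property livy:livy.spark.deploy-mode=cluster \
  --property yarn:yarn.node-labels.enabled=true \
  --property yarn:yarn.node-labels.fs-store.root-dir=file:///hadoop/yarn/node-labels \
  --property yarn:yarn.node-labels.configuration-type=centralized \
  --property capacity-scheduler:yarn.scheduler.capacity.maximum-am-resource-percent=1.00 \
  --property capacity-scheduler:yarn.scheduler.capacity.root.default.accessible-node-labels=SPARKAM \
  --property capacity-scheduler:yarn.scheduler.capacity.root.accessible-node-labels.SPARKAM.capacity=100 \
  --property capacity-scheduler:yarn.scheduler.capacity.root.default.accessible-node-labels.SPARKAM.capacity=100 \
  ...
```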
Useful links
The Apache Spark documentation provides detailed information about the concepts and settings described in this section. Additional details are available in the Apache Hadoop documentation.
To check how Spark applications are running in a Yandex Data Processing cluster, use Spark application monitoring.
If a Spark application runs slowly, perform primary performance diagnostics.