Spark pods configuration

This page explains how to configure Spark pods to run on Kubernetes and how the Data Mechanics platform can help.

When using Spark on Kubernetes, it is important to size executors so that they make the most of the cluster's resources while still fitting on its nodes. Additionally, some precautions must be taken when using preemptible instances (GCP, AWS, Azure) with Spark.

First we'll see how the Data Mechanics platform takes care of those configurations for you. Then we'll explain how to size pods manually, in case you need to do it yourself!

Pod configuration on the Data Mechanics platform

To support workloads with different performance requirements, the Kubernetes cluster on which the platform runs contains several node pools with different instance types. The node pools scale down to zero when the cluster is idle.

The Data Mechanics platform determines the node pools to be used for driver and executor pods based on your input. It ensures that:

  • driver pods are scheduled on non-preemptible instances
  • given your input, pods are sized to fit the smallest nodes available and limit waste

In the rest of this page, we'll take the example of a GKE cluster with three node pools. Read on if you're on AWS or Azure: the behavior is the same on an EKS or AKS cluster.

                       Node pool A      Node pool B      Node pool C
Instance type          n2-standard-2    n2-standard-4    n2-standard-8
CPU capacity (cores)   2                4                8
Memory capacity (GiB)  7.5              14.9             29.8
Preemptible?           No               Yes              Yes

Default configuration

If you run an app without specifying pod sizes, default configurations are used:

  • the driver receives one CPU core on the smallest non-preemptible instance type available
  • the executors are sized to use all resources on the smallest instance type available

For instance if you run this app:

Request without pod sizing
curl --request POST https://<your-cluster-url>/api/apps/ \
  -H 'Content-Type: application/json' \
  -H 'X-API-Key: <your-user-key>' \
  -d '{
    "jobName": "sparkpi",
    "configOverrides": {
      "type": "Scala",
      "sparkVersion": "3.0.0",
      "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar",
      "mainClass": "org.apache.spark.examples.SparkPi",
      "arguments": [
        "10000"
      ]
    }
  }'

You can see the resources allocated to the Spark app in the return payload:

Resources provisioned by default
{
  "appName": "sparkpi-20200809-193534-ayojd",
  "jobName": "sparkpi",
  "config": {
    "driver": {
      "coreRequest": "710m",
      "cores": 1,
      "memory": "2043m",
      ...
    },
    "executor": {
      "coreRequest": "1700m",
      "cores": 2,
      "memory": "4086m",
      ...
    },
    ...
  },
  ...
}

In this example:

  • the executors get 2 cores, which fits an instance of node pool A (n2-standard-2)
  • the driver gets 1 core and half the memory of an executor, so it is scheduled on an instance of node pool A as well and takes about half of its resources (CPU and memory)

If you want to learn more about how the platform sets memory and coreRequest given the number of CPU cores, read about manual pod configuration below.

Selecting the number of cores

To give a hint to the Data Mechanics platform regarding the node pool to be used, set the number of cores for the driver pod or the executor pods.

Continuing our example, if you require 4 cores per executor, the platform understands that the executor pods must be sized for an instance of type n2-standard-4 (node pool B) and sets the other parameters accordingly.

Request for 4 cores per executor
curl --request POST https://<your-cluster-url>/api/apps/ \
  -H 'Content-Type: application/json' \
  -H 'X-API-Key: <your-user-key>' \
  -d '{
    "jobName": "sparkpi",
    "configOverrides": {
      "type": "Scala",
      "sparkVersion": "3.0.0",
      "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar",
      "mainClass": "org.apache.spark.examples.SparkPi",
      "arguments": [
        "10000"
      ],
      "executor": {
        "cores": 4
      }
    }
  }'

Resources provisioned for 4 cores per executor
{
  "appName": "sparkpi-20200809-195124-uyufv",
  "jobName": "sparkpi",
  "config": {
    "driver": {
      "coreRequest": "710m",
      "cores": 1,
      "memory": "2043m",
      ...
    },
    "executor": {
      "coreRequest": "3400m",
      "cores": 4,
      "memory": "8173m",
      ...
    },
    ...
  },
  ...
}

If you require 6 cores, the platform understands that only an instance of node pool C (n2-standard-8) can accommodate such a pod. The other parameters are set to use 75% of the resources of an n2-standard-8 instance, the pro rata share of 6 cores used out of the 8 available.

Request for 6 cores per executor
curl --request POST https://<your-cluster-url>/api/apps/ \
  -H 'Content-Type: application/json' \
  -H 'X-API-Key: <your-user-key>' \
  -d '{
    "jobName": "sparkpi",
    "configOverrides": {
      "type": "Scala",
      "sparkVersion": "3.0.0",
      "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar",
      "mainClass": "org.apache.spark.examples.SparkPi",
      "arguments": [
        "10000"
      ],
      "executor": {
        "cores": 6
      }
    }
  }'

Resources provisioned for 6 cores per executor
{
  "appName": "sparkpi-20200809-200335-y3vug",
  "jobName": "sparkpi",
  "config": {
    "driver": {
      "coreRequest": "710m",
      "cores": 1,
      "memory": "2043m",
      ...
    },
    "executor": {
      "coreRequest": "5100m",
      "cores": 6,
      "memory": "12261m",
      ...
    },
    ...
  },
  ...
}

If you require more than 8 cores, the platform returns an error saying that no node pool can accommodate such a large pod.

Request for 10 cores per executor
curl --request POST https://<your-cluster-url>/api/apps/ \
  -H 'Content-Type: application/json' \
  -H 'X-API-Key: <your-user-key>' \
  -d '{
    "jobName": "sparkpi",
    "configOverrides": {
      "type": "Scala",
      "sparkVersion": "3.0.0",
      "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar",
      "mainClass": "org.apache.spark.examples.SparkPi",
      "arguments": [
        "10000"
      ],
      "executor": {
        "cores": 10
      }
    }
  }'

The cluster cannot accommodate 10 cores per executor
{
  "advice": "You should modify your application configuration to fit at least one available node pool maximum capacities",
  "executor": {
    "availableNodePools": [
      {
        "cloudProvider": "gcp",
        "cpu": 2,
        "instanceName": "n2-standard-2",
        "maximumSchedulableCoreRequest": 1.7,
        "maximumSchedulableMemoryRequestMib": 4086,
        "memoryMib": 7629,
        "preemptible": false
      },
      {
        "cloudProvider": "gcp",
        "cpu": 4,
        "instanceName": "n2-standard-4",
        "maximumSchedulableCoreRequest": 3.4,
        "maximumSchedulableMemoryRequestMib": 8173,
        "memoryMib": 15258,
        "preemptible": true
      },
      {
        "cloudProvider": "gcp",
        "cpu": 8,
        "instanceName": "n2-standard-8",
        "maximumSchedulableCoreRequest": 6.8,
        "maximumSchedulableMemoryRequestMib": 16348,
        "memoryMib": 30517,
        "preemptible": true
      }
    ],
    "targetInputConfig": {
      "cores": 10
    }
  },
  "message": "Unschedulable pods because requested resources are above available node pools capacities"
}

If you want to learn more about how the platform sets memory and coreRequest from the number of CPU cores, read the next section on manual pod configuration.

Manual pod configuration

In this section, we explain how to size pods manually, both so that you understand how the Data Mechanics platform does it for you and as a good starting point in case you need to do it yourself.

For both drivers and executors, the parameters controlling the pod sizes are the following (an illustrative example follows the list):

  • cores. This is the number of tasks run in parallel. It is also used as the value of coreRequest when coreRequest is not specified.
  • coreRequest. This is the amount of CPU requested by Spark from Kubernetes (it can be fractional, e.g. "3400m").
  • memory. This is the memory available to Spark.
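
For instance, the following executor settings (the values are purely illustrative) would run 4 tasks in parallel per executor while requesting only 2 CPU cores from Kubernetes and making 6 GiB of memory available to Spark:

cores: 4
coreRequest: "2000m"
memory: "6g"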

In the rest of this section, we explain how to set those parameters so as to minimize wasted capacity and avoid unschedulable pods.

Wasted capacity on nodes

Imagine that you have m5.xlarge nodes on a cluster deployed on EKS. These instances have 4 cores and 16 GiB of memory each.

Suppose that you have set the following size for your executor pods:

cores: 3
memory: "10g"

In this case, it is impossible for Kubernetes to schedule more than one executor per node. So every executor creation causes a new instance to be launched.

As a result, if your Spark app has 10 executors, Spark uses 30 cores while Kubernetes runs 10 m5.xlarge instances, for a total of 40 cores. 25% of the capacity is wasted!
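
As a quick back-of-the-envelope check (the figures are the ones from the scenario above):

# 10 executors of 3 cores each, but one 4-core m5.xlarge per executor
echo "scale=2; (10*4 - 10*3) / (10*4) * 100" | bc
# prints 25.00: a quarter of the CPU you pay for sits idle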

Unschedulable pods

Let's continue with the example of m5.xlarge instances.

To make full use of your cluster, you might be tempted to set the following size for your executor pods:

cores: 4
memory: "16g"

Kubernetes won't be able to schedule executor pods on the cluster with these settings.

The issue is that Kubernetes reserves some capacity on each node for the system daemons. Only the rest of the capacity, dubbed allocatable capacity, is available to Spark pods.

In our example, Kubernetes cannot allocate all 4 cores, nor all 16 GiB of memory, to a pod.

Good rules of thumb for the non-allocatable capacity are the following (applied to our example right after the list):

  • up to 25% of memory can be reserved by Kubernetes
  • up to 5% of CPU can be reserved by Kubernetes
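
Applied to our m5.xlarge example, these rules of thumb leave roughly the following allocatable capacity (a rough estimate; the exact reservations depend on the cloud provider and node configuration):

echo "scale=2; 4 * 0.95" | bc    # allocatable CPU: about 3.80 cores out of 4
echo "scale=2; 16 * 0.75" | bc   # allocatable memory: about 12.00 GiB out of 16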

One executor per node

This is a heuristic you can follow to have one executor per node and maximize the resource utilization of your cluster by executor pods.

cores: <number of cores in a node>
coreRequest: <number of cores in a node - 15%>
memory: <(memory in a node - 25%) / 1.4>

On an m5.xlarge (4 cores and 16 GiB), this translates to the following settings:

cores: 4
coreRequest: "3400m"
memory: "8.5g"

Here's the rationale behind the formulas (a complete example request follows the list):

  • when cores and coreRequest are both set, cores controls the number of parallel tasks that a Spark executor runs, and coreRequest the actual amount of CPU requested from Kubernetes. Since a Spark executor pod almost never uses 100% of its CPU, this works well.
  • The 15% of CPU accounts for the 5% of non-allocatable capacity plus 10% of extra capacity for DaemonSets.
  • The 25% of memory simply accounts for the non-allocatable capacity.
  • We divide the amount of memory requested by 1.4 because Spark requests an extra 40% of memory from Kubernetes. This is controlled by the parameter memoryOverhead, but we do not advise changing this internal Spark parameter.
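
Putting it together, here is how those m5.xlarge settings could be submitted through the API used in the examples above (the memory value comes from 16 GiB x 0.75 / 1.4, which is about 8.5 GiB). This is only a sketch: the endpoint and fields follow the earlier sparkpi examples, and you should adapt the values to your own node type.

Request applying the one-executor-per-node heuristic
# Sketch only: values computed for m5.xlarge nodes (4 cores, 16 GiB), one executor per node
curl --request POST https://<your-cluster-url>/api/apps/ \
  -H 'Content-Type: application/json' \
  -H 'X-API-Key: <your-user-key>' \
  -d '{
    "jobName": "sparkpi",
    "configOverrides": {
      "type": "Scala",
      "sparkVersion": "3.0.0",
      "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar",
      "mainClass": "org.apache.spark.examples.SparkPi",
      "arguments": ["10000"],
      "executor": {
        "cores": 4,
        "coreRequest": "3400m",
        "memory": "8.5g"
      }
    }
  }'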

Note that when you leave pod sizing to Data Mechanics, finer-grained heuristics are used: they take cloud provider specifics into account to maximize resource usage by Spark pods.

To go further, the page Running Spark on Kubernetes contains some useful information.