In this blog, we detail how to run Spark on Kubernetes and briefly compare the cluster managers available for Spark.
Spark
Spark is a general-purpose distributed data processing engine designed for fast computation. The main feature of Spark is its in-memory cluster computing that increases the processing speed of an application. It supports workloads such as batch applications, iterative algorithms, interactive queries and streaming.
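During execution, it creates the following components:
Driver: runs the application's main program, divides the work into tasks, and schedules them on the executors
Executors: processes that run the tasks and keep cached data in memory
A cluster manager allocates resources to these components. The following cluster managers are available: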
Standalone Cluster Manager
Spark ships with its own simple cluster manager. It has no high-availability mode by default, so additional components such as ZooKeeper must be installed and configured to get one.
Hadoop YARN
YARN (Yet Another Resource Negotiator) is Hadoop's resource manager. It was built to distribute MapReduce workloads and is also widely used for Spark workloads.
Apache Mesos
The Mesos kernel runs on every machine and provides applications with APIs for resource management and scheduling across entire datacenter and cloud environments. It provides a cluster manager that can execute Spark code.
Kubernetes
Spark uses the Kubernetes API server as its cluster manager: the driver and executors run as pods that Kubernetes schedules.
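The cluster manager is selected through the master URL passed to spark-submit's --master option. As a minimal sketch, the helper below maps each manager above to the URL format Spark documents; the helper and host names are hypothetical, and the ports are common defaults that a given deployment may override.

```python
from typing import Optional

# Default ports (assumed): 7077 for the standalone master, 5050 for the Mesos
# master, 443 for an HTTPS Kubernetes API server such as EKS.
DEFAULT_PORTS = {"standalone": 7077, "mesos": 5050, "kubernetes": 443}


def master_url(manager: str, host: Optional[str] = None, port: Optional[int] = None) -> str:
    """Return the --master value spark-submit expects for a given cluster manager."""
    if manager == "yarn":
        # YARN finds the ResourceManager through HADOOP_CONF_DIR / YARN_CONF_DIR.
        return "yarn"
    if manager not in DEFAULT_PORTS:
        raise ValueError(f"unknown cluster manager: {manager}")
    if host is None:
        raise ValueError(f"{manager} requires a host")
    port = port if port is not None else DEFAULT_PORTS[manager]
    if manager == "standalone":
        return f"spark://{host}:{port}"
    if manager == "mesos":
        return f"mesos://{host}:{port}"
    # Kubernetes: the API server's URL, prefixed with k8s://
    return f"k8s://https://{host}:{port}"


print(master_url("standalone", "spark-master"))  # spark://spark-master:7077
print(master_url("kubernetes", "ABCDZZZZZZZZZZZZZZZ.sk1.region.eks.amazonaws.com"))
```

Only the URL changes between managers; the rest of the spark-submit command stays largely the same.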
A brief introduction to Kubernetes and its components
Kubernetes is a container orchestration engine that keeps containerized workloads highly available.
Apart from that, it also has the following features:
Architecture of Kubernetes
The Kubernetes architecture has two major groups of components:
1. Master components
All user requests, whether made through the API or kubectl, are sent to the master component called the API server.
2. Node components
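The following components run on every node:
kubelet: the agent that starts and monitors the pods assigned to the node
kube-proxy: maintains the network rules that route service traffic to pods
Container runtime: the software that runs the containers, such as Docker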
Other components
Kubectl: a utility used to communicate with the Kubernetes cluster
Below is the pictorial representation of spark-submit to API server.
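We can use spark-submit directly to submit a Spark application to a Kubernetes cluster. Once submitted, the following events occur:
1. Spark creates a driver that runs inside a Kubernetes pod.
2. The driver creates executors, which also run in pods, and connects to them to execute the application code.
3. When the application completes, the executor pods terminate and are cleaned up, but the driver pod keeps its logs and remains in the completed state until it is garbage collected or deleted manually.
There are 2 options available for executing Spark on an EKS cluster:
1. Using spark-submit directly
2. Using the Spark operator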
Below are the prerequisites for executing spark-submit:
A. Creating Docker images for Java and PySpark execution
Docker image creation for Java code execution (run from the root of the Spark directory):
docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .
Once this image is built, it can be used as a base image for the other code execution.
Docker image creation for PySpark code execution:
The path spark/kubernetes/dockerfiles/spark/bindings/python contains a ready-made Dockerfile for PySpark execution. It builds on top of the base image above, which is passed in through the base_img build argument.
Make sure you run the build from the Spark directory: the Dockerfile copies jars and other binaries from it, so the whole directory is used as the build context.
docker build -t sparkpy:latest \
  --build-arg base_img=spark:latest \
  -f kubernetes/dockerfiles/spark/bindings/python/Dockerfile .
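If these builds run in a script or CI job, the two commands can be generated in order, base image first. This is a sketch: the function name and the /opt/spark path are hypothetical, and it assumes the Python Dockerfile takes its parent image through a base_img build argument, as the one shipped with Spark 2.4 does.

```python
import shlex
from pathlib import Path
from typing import List


def docker_build_commands(spark_home: str, tag: str = "latest") -> List[List[str]]:
    """Return the docker build argv lists for the JVM and PySpark images, base first."""
    home = Path(spark_home)
    base_image = f"spark:{tag}"
    base_dockerfile = home / "kubernetes/dockerfiles/spark/Dockerfile"
    py_dockerfile = home / "kubernetes/dockerfiles/spark/bindings/python/Dockerfile"
    return [
        # JVM image for Java/Scala jobs; the Spark directory is the build context.
        ["docker", "build", "-t", base_image, "-f", str(base_dockerfile), str(home)],
        # PySpark image, layered on the JVM image through the base_img build argument.
        ["docker", "build", "-t", f"sparkpy:{tag}",
         "--build-arg", f"base_img={base_image}",
         "-f", str(py_dockerfile), str(home)],
    ]


for cmd in docker_build_commands("/opt/spark"):
    print(shlex.join(cmd))  # pass cmd to subprocess.run(cmd, check=True) to build
```

Passing the Spark directory explicitly as the build context means the script does not depend on the current working directory.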
B. Creating Kubernetes service account and cluster-role binding
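Components that will be used: a ServiceAccount (spark-sa) for the driver, a ClusterRole (spark-role) that allows managing pods, services and configmaps, and a ClusterRoleBinding that grants the role to the service account.
Create a file named rbac.yaml with the following contents: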
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-sa
  namespace: default
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: spark-role
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps"]
    verbs: ["create", "get", "watch", "list", "delete"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: spark-role-binding
subjects:
  - kind: ServiceAccount
    name: spark-sa
    namespace: default
roleRef:
  kind: ClusterRole
  name: spark-role
  apiGroup: rbac.authorization.k8s.io
kubectl apply -f rbac.yaml
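To reuse the same setup in several namespaces, the manifest can be generated instead of hand-edited. A minimal sketch, assuming its output is redirected to a file such as rbac.json (kubectl apply -f also accepts JSON, including a v1 List of objects); the function name is hypothetical. A ServiceAccount subject in a ClusterRoleBinding must carry its namespace, so the namespace is threaded through both objects.

```python
import json


def spark_rbac(namespace: str = "default", sa: str = "spark-sa", role: str = "spark-role") -> dict:
    """Build the ServiceAccount, ClusterRole and ClusterRoleBinding as one v1 List."""
    return {
        "apiVersion": "v1",
        "kind": "List",
        "items": [
            {"apiVersion": "v1", "kind": "ServiceAccount",
             "metadata": {"name": sa, "namespace": namespace}},
            {"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "ClusterRole",
             "metadata": {"name": role},  # cluster-scoped: no namespace
             "rules": [{"apiGroups": [""],
                        "resources": ["pods", "services", "configmaps"],
                        "verbs": ["create", "get", "watch", "list", "delete"]}]},
            {"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "ClusterRoleBinding",
             "metadata": {"name": f"{role}-binding"},
             # ServiceAccount subjects must name the namespace they live in.
             "subjects": [{"kind": "ServiceAccount", "name": sa, "namespace": namespace}],
             "roleRef": {"kind": "ClusterRole", "name": role,
                         "apiGroup": "rbac.authorization.k8s.io"}},
        ],
    }


print(json.dumps(spark_rbac(), indent=2))
```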
Getting cluster information:
kubectl cluster-info
This command prints the master URL, which looks like this:
Kubernetes master is running at https://ABCDZZZZZZZZZZZZZZZ.sk1.region.eks.amazonaws.com
Make a note of this URL; spark-submit needs it as the master, prefixed with k8s://.
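Scripts can derive the --master value from the same output. A sketch under assumptions: it matches both the older "Kubernetes master" wording shown above and the "Kubernetes control plane" wording of newer kubectl releases, and strips the terminal color codes kubectl may add; the function name is hypothetical.

```python
import re

# ANSI color sequences such as "\x1b[0;32m".
ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*m")
# Older kubectl: "Kubernetes master is running at ...";
# newer kubectl: "Kubernetes control plane is running at ...".
MASTER_LINE = re.compile(r"Kubernetes (?:master|control plane) is running at\s+(https?://\S+)")


def spark_master_from_cluster_info(output: str) -> str:
    """Turn `kubectl cluster-info` output into a k8s:// master URL for spark-submit."""
    clean = ANSI_ESCAPE.sub("", output)
    match = MASTER_LINE.search(clean)
    if match is None:
        raise ValueError("API server URL not found in kubectl cluster-info output")
    return "k8s://" + match.group(1)


print(spark_master_from_cluster_info(
    "Kubernetes master is running at https://ABCDZZZZZZZZZZZZZZZ.sk1.region.eks.amazonaws.com"))
```

The output itself can be captured with subprocess.run(["kubectl", "cluster-info"], capture_output=True, text=True, check=True).stdout.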
C. Executing spark-submit:
Now go to the bin directory of the Spark distribution and run the following command. The master URL noted earlier must be prefixed with k8s://.
./spark-submit \
  --master k8s://<master_url> \
  --deploy-mode cluster \
  --name spark-pi \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.driver.container.image=<docker image with code> \
  --conf spark.kubernetes.executor.container.image=<docker image with code> \
  local:///opt/spark/examples/src/main/python/pi.py
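When the submission is scripted, building the argument list explicitly avoids shell line-continuation mistakes. A minimal sketch; the function name, its defaults and the image name are placeholders, not part of Spark.

```python
import shlex
from typing import Dict, List, Optional


def spark_submit_cmd(master: str, image: str, app: str, name: str = "spark-pi",
                     service_account: str = "spark-sa", executors: int = 5,
                     extra_conf: Optional[Dict[str, str]] = None) -> List[str]:
    """Build the spark-submit argv for cluster mode on Kubernetes."""
    conf = {
        "spark.kubernetes.authenticate.driver.serviceAccountName": service_account,
        "spark.executor.instances": str(executors),
        "spark.kubernetes.driver.container.image": image,
        "spark.kubernetes.executor.container.image": image,
    }
    conf.update(extra_conf or {})  # e.g. secrets, labels, node selectors
    cmd = ["./spark-submit", "--master", master, "--deploy-mode", "cluster", "--name", name]
    for key, value in conf.items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app)  # the application file must come after all options
    return cmd


cmd = spark_submit_cmd("k8s://https://ABCDZZZZZZZZZZZZZZZ.sk1.region.eks.amazonaws.com",
                       "sparkpy:latest", "local:///opt/spark/examples/src/main/python/pi.py")
print(shlex.join(cmd))
```

Running it is then subprocess.run(cmd, check=True) from Spark's bin directory. Keeping options in a dict makes it easy to merge in extra settings without editing the command template.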
The driver and executor pods are named after the application name given with --name. The driver pod can be used to view the logs and the Spark UI with the following commands.
To view logs:
kubectl logs -f spark-pi-1581014997671-driver
To view the Spark UI:
kubectl port-forward <driver-pod-name> 4040:4040
After execution, the driver pod remains (in a completed or failed state) until it is deleted manually, so its logs stay available.
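Finished driver pods can pile up, so a periodic cleanup helps. A sketch that picks out driver pods whose phase is Succeeded or Failed from the JSON of kubectl get pods -l spark-role=driver -o json; it relies on Spark labelling its driver pods spark-role=driver, and the function name and sample data are hypothetical.

```python
from typing import List

# Pod phases after which the driver will not run again.
FINISHED_PHASES = {"Succeeded", "Failed"}


def finished_driver_pods(pod_list: dict) -> List[str]:
    """Names of finished Spark driver pods in `kubectl get pods -o json` output."""
    names = []
    for pod in pod_list.get("items", []):
        metadata = pod.get("metadata", {})
        phase = pod.get("status", {}).get("phase")
        if metadata.get("labels", {}).get("spark-role") == "driver" and phase in FINISHED_PHASES:
            names.append(metadata["name"])
    return names


sample = {"items": [
    {"metadata": {"name": "spark-pi-1581014997671-driver", "labels": {"spark-role": "driver"}},
     "status": {"phase": "Succeeded"}},
    {"metadata": {"name": "spark-pi-1581014997671-exec-1", "labels": {"spark-role": "executor"}},
     "status": {"phase": "Running"}},
]}
print(finished_driver_pods(sample))  # ['spark-pi-1581014997671-driver']
```

The returned names can then be passed to kubectl delete pod once their logs have been collected.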
Additional useful options that can be used with Spark-submit
Environment Variables:
When the script requires environment variables, such as credentials, they can be passed through a Kubernetes secret that spark-submit references:
--conf spark.kubernetes.executor.secretKeyRef.DB_PASS=snowsec:db_pass
Create a secret named snowsec that contains the key db_pass; its value is exposed to the executor containers as the environment variable DB_PASS. The driver has the equivalent option spark.kubernetes.driver.secretKeyRef.DB_PASS.
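When several secrets are needed, the confs can be generated from one mapping. A sketch, assuming the secret was created with kubectl create secret generic snowsec --from-literal=db_pass=<password>; the helper is hypothetical, and each returned entry becomes one --conf key=value flag.

```python
from typing import Dict, Tuple


def secret_env_conf(secrets: Dict[str, Tuple[str, str]],
                    driver: bool = True, executor: bool = True) -> Dict[str, str]:
    """Map ENV_VAR -> (secret name, key) onto spark.kubernetes.*.secretKeyRef confs."""
    roles = [role for role, enabled in (("driver", driver), ("executor", executor)) if enabled]
    conf = {}
    for env_name, (secret_name, key) in secrets.items():
        for role in roles:
            conf[f"spark.kubernetes.{role}.secretKeyRef.{env_name}"] = f"{secret_name}:{key}"
    return conf


print(secret_env_conf({"DB_PASS": ("snowsec", "db_pass")}))
```

Inside the job, the value is then read like any other environment variable, e.g. os.environ["DB_PASS"].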
Adding labels to the pods:
To add labels to the driver and executor pods, use the following options:
spark.kubernetes.driver.label.[LabelName]
spark.kubernetes.executor.label.[LabelName]
Using node selectors:
Pod scheduling can be restricted to specific nodes with a node selector, which Spark exposes through the following option:
spark.kubernetes.node.selector.[labelKey]
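A sketch that builds label and node-selector confs together, each entry becoming one --conf key=value flag. The label team=analytics and the node label workload=spark-batch are hypothetical examples, and the nodes must already carry that label (for example, set with kubectl label nodes <node-name> workload=spark-batch).

```python
from typing import Dict


def pod_placement_conf(labels: Dict[str, str], node_selector: Dict[str, str]) -> Dict[str, str]:
    """Spark confs that label driver/executor pods and pin them to matching nodes."""
    conf = {}
    for key, value in labels.items():
        # The same label on driver and executor pods, e.g. to filter dashboards by job.
        conf[f"spark.kubernetes.driver.label.{key}"] = value
        conf[f"spark.kubernetes.executor.label.{key}"] = value
    for key, value in node_selector.items():
        # Pods are only scheduled on nodes that carry this label.
        conf[f"spark.kubernetes.node.selector.{key}"] = value
    return conf


print(pod_placement_conf({"team": "analytics"}, {"workload": "spark-batch"}))
```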
Operators
An Operator is a method of packaging, deploying and managing a Kubernetes application. A Kubernetes application is one that is both deployed on Kubernetes and managed using the Kubernetes APIs and kubectl tooling.
Using Spark operator on Kubernetes
Official link https://operatorhub.io/operator/spark-gcp
Installing the Spark operator on EKS:
Prerequisites: Helm must be installed. Check its version with:
helm version --short
helm install incubator/sparkoperator \
  --name sparkv24 \
  --namespace default \
  --set sparkJobNamespace=default \
  --set enableWebhook=true
Verifying Installation:
helm ls
Once applied, the following components will be created:
With the operator, the job is defined as a SparkApplication resource in a YAML file, which is submitted with:
kubectl apply -f <spark-application>.yaml
The YAML file looks as follows:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi-jar
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.4"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 2.4.4
    serviceAccount: sparkv24-spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.4
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
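To submit many jobs that differ only in name, class or image, the manifest can be templated. A minimal sketch that rebuilds the example above as a Python dict and prints it as JSON (kubectl apply -f accepts JSON as well as YAML); the helper names are hypothetical, and the hostPath volume is left out for brevity.

```python
import json


def pod_resources(spark_version: str) -> dict:
    """Resources and labels shared by the driver and executor (fresh dict per call)."""
    return {"cores": 1, "memory": "512m", "labels": {"version": spark_version}}


def spark_application(name: str, image: str, main_class: str, app_file: str,
                      spark_version: str, service_account: str,
                      executors: int = 1, namespace: str = "default") -> dict:
    """A SparkApplication manifest mirroring the YAML example, without volumes."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "type": "Scala",
            "mode": "cluster",
            "image": image,
            "imagePullPolicy": "Always",
            "mainClass": main_class,
            "mainApplicationFile": app_file,
            "sparkVersion": spark_version,
            "restartPolicy": {"type": "Never"},
            "driver": {**pod_resources(spark_version), "coreLimit": "1200m",
                       "serviceAccount": service_account},
            "executor": {**pod_resources(spark_version), "instances": executors},
        },
    }


app = spark_application(
    "spark-pi-jar", "gcr.io/spark-operator/spark:v2.4.4",
    "org.apache.spark.examples.SparkPi",
    "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar",
    "2.4.4", "sparkv24-spark")
print(json.dumps(app, indent=2))  # save as spark-pi-jar.json, then kubectl apply -f it
```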
Once we submit the job, it creates 2 pods: a driver pod and one executor pod (as set by instances: 1).
Verify them with:
kubectl get pods
View the logs with:
kubectl logs -f <pod-name>
View the details of the submitted job with:
kubectl describe sparkapplication spark-pi-jar
The output contains details about the Web UI, the service, and the events that occurred during creation.
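For scripted waits, the same information is available as JSON from kubectl get sparkapplication spark-pi-jar -o json. A sketch, assuming the status layout of the v1beta2 SparkApplication resource (status.applicationState.state and status.driverInfo.podName); check these field names against the operator version you run. The function and the sample data are hypothetical.

```python
from typing import Optional, Tuple

# Operator states after which the application will not progress further
# (assumed here, with restartPolicy Never).
TERMINAL_STATES = {"COMPLETED", "FAILED", "SUBMISSION_FAILED"}


def application_status(app: dict) -> Tuple[Optional[str], Optional[str], bool]:
    """Return (state, driver pod name, finished?) from a SparkApplication object."""
    status = app.get("status", {})
    state = status.get("applicationState", {}).get("state")
    driver_pod = status.get("driverInfo", {}).get("podName")
    return state, driver_pod, state in TERMINAL_STATES


sample = {"status": {"applicationState": {"state": "RUNNING"},
                     "driverInfo": {"podName": "spark-pi-jar-driver"}}}
print(application_status(sample))  # ('RUNNING', 'spark-pi-jar-driver', False)
```

A polling loop can call this every few seconds and stop once the third element is True, then fetch the driver logs by pod name.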
Access Web UI:
The Web UI of a long-running job can be accessed through port forwarding:
kubectl port-forward <driver-pod-name> 4040:4040
Key considerations for Production Spark code on Kubernetes
1. Cost-Effective
No always-on infrastructure is required to use Spark on EKS: driver and executor pods exist only while a job runs.
2. Build and Deployments
Because the Docker image is passed with spark-submit, every code change requires building a new image and passing it with spark-submit. Tagging these images lets us track the changes.
3. Ideal Use Cases
Batch processing with a smaller workload (e.g. 8-10 hours of job execution per day).
4. Availability/Fault Tolerance
The Kubernetes scheduler manages the driver and executor pods and places them according to resource availability. Quotas can be assigned to a namespace for better resource management.
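Quotas count the memory each pod requests, which is more than the Spark memory setting: Spark adds an overhead of max(overhead factor × memory, 384 MiB) to each driver and executor pod. A sketch for checking whether a job fits a namespace memory quota (for example a ResourceQuota on requests.memory); it assumes the default overhead factor of 0.10 for JVM jobs (non-JVM jobs such as PySpark may use a higher factor depending on the Spark version) and ignores any extra Python worker memory. The function names are hypothetical.

```python
# Minimum memory overhead Spark adds per pod, in MiB.
MIN_OVERHEAD_MIB = 384


def pod_memory_mib(memory_mib: int, overhead_factor: float = 0.10) -> int:
    """Memory request of one driver or executor pod: heap plus overhead."""
    return memory_mib + max(int(overhead_factor * memory_mib), MIN_OVERHEAD_MIB)


def fits_quota(driver_mib: int, executor_mib: int, executors: int, quota_mib: int,
               overhead_factor: float = 0.10) -> bool:
    """True if the driver plus all executors fit within a namespace memory quota."""
    total = (pod_memory_mib(driver_mib, overhead_factor)
             + executors * pod_memory_mib(executor_mib, overhead_factor))
    return total <= quota_mib


print(pod_memory_mib(512))             # 896
print(fits_quota(512, 512, 1, 2048))   # True
```

For the operator example (512m driver, one 512m executor), the two pods request 2 × 896 MiB = 1792 MiB, noticeably more than the 1024 MiB the manifest suggests.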
5. Resource Tracking
We use node selectors in spark-submit to run specific workloads on specific nodes, which in turn lets us track resource usage per workload.
6. Monitoring
We have integrated Spark workload monitoring with Prometheus and Grafana, using kube-state-metrics, and created a dashboard. We pass pod labels with spark-submit, which lets us create custom dashboards for specific labels.
7. Logging
We have used the ELK stack for visualizing the logs.
The approach we have detailed suits pipelines that use Spark as a containerized service. It also ensures optimal utilization of resources, since no component needs to be up and running before a spark-submit. Additionally, Spark can use Kubernetes features such as namespaces and quotas.
Ajaykumar Baljoshi is a Senior Devops Engineer at Sigmoid, who currently works on Containerization, Kubernetes, DevOps and Infrastructure as Code.
Previously published at https://www.sigmoid.com/blogs/containerization-of-pyspark-using-kubernetes/