Spark & Jupyter on K8S — Tech Story

Introduction:

If you've been in the data engineering industry for a while, you probably know all the existing notebook solutions out there. In this article I'm going to focus on what we use in our Data & ML platform, and how.

First, let's talk about us…

Plarium is a leading company in the gaming industry. Plarium's games aren't just played for fun, they're a way of life: with millions of active players every day, our games rank us among the top companies in RPG & strategy games. Raid Shadow Legends is just one example of a game that tops the charts every day.

All our games rely on data. In the gaming industry data isn't just a "nice-to-have" part of the tech stack: with more and more companies competing over a limited pool of players, our game experience must be the best we can deliver, and the way to know that relies on our data. All of that data is collected, transformed, analysed and exposed in our Data & ML platform. That's what drives our business forward and provides our players with the best game experience.

So with that context — let’s start:

The first thing we did was decide on our main tech stack, and for that we needed to focus on the goals and requirements of our Data & ML platform's interactive system:

Sure, we have lots of other requirements and needs, but those are basically the five general ideas we kept in mind during our journey. The next thing to put in place is our tech stack:

Making it Reliable — From VMs to Containers:

After a long time working with VMs in the cloud, we suffered a lot from updates, package deprecations and mismatches between the packages that needed to be installed. It happens whether you work with Scala and share the JVM classpath, or work with Python and discover that the latest package update has some breaking changes (of course it happens on Friday at 8pm). So we wanted to avoid all that and base our solution on immutable containers: a tech approach that has proved itself and just works.

Making it Scale — Containers & K8s:

Since we had already been working with GCP for a long time, we basically had no problem scaling containers in the cloud: we used Google Kubernetes Engine to create, destroy and update our containerized solution.

Making it Schedule — Cloud Composer (Airflow):

We truly believe in solutions as a service. Google Cloud as a platform has tons of services that can really make our lives easier, especially when we're talking about R&D teams that need to be focused on the business and not on discovering why the hell the process didn't run today. We chose to go with GCP's managed Airflow solution (AKA Cloud Composer). Composer really makes our lives easy: after a long time spent on other scheduler systems, we found it to be the most popular, simple (infrastructure as code) and stable solution. Also, by choosing Composer we got containerized Airflow, authentication, GCP plugins and much more for "free".

Making it Distributed — Apache Spark:

We had been using Apache Spark for a while. All our pipelines are built using Spark, and it plays a big part in our ability to process huge amounts of data in both streaming and batch. So as a starting point we wanted to bring this technology to our interactive solution. The latest release of Spark (v3.0.0) was a game changer for us, and we will discuss it a bit later.

JupyterHub — Connecting the Dots

So after we clarified what tech stack we wanted to use, we started investigating solutions out there in the open-source forest. The first thing we spotted was JupyterHub, a solution provided by the Jupyter community for multi-user environments. The best part of it is the awesome documentation on how to get things done in the cloud with our stack on GCP Kubernetes: https://zero-to-jupyterhub.readthedocs.io/en/latest/. As a starting point we created the cluster and managed to launch the JupyterHub part, but that's where things came to a point where we needed to go deeper into the code.

To support Apache Spark in Jupyter we went into the forest again and started investigating solutions that would give us the ability to run interactive Spark on K8s, with auto-scaling and on-demand provisioning of resources. We ended up using Livy & SparkMagic.

Apache Livy is a REST API session manager over Apache Spark. The main idea was that in idle time we would have a minimum of existing pods in the cluster, so resources could be freed and downscaling would take effect. Livy in its main repo still doesn't support Spark on K8s, but we found a repo (https://github.com/jahstreet) that has all we needed for our architecture. Using this repo we got the behaviour we wanted: Spark spins up pods on session initialization and destroys them when the session ends.
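To make the session lifecycle concrete, here is a minimal sketch of talking to Livy's REST API directly (the endpoint URL is a placeholder for the in-cluster Livy service, and authentication details depend on your deployment):

import time
import requests

LIVY = "http://livy:8998"  # placeholder for the in-cluster Livy service address

# Creating a session is what makes Livy (via Spark on K8s) spin up a driver pod.
session = requests.post(f"{LIVY}/sessions", json={"kind": "pyspark"}).json()
sid = session["id"]

# Poll until the driver pod is up and the session is ready to accept statements.
while requests.get(f"{LIVY}/sessions/{sid}").json()["state"] != "idle":
    time.sleep(5)

# Run a statement on the remote Spark driver.
requests.post(f"{LIVY}/sessions/{sid}/statements",
              json={"code": "print(spark.range(10).count())"})

# Deleting the session tears down the driver (and its executors),
# which is what lets the cluster scale back down when users are done.
requests.delete(f"{LIVY}/sessions/{sid}")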

SparkMagic is the glue between Jupyter and Livy. It provides kernels for PySpark, Scala and R, and with them we can run interactive Spark jobs via the Livy service. This "magic show" gives the Jupyter instance distributed processing powers that can handle virtually unlimited data sizes, just by connecting the components together.
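As a rough illustration (the bucket path below is made up): with the sparkmagic PySpark kernel selected in Jupyter, a plain notebook cell like this is shipped to Livy and executed on the remote driver pod, and the spark variable refers to the remote session Livy created for the user, not to anything local:

# Runs on the remote Spark driver via Livy, not inside the Jupyter pod.
events = spark.read.json("gs://example-bucket/events/")   # hypothetical dataset path
events.groupBy("event_type").count().show()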

To make it clearer, let's look at the following diagram, which shows the components taking part in our system:

Jupyter & Spark Architecture — Connecting the dots

You can now spot the instances that take part in our ecosystem.

During the cool-down period the only pods that stay up are the Hub pod, the Proxy pod and the Livy service. Those of course can run with minimal resources, and given that, we can assign them to a "low-resources" node pool in Kubernetes.

In the warm-up period we have N Jupyter instances (one per user), allocated using the profile list for each user:

profileList:
  - display_name: "Default Small Image"
    kubespawner_override:
      cpu_limit: .5
      cpu_guarantee: .5
      mem_limit: "2G"
      mem_guarantee: "512M"
      node_selector:
        cloud.google.com/gke-nodepool: small-users-pool
  - display_name: "Strong Compute Image"
    kubespawner_override:
      cpu_limit: 8
      cpu_guarantee: 8
      mem_limit: "16G"
      mem_guarantee: "512M"
      node_selector:
        cloud.google.com/gke-nodepool: users-strong-pool-8c-16
  - display_name: "Super Strong Image"
    kubespawner_override:
      cpu_limit: 8
      cpu_guarantee: 8
      mem_limit: "32G"
      mem_guarantee: "512M"
      node_selector:
        cloud.google.com/gke-nodepool: users-strong-pool-8c-32g

When a user chooses the profile they need, JupyterHub spins up a pod with the node_selector key, which triggers an auto-scale by the K8s system to allocate new nodes (if needed).

So far so good, but what happens when users start running Spark code? Well, that's where the magic comes into place: our new Livy service spins up a driver pod per user and starts executing the statements the user requested.

You probably noticed I didn't say anything about the executors. That's where Spark 3.0.0 was a game changer for us. Apache Spark has a mechanism called dynamicAllocation: when it's enabled, Spark spins up more and more executors based on the workload to support the scale, and during processing it can estimate the number of executors needed to support the next computation.

From Spark Documentation:

Spark provides a mechanism to dynamically adjust the resources your application occupies based on the workload. This means that your application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster.
https://spark.apache.org/docs/latest/job-scheduling.html

Before Spark 3.0.0 this feature could be enabled only if the external shuffle service was enabled; it's needed to keep track of shuffle files when Spark decides to destroy an executor once its workload is done. The main problem was that the external shuffle service wasn't supported on Kubernetes before Spark 3.0.0, but today, using the shuffleTracking flag, you can control this behaviour on your own, so we ended up using the following Spark config:

"spark.dynamicAllocation.executorIdleTimeout":"60s",
"spark.dynamicAllocation.executorAllocationRatio": "0.2",
"spark.dynamicAllocation.schedulerBacklogTimeout":"1s",
"spark.dynamicAllocation.shuffleTracking.timeout": "10s" ,
"spark.dynamicAllocation.shuffleTracking.enabled": "true",
"spark.dynamicAllocation.minExecutors": "2",
"spark.dynamicAllocation.maxExecutors": "1000"

This config makes sure every user gets 2 executors at the beginning of their work, and Spark will spin up more executors as needed based on the statement workload. Using this feature we can dynamically support workloads and give users a performant environment.
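These settings can be baked into the Livy/sparkmagic defaults, and a user can also override them per session from the notebook using sparkmagic's %%configure magic. A sketch (run it before the session starts, or keep the -f flag to force a session restart):

%%configure -f
{
    "conf": {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "2",
        "spark.dynamicAllocation.maxExecutors": "1000",
        "spark.dynamicAllocation.shuffleTracking.enabled": "true"
    }
}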

Another thing that needed to be set up in this architecture is the ability to load JARs into the Spark classpath dynamically, without the effort of building a new image for every JAR needed. To do so we used a tool called gcsfuse. gcsfuse gives us the ability to mount a Google Cloud Storage bucket into a pod and use it as a file system to load files into the Spark runtime. To install gcsfuse we added the following command to our custom Spark image:

RUN  apt-get -y -o Acquire::Check-Valid-Until=false update && apt-get install --yes --no-install-recommends \ 
ca-certificates \
curl \
gnupg \
&& echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" \
| tee /etc/apt/sources.list.d/gcsfuse.list \
&& curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - \
&& apt-get -y -o Acquire::Check-Valid-Until=false update \
&& apt-get install --yes gcsfuse \
&& apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

After installing gcsfuse you can simply run the mount command in the postStart hook, and the bucket will be mounted into the pod:

gcsfuse -o nonempty --key-file /path/to/service/account/secret.json [bucket name] /etc/google/

and configure Spark to add the JARs to its classpath:

"spark.driver.extraClassPath": "/etc/google/jars/*",
"spark.executor.extraClassPath": "/etc/google/jars/*"

Conclusions and what’s next:

After making this progress, we ended up with a platform that is cheaper (compared to the original 24/7 VM solutions) and reliable: every component is isolated, so a problem in a user's pod can only affect that pod, without having to deal with a shared-resources environment where everyone affects everyone. It also requires almost zero maintenance of system components: each time we need to upgrade the system we use pre-production image tags to launch and test the new images, which are immutable objects, so nothing can affect the packages unless it's an external issue we can't control on our own.

Next things that need to be handled:

DaemonSet — can help us achieve faster loading of images

Follow us, and in the next articles I will share our scheduling solutions using Composer and how we developed a full CI/CD flow for Jupyter notebooks, from commit to production.

Thanks for reading,
Ben Mizrahi

Bugs always exist between the keyboard and the screen :)
