Spark & Jupyter on K8S — Tech Story

Ben Mizrahi
Plarium-engineering

--

Introduction:

If you’ve been in the data engineering industry for a while, you have probably come across most of the existing notebook solutions out there. In this article I’m going to focus on what we use in our Data & ML platform, and how we use it.

First, let’s talk about us…

Plarium is a leading company in the gaming industry. Plarium’s games aren’t just play-for-fun, they are a way of life: with millions of active players every day, our games have ranked Plarium #1 in the RPG & Strategy game charts. Raid: Shadow Legends is just one example of a game that blows up the charts every day.

All our games rely on data. In the gaming industry, data isn’t just a “nice-to-have” part of the tech stack. With more and more companies competing for a limited pool of players, our gameplay experience must be the best we can deliver, and the way to know whether it is relies on our data. All of that data is collected, transformed, analyzed and exposed in our Data & ML platform; that’s what drives our business forward and provides the best gameplay experience for our players.

So with that context — let’s start:

The first thing we did was decide on our main tech stack. For this we needed to focus on the goals and requirements of our Data & ML platform’s interactive system:

  1. Always Available — The main goal of a data platform in a fast-growing company is to give us the ability to take action based on numbers with minimal time and effort. To achieve this we need a flexible environment that everyone can use - anytime, anywhere.
  2. Multiple Clients — As the data grows, the need to investigate and learn from it grows too. With this fact in mind, our platform should support every data consumer (Engineers, Analysts, Scientists, Visualization, BI, etc.).
  3. Scale To Zero — Another principle is paying only for what we use. Defining the needs and estimating the cost is critical: when you need to process TBs of data, take the resources you need. When you no longer need them, destroy them.
  4. Minimum Maintenance Effort — As a concept, we don’t want an entire DevOps department supporting our system’s stability and health. We, as engineers, want to handle it on our own, with minimal Friday-night wake-up calls.
  5. Isolated User Environments — One of the lessons we learned during our investigation is that a shared user environment sucks. Every change, load or installation in the environment affects all users, and with that it can break a user’s work. We prefer to handle things individually and keep each user’s environment as isolated as possible.

We have lots of other requirements and needs for sure, but those are the five general ideas we kept in mind during our journey. The next thing to take into consideration is our tech stack:

Making it Reliable — From VMs to Containers:

After a long time working with VMs in the cloud, we suffered a lot from updates, package deprecations and mismatches between the packages that needed to be installed. This can happen whether you work with Scala and share the JVM classpath, or work with Python and discover that the latest package update has some breaking changes (and of course this happens at 8pm on a Friday). So we wanted to avoid all of that and base our solution on immutable containers. It’s a tech solution that has proved itself, and it just works.

Making it Scale — Containers & K8s:

Since we had already been working with GCP for a long time, we had no problem scaling containers in the cloud: we used Google Kubernetes Engine to create, destroy and update our containerized solution.

Making it Scheduled — Cloud Composer (Airflow):

We truly believe in solutions as a service. Google Cloud, as a platform, has tons of services that can really make your life easier, especially when we are talking about R&D teams that need to be focused on the business and not on discovering why the hell the process didn’t run today. We chose GCP’s managed Airflow solution (AKA Cloud Composer). Composer has really made our lives easier after a long time spent on other scheduler systems. We found it to be the most popular, simple (infra as code) and stable solution. Moreover, when choosing to go with Composer we got some other things for “free”: containerized Airflow, authentication, GCP plugins and many more.

Making it Distributed — Apache Spark:

We have been using Apache Spark for a while: all of our pipelines are built with Spark, and it plays a big part in our ability to process huge amounts of data in both streaming and batch. So as a starting point we wanted to bring this technology to our interactive solution. The latest release (v3.0.0) of Spark was a game changer for us, and we will discuss why later.

JupyterHub — Connecting the Dots

So after we clarified what tech stack we wanted to use, we started investigating the solutions out there in the open-source forest. The first thing we spotted was JupyterHub, a solution provided by the Jupyter community for multi-user environments. The best part of it is the awesome documentation on how to get things done in the cloud with our GCP Kubernetes stack: https://zero-to-jupyterhub.readthedocs.io/en/latest/. So as a starting point we created the cluster and managed to launch the JupyterHub part, but that’s where we reached the point of needing to go deeper into the code.

To support Apache Spark in Jupyter we went into the forest again and started investigating solutions that would give us the ability to run interactive Spark on K8s, with auto-scaling and on-demand provisioning of resources. We ended up using Livy & SparkMagic.

Apache Livy - a REST API session manager over Apache Spark. The main idea is that at idle time we keep a minimum number of pods in the cluster, so resources can be freed and downscaling can take effect. Livy’s main repo still doesn’t support Spark on K8s, but we found a repo (https://github.com/jahstreet) that has all we needed for our architecture. Using it, we got the behavior we wanted: Spark spins up pods when a session starts, initializes them, and destroys them when the session ends.
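To make the session flow concrete, here is a minimal sketch of the REST calls that happen under the hood (roughly what SparkMagic does for us in the notebook). The Livy URL and the statement are illustrative and depend on how the service is exposed in your cluster:

import time
import requests

# Illustrative in-cluster address of the Livy service
LIVY_URL = "http://livy.default.svc.cluster.local:8998"

# 1. Create an interactive PySpark session - Livy asks Spark to spin up a driver pod
session_id = requests.post(f"{LIVY_URL}/sessions", json={"kind": "pyspark"}).json()["id"]

# 2. Wait until the driver pod is up and the session becomes idle
while requests.get(f"{LIVY_URL}/sessions/{session_id}").json()["state"] != "idle":
    time.sleep(5)

# 3. Submit a statement - it is executed on the remote driver (and its executors)
stmt = requests.post(f"{LIVY_URL}/sessions/{session_id}/statements",
                     json={"code": "print(spark.range(1000).count())"}).json()

# 4. Poll until the statement result is available
while True:
    res = requests.get(f"{LIVY_URL}/sessions/{session_id}/statements/{stmt['id']}").json()
    if res["state"] == "available":
        print(res["output"])
        break
    time.sleep(2)

# 5. Delete the session - the driver and executor pods are destroyed and the nodes can scale down
requests.delete(f"{LIVY_URL}/sessions/{session_id}")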

SparkMagic — This is the glue between Jupyter and Livy. It provides PySpark, Scala and R kernels with which we can run interactive Spark jobs via the Livy service. This “magic show” gives the Jupyter instance distributed processing power that can handle practically endless data sizes, just by connecting the components together.
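For readers who haven’t seen SparkMagic in action, here is a rough sketch of what a notebook looks like when using the magics from a plain Python kernel (with the dedicated PySpark, Scala or R kernels, the cell contents run remotely without the %%spark magic); the session itself points at the Livy service described above:

# Cell 1: load the SparkMagic magics into a regular IPython (Python 3) kernel
%load_ext sparkmagic.magics

# Cell 2: open the management widget to register the Livy endpoint and create a session
%manage_spark

# Cell 3: everything under %%spark is shipped to the remote Spark driver through Livy
%%spark
df = spark.range(0, 1000)
print(df.count())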

To make it clearer, let’s look at the following diagram, which shows the components taking part in our system:

Jupyter & Spark Architecture — Connecting the dots

So you can now spot the instances that take part in our ecosystem.

During a cool-down period, the only pods that stay up are the Hub pod, the Proxy pod and the Livy service. These of course need only minimal resources, so we can assign them to a “low-resources” node pool in Kubernetes.

During a warm-up period we have N Jupyter instances (one per user), which are allocated using the profile list each user chooses from:

profileList:
  - display_name: "Default Small Image"
    kubespawner_override:
      cpu_limit: .5
      cpu_guarantee: .5
      mem_limit: "2G"
      mem_guarantee: "512M"
      node_selector:
        cloud.google.com/gke-nodepool: small-users-pool
  - display_name: "Strong Compute Image"
    kubespawner_override:
      cpu_limit: 8
      cpu_guarantee: 8
      mem_limit: "16G"
      mem_guarantee: "512M"
      node_selector:
        cloud.google.com/gke-nodepool: users-strong-pool-8c-16
  - display_name: "Super Strong Image"
    kubespawner_override:
      cpu_limit: 8
      cpu_guarantee: 8
      mem_limit: "32G"
      mem_guarantee: "512M"
      node_selector:
        cloud.google.com/gke-nodepool: users-strong-pool-8c-32g

When a user chooses the profile they need, JupyterHub spins up a pod with the corresponding node_selector key, which triggers the K8s cluster autoscaler to allocate new nodes (if needed).

So far so good - but what happens when users start running Spark code? Well, that’s where the magic comes into play: our new Livy service spins up a driver pod per user and starts executing the statements the user requested.

You probably noticed I didn’t say anything about the executors - that’s where Spark 3.0.0 was a game changer for us. Apache Spark has a mechanism called dynamic allocation: when it is enabled, Spark spins up more and more executors based on the workload to support the scale. During processing, Spark can estimate the number of executors needed to support the next computation.

From Spark Documentation:

Spark provides a mechanism to dynamically adjust the resources your application occupies based on the workload. This means that your application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster.
https://spark.apache.org/docs/latest/job-scheduling.html

Before Spark 3.0.0, this feature could only be enabled if the external shuffle service was enabled - it is needed to keep track of shuffle files when Spark decides to destroy an executor once its workload is done. The main problem was that the external shuffle service isn’t supported on Kubernetes - but since 3.0.0, the shuffleTracking flag lets you control this on your own, so we ended up using the following Spark config:

"spark.dynamicAllocation.executorIdleTimeout":"60s",
"spark.dynamicAllocation.executorAllocationRatio": "0.2",
"spark.dynamicAllocation.schedulerBacklogTimeout":"1s",
"spark.dynamicAllocation.shuffleTracking.timeout": "10s" ,
"spark.dynamicAllocation.shuffleTracking.enabled": "true",
"spark.dynamicAllocation.minExecutors": "2",
"spark.dynamicAllocation.maxExecutors": "1000"

This config makes sure every user gets 2 executors at the beginning of their work, and Spark spins up more executors as needed based on each statement’s workload (with executorAllocationRatio set to 0.2, Spark requests only about 20% of the executors needed for full task parallelism instead of scaling straight to the maximum). Using this feature we can dynamically support workloads and give users a better-performing environment.
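On top of these cluster-wide defaults, individual users can tune their own session when it is created. One way to do that is SparkMagic’s %%configure magic; the sketch below assumes the conf block is passed through to Livy’s session-creation request, and the exact keys accepted may vary between SparkMagic and Livy versions:

%%configure -f
{
  "driverMemory": "4G",
  "executorMemory": "8G",
  "conf": {
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.maxExecutors": "50"
  }
}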

Another thing that needs to be set up in this architecture is the ability to load JARs into the Spark classpath dynamically, without the effort of building a new image for every JAR needed. To do so we used a tool called gcsfuse, which gives us the ability to mount a Google Cloud Storage bucket into a pod and use it as a file system to load files into the Spark runtime. To install gcsfuse we added the following command to our custom Spark image:

RUN apt-get -y -o Acquire::Check-Valid-Until=false update && apt-get install --yes --no-install-recommends \
ca-certificates \
curl \
gnupg \
&& echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" \
| tee /etc/apt/sources.list.d/gcsfuse.list \
&& curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - \
&& apt-get -y -o Acquire::Check-Valid-Until=false update \
&& apt-get install --yes gcsfuse \
&& apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

After installing gcsfuse, you can simply run the mount command in the container’s postStart lifecycle hook to mount the bucket into the pod:

gcsfuse -o nonempty --key-file /path/to/service/account/secret.json [bucket name] /etc/google/

and configure Spark to add the JARs to its classpath:

"spark.driver.extraClassPath": "/etc/google/jars/*",
"spark.executor.extraClassPath": "/etc/google/jars/*"

Conclusions and what’s next:

After making this progress, we ended up with a cheaper platform (compared to the original 24/7 VM solution); a reliable platform in which every component is isolated - issues in a user’s pod can only affect that pod, with no shared-resources environment where everyone affects everyone; and an almost zero-maintenance system - each time we need to upgrade, we use pre-production image tags to launch and test the new images, which are immutable objects. Nothing can affect the packages except external issues that we can’t control on our own.

Next things that need to be handled:

  1. The Spark image is huge — even when pulling entirely inside our GCP network, the Spark image still takes time to pull. This doesn’t break anything, but it causes our users to wait a while (2-3 minutes) when launching a Spark job. Today we handle this with a job that spins up a minimum number of nodes at 7am; alongside it we set up a pre-pull DaemonSet that pulls the Spark image on every K8s node before any job is launched. Using this technique, we decreased the spin-up time from minutes to seconds:
DaemonSet - it can help you get faster image loading
  2. Spark shuffle tracking is an experimental feature — we took a risk here. The Spark documentation marks this flag as experimental, so it might cause problems in the future. But because we know that fully supporting dynamic allocation on K8s is on the Spark roadmap, we can adapt our solution in the next release. So far, running it in production, we haven’t seen any major effect on stability or performance.
  3. SparkMagic — good, but not enough. The Spark kernels still have a lot of room to improve in terms of UX and stability. Sometimes we run into problems like being unable to cancel the execution of a cell, or a kernel whose status is Unknown.

Follow us to check out the next articles, where I will share our scheduling solution using Composer and how we developed a full CI/CD flow for Jupyter notebooks, from commit to production.

Thanks for reading,
Ben Mizrahi
