Hubble: Orchestrating Spark SQL & AWS Batch for Experimentation Analysis

Summary In order to build segments, run experiment analysis, and execute metrics queries for Galileo, our new experimentation platform, we built Hubble – a generic and reliable task queue. Hubble allows us to submit and monitor SQL or Python jobs. This service allowed us to abstract away from the job type with minimal overhead and enabled us to automate the analysis of all experiments at Careem.
Team Data & AI
Author(s) Mohamed Abdo, Roman Atachiants, Abhinav Johri
About the Author(s) Mohamed, Roman, and Abhinav are software engineers at Careem, working in the Data & AI team and helping to build a state-of-the-art experimentation and feature toggles platform as well as a machine learning platform.

Introduction

In the process of building our internal experimentation platform, we found ourselves in need of a general-purpose component to run SQL queries and Python jobs. These were needed for several use cases such as building segments for rollouts and experiments, running validation checks to test user-defined metrics, and getting metric data to calculate statistics for experiments analysis reports, as shown in the image below.

Since our team’s mission is to build a reliable and accessible experimentation platform that can be used by any team across the company, we needed to cover any valid scenario where users would want to query a particular data source with a specific query engine. Hence, we needed to add a layer of indirection in order to abstract our platform away from a specific query engine such as Presto or Spark.

We decided to tackle this problem by building a new Go microservice: a query runner that can receive, queue, and execute various jobs. We named this service Hubble, after Edwin Hubble, the American astronomer – a fitting name since our experimentation platform is called Galileo and is designed to enable trustworthy, data-driven decision making to the organization. This article describes the approach we took, the challenges we faced, and the key milestones we went through.

Overview of Hubble Service

In this section, we cover the main components of Hubble in more detail starting with its high-level component diagram.

The above diagram represents the final component design for our service. However, while the service design did not change much since the original internal RFC, we experienced some challenges while implementing the service. We cover those challenges separately in the War Stories section.

In order to use Hubble, we opted to keep it simple with an HTTP API (OpenAPI style). In our implementation, we used the Gin library as the web server component and built two main endpoints:

    • /submit: This endpoint is used for submitting a query to the service. It validates and enqueues the job and returns a unique identifier. It expects the following input:
      • Query: The required query string to run; this can be either a SQL query itself or the name of a Python job to execute.
      • Data Format: The output data format specifying how to encode the result set. Currently, we only support CSV and Parquet formats, since our system deals with relatively large datasets.
      • Driver: The type of runner to use in order to execute the job, such as AWS Batch, Spark, Presto, etc.
      • Source: The identifier for the source of the task; this could be a service name or user ID.
      • Source ID, Kind and Version: These are optional task attributes that can be set by the request initiator in order to link the task back to its source.
    • /status: This endpoint expects the unique job ID that was generated during the submit operation. The service looks up the status of the given job and returns it in the response. If the task finished successfully, a download link (pre-signed S3 URL) is attached to the response. If the task failed, the error message is returned to the user.
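A minimal Go sketch of the submit payload and its validation is shown below. The field names and accepted values are assumptions based on the description above, not Hubble's actual schema.

```go
package main

import (
	"errors"
	"fmt"
)

// SubmitRequest mirrors the /submit payload described above.
// Field names are illustrative, not the actual Hubble schema.
type SubmitRequest struct {
	Query         string // SQL text or the name of a Python job
	DataFormat    string // "csv" or "parquet"
	Driver        string // "spark", "batch", ...
	Source        string // service name or user ID
	SourceID      string // optional attributes linking back to the source
	SourceKind    string
	SourceVersion string
}

// Validate checks the required fields before the job is enqueued.
func (r SubmitRequest) Validate() error {
	if r.Query == "" {
		return errors.New("query is required")
	}
	switch r.DataFormat {
	case "csv", "parquet":
		// supported output formats
	default:
		return fmt.Errorf("unsupported data format: %q", r.DataFormat)
	}
	if r.Driver == "" {
		return errors.New("driver is required")
	}
	return nil
}

func main() {
	req := SubmitRequest{Query: "SELECT 1", DataFormat: "csv", Driver: "spark", Source: "galileo"}
	fmt.Println(req.Validate()) // <nil>
}
```

In the real service this validation happens before the job is enqueued, so malformed requests are rejected synchronously with the HTTP response rather than failing later inside a runner.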

Publish / Subscribe

All micro-services within our dynamic configuration & experimentation platform (Galileo) communicate through a common message bus, employing a publish/subscribe pattern as shown in the diagram below. The message bus itself is built using Kafka and we are using cloudevents as the envelope for all of our messages on the bus.

In addition to the API, Hubble can also receive tasks and push status events through this message bus. Other event-driven services can subscribe to these events and identify the corresponding task using the source ID, kind, and version fields mentioned above. Once a job is received by Hubble, it creates a job object and inserts it into the task queue, just as it does for jobs submitted through the API.

Task Queues

There could potentially be hundreds of concurrent jobs coming into Hubble via its API or Kafka. In order not to overwhelm our downstream task runners, such as our Spark cluster, we need to queue the jobs.

For the implementation, we decided to use Redis as a storage layer for task queues, because of its simplicity and high throughput. It is also readily available via a managed AWS ElastiCache service which spares us the burden of managing it.

Hubble jobs are distributed across different task queues, and each job is assigned to a specific queue according to its driver. For example, we have one queue for AWS Batch jobs and another queue for Spark SQL jobs. Such isolation is better because it prevents potential deadlocks among tasks of different types. We discuss this issue further in the War Stories section.
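The driver-to-queue routing can be sketched as a small mapping function; the Redis key naming scheme below is an assumption for illustration, not Hubble's actual one.

```go
package main

import "fmt"

// queueForDriver maps a job's driver to its dedicated Redis task
// queue, so each job type has isolated workers. The key naming
// scheme here is illustrative only.
func queueForDriver(driver string) string {
	switch driver {
	case "spark":
		return "hubble:queue:spark"
	case "batch":
		return "hubble:queue:batch"
	default:
		// unknown drivers fall back to a shared default queue
		return "hubble:queue:default"
	}
}

func main() {
	fmt.Println(queueForDriver("spark")) // hubble:queue:spark
	fmt.Println(queueForDriver("batch")) // hubble:queue:batch
}
```

Keeping the mapping in one place also makes adding a new driver a one-line change alongside its runner implementation.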

Task Runners

In order to actually run the jobs, Hubble abstracts them as “task runners”, each defining how a specific job type needs to be executed. This part is designed to be as generic as possible so that we can extend Hubble in the future with additional job types. In fact, we started with support for Spark SQL and later implemented AWS Batch as a task runner. In the future, we are planning to implement more runners such as Presto and Druid.

In the true spirit of Go, creating a new runner implementation is, for our engineers, as simple as implementing an interface. We intentionally designed our runners as remote calls to the concrete executors for the following reasons:

  • This allowed us to decouple the platform, reuse our internal infrastructure and simplify any future migrations.
  • This also allowed us to scale each runner separately, and have a clear responsibility of Hubble to queue jobs.

Spark Runner

The main role of the Spark runner is to execute raw Spark SQL queries submitted by users. For our experimentation platform, we use it to build user segments (shown below), query metrics, and analyze experiments.

Given that our team also handles the in-house machine learning platform (future articles to be posted about our ML platform), we already had a setup that allows us to generate a dataset by running Spark SQL queries on AWS EMR and pushing the result to S3. Thus, with a small tweak to the existing flow, we could reuse it and save on cost. We did that by creating a new Scala class responsible for picking up the query from Hubble's Spark runner and mapping it to our existing setup.

On the EMR cluster side, we added a new queue for Hubble with higher priority than normal dataset generation jobs. This was important to make sure that the queueing time of our jobs is minimal and that progress is not blocked by heavy tasks running at the same time.

When executing a Spark job, the runner picks up the query from the job object and creates the job configuration needed by the remote executor that runs the Scala code. The runner then submits the Spark batch job request to EMR via Apache Livy and polls the job status at a regular interval until it finishes.

If the job is successful, the Spark runner returns the result S3 path to Hubble. Hubble then performs its success callback flow:

  1. Mark the task state as succeeded and remove it from the task queue.
  2. Generate a pre-signed URL for the S3 path returned by the task.
  3. Push the status event with the pre-signed download URL to the Kafka stream.

On the other hand, when the job fails, the runner raises an exception. Hubble then handles this exception by marking the task as failed and pushing the status event through Kafka.
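The polling loop at the heart of the Spark runner can be sketched as follows. The status callback and the state names stand in for the real Livy API, which is why this sketch takes a function rather than an HTTP client.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntilDone repeatedly checks the remote job status (Livy, in the
// Spark runner's case) until it reaches a terminal state. The state
// names "success" and "dead" are stand-ins for the real Livy states.
func pollUntilDone(status func() string, interval time.Duration) error {
	for {
		switch status() {
		case "success":
			return nil // caller proceeds with the success callback flow
		case "dead":
			return errors.New("spark job failed") // caller marks the task failed
		default:
			time.Sleep(interval) // still running; wait and poll again
		}
	}
}

func main() {
	// Fake status sequence: running twice, then success.
	states := []string{"running", "running", "success"}
	i := 0
	status := func() string {
		s := states[i]
		if i < len(states)-1 {
			i++
		}
		return s
	}
	err := pollUntilDone(status, time.Millisecond)
	fmt.Println(err) // <nil>
}
```

A production version would also carry a context for cancellation and an overall timeout, so a hung Livy session cannot block a worker forever.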

Python Runner

Analyzing experiments is at the core of our work in Galileo, hence we needed a reliable and automated way to create reports for all of our experiments. This allows our colleagues to automatically analyze experiments and measure various KPIs that are important to them. If you consider one such “report card” for a KPI, as shown in the image below, we need to not only execute several Spark SQL queries but also run a deeper analysis and compute statistical significance.

In order to accommodate this, we decided to leverage AWS Batch to run our general-purpose Python program that does all of this work. When executing a Python job, Hubble extracts the experiment name from the job object and passes it, together with the result path, as environment variables to the Docker container running our Python job on AWS Batch.
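Building that environment for the container can be sketched as below. The variable names `EXPERIMENT_NAME` and `RESULT_PATH` are assumptions, not Hubble's actual contract with the Python image, and the `KeyValue` type simply mirrors the name/value pairs AWS Batch container overrides expect.

```go
package main

import "fmt"

// KeyValue mirrors the name/value pairs passed as container
// environment overrides when submitting an AWS Batch job.
type KeyValue struct {
	Name, Value string
}

// batchEnvForJob builds the environment the Python container reads.
// The variable names are illustrative, not Hubble's real contract.
func batchEnvForJob(experiment, resultPath string) []KeyValue {
	return []KeyValue{
		{Name: "EXPERIMENT_NAME", Value: experiment},
		{Name: "RESULT_PATH", Value: resultPath},
	}
}

func main() {
	for _, kv := range batchEnvForJob("checkout-cta", "s3://hubble-results/job-7") {
		fmt.Printf("%s=%s\n", kv.Name, kv.Value)
	}
}
```

Passing configuration through environment variables keeps the Docker image itself generic: the same image can analyze any experiment without being rebuilt.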

As you might have noticed, this logic is still coupled to our current use case, but our next step is to generalize this runner so that it can execute any Python job we might need in the future.

Monitoring

During the development journey of Hubble, we went through some issues that we will discuss in the next section. Those issues highlighted the importance of having more visibility into everything that happens inside Hubble, such as the number of tasks being served at any given point in time and the elapsed time for each task.

We integrated our service with the Prometheus infrastructure provided by our infrastructure team at Careem and were able to visualize these metrics through Grafana dashboards. Shown above is an example metric for the number of pending jobs over time. You can see that spikes in tasks happen when we run experiment analysis for all our running experiments, daily at a specific time slot.

War Stories

In the previous sections, we mentioned on a couple of occasions that we had experienced some challenges while developing Hubble. In this section, we will discuss the most annoying problems we faced and how we managed to tackle them.

Writing Hubble in Python

Prior to implementing Hubble in Go, we had decided to write it in Python. This was because most of our machine learning packages were Python-based and we had pretty good knowledge of the libraries we were going to use.

Given that the service is running on Kubernetes, we wanted to have everything running in a single process. This is tricky in CPython due to the GIL (Global Interpreter Lock), which allows only one thread to hold control of the Python interpreter at a time, meaning that we cannot have truly parallel (background) execution in our code.

As you can see from the design, most of our operations are I/O bound and don’t contain much computation. Thus, concurrency (which is supported by Python through the AsyncIO framework) can be a good option here. Indeed we carried on with this approach and had promising prototype results with the following stack:

  • Uvicorn as a web server, with Falcon ASGI (Asynchronous Server Gateway Interface) as a web application.
  • ARQ as an asynchronous task queue on top of Redis.
  • AIOKafka for Publisher/Subscriber.

After running Hubble for some time, we noticed that some jobs took a long time to finish and many timeouts occurred because jobs exceeded the expected execution time. Moreover, the issue worsened when scheduling multiple jobs to run every few minutes. After some troubleshooting and debugging, we found that a couple of issues were happening:

  • The Kafka producer was intermittently losing its connection to the Kafka stream, which caused status events to be dropped. From the event-driven system's point of view, tasks whose events were dropped would remain running for good. The issue persisted even after changing the Kafka producer library.
  • ARQ hung when queueing many tasks. While examining our Redis records via the Redis CLI, we could see some tasks stuck in a scheduled state, even when the workers across Hubble pods were idle. The blocked jobs got processed on service redeployment, which implies that something was wrong with ARQ-Redis communication when running multiple Hubble instances.

We did not manage to come up with an approach that could guarantee a long-term solution for the above-mentioned issues without requiring a huge time investment. Given the time constraints, we decided to abandon Python and rewrite the service in Go. We were further encouraged because, by the time this suggestion was made, the entire team was already experienced with Go. Moreover, the other experimentation platform components are written in Go, which standardized our codebase stack.

Task Queue Deadlock

Prior to having a dedicated queue for each runner, we used to have a single queue for all job types and an upper bound on the worker count per instance. As part of the Python job that executes experiment analysis, the Galileo Python SDK fetches the experiment metrics using Spark jobs. As a result, a Python job submits one or more Spark jobs and blocks until these jobs are complete.

Since the number of jobs that can run in parallel at a given time is limited by the number of workers, we ran into trouble when the number of running Python jobs exceeded the number of configured workers. Python jobs would submit Spark jobs to Hubble and block until those jobs finished, but the Spark jobs would never start since all workers were busy executing the parent Python jobs. Hence, we had a distributed deadlock.

The solution for this issue was to have a task queue for each job type, which isolates the workers per task queue and allows all job types to make progress at the same time. This also allows us to customize the configuration of each task queue in the future without impacting the flow of other job types.

Passing Query Error

Most of our Spark SQL jobs fail due to query syntax errors, which made it important to display the error message to the user in our UI, as shown below. We needed to communicate the exception that happened in the Scala code back to the job running in Hubble. Unfortunately, the error message in the Livy response mentioned that a Spark session had failed but did not include the root cause of the failure.

To tackle this issue, we added an update endpoint to Hubble which receives a message body and a job ID, then appends the error message to the corresponding job object in Redis. On the Scala side, when an exception occurs, the following flow takes place:

  1. The exception is caught by the job itself.
  2. The job posts the error message to Hubble through the new update endpoint.
  3. Finally, the job rethrows the exception so that it can fail as usual.

Conclusions

In this article, we discussed how we built our query runner and how it can be used to retrieve data from different data sources with different engines. We also discussed how Hubble orchestrates the concrete executors to run jobs at scale and how it can easily be extended to cover other use cases in the future.

While the design of Hubble was relatively simple, the implementation was not trivial: we went through several challenges and even rewrote the entire service in Go in the end. Most of our challenges were in the integration between different systems and proper error handling; solving them allowed our end users to reliably build segments, write metrics, and analyze experiments effortlessly.