The Celery code update problem
Background
In Singular, we use Celery extensively as the main task-running framework for our various data pipelines. Millions of tasks run daily on a Kubernetes cluster with thousands of pods.
We use Celery for various purposes — scraping, monitoring, data enrichment, database loading, and more. We are also integrated with hundreds of 3rd party APIs as a part of our product. One important implication is that our tasks vary greatly in duration — some tasks can take seconds, but others can take hours to complete.
This creates a major issue around code updates. Our R&D moves fast and makes dozens of deploys to the data pipeline daily — while the system is live and running long-duration tasks.
In this blog post, we will survey the different approaches we took in Singular for code updates as our usage of Celery evolved and our scale grew.
Celery workers architecture
Before diving into the different approaches, let's say a few words about how Celery works.
A Celery setup typically involves the following components:
- One or more celery workers. Each worker can run multiple Celery tasks simultaneously (controlled by a concurrency parameter).
- A broker which contains one or more queues that workers listen on (Celery supports several broker platforms — see below).
- An optional backend component to store task results (Celery supports several backend platforms — see below).
- A task definition file with functions that are exposed as tasks. These tasks can be sent to and consumed from queues.
An illustration of the architecture can be seen here:
A tasks definition file (usually called “tasks.py”) contains an app definition and several functions wrapped with the @app.task decorator, which exposes them as tasks.
For example:
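(A minimal, illustrative sketch; the broker URL and task bodies below are assumptions, not our actual code.)

```python
# tasks.py -- a minimal, illustrative sketch
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

@app.task
def scrape_source(source_id):
    # Real scraping logic would live here.
    print(f"scraping source {source_id}")

@app.task
def load_to_db(batch_id):
    # Real database-loading logic would live here.
    print(f"loading batch {batch_id}")
```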
This file is loaded as soon as the celery worker starts.
The final point to note is how Celery handles concurrency. Celery has multiple modes for managing concurrency, with the default and most common being prefork pool.
In this mode, the main process forks subprocesses which consume tasks from the worker-configured queues.
This is where things get interesting for code updates: the tasks module, and every module it imports, is loaded in the main process. Since the main process is forked, every new subprocess will contain the version of the code from the time the worker started. Therefore, we can’t update the task code without restarting the worker.
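To make the fork semantics concrete, here is a tiny sketch using plain os.fork (this is not how Celery manages its pool internally; it uses the billiard library, but the principle is the same):

```python
# Illustration only: a forked child inherits the modules its parent already loaded.
import os
import tasks  # e.g. the tasks.py sketched above, imported once in the parent

pid = os.fork()
if pid == 0:
    # Child process: `tasks` here is the in-memory copy the parent imported.
    # Editing tasks.py on disk after the fork changes nothing for this child;
    # only a brand-new process that re-imports the module sees updated code.
    tasks.scrape_source(1)  # calling a task directly runs it in-process
    os._exit(0)
else:
    os.waitpid(pid, 0)
```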
In the next sections, we’ll review the different approaches we took to handle this problem.
The naive approach — restart Celery on deploys
The simplest approach to Celery code updates is to restart the Celery worker on every deployment. This is a good enough approach when the scale is small and tasks are not too long.
There are three primary ways to stop/restart a Celery worker:
Warm shutdown
In a warm shutdown, we do a graceful termination — the worker will wait for all currently running tasks to finish before terminating. The flow, in this case, looks like this:
- Send a TERM signal to the worker: “kill -TERM <celery worker pid>.”
- Wait for the worker to terminate. This can potentially hang for a long time, depending on the length of the tasks and the configured task timeout in the worker.
- Update code.
- Start the workers again.
Cold shutdown
In a cold shutdown flow, we force any currently running task to terminate immediately. The flow looks like this:
- Send a QUIT signal to the worker: “kill -QUIT <celery worker pid>.” This command will attempt to send an exception inside each currently running task to terminate them cleanly.
- Update code.
- Start the workers again.
Kill brutally
Cold shutdown may fail if tasks are stuck on C code (for example, system calls). In this case, you may have no alternative but to kill the worker brutally:
- Kill the main process and every subprocess with “kill -9”.
- Update code.
- Start the workers again.
A typical approach is to start with graceful termination (warm shutdown) and progress to the other methods if the worker does not terminate after a specified timeout.
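A rough sketch of that escalation as a small Python helper (the signal sequence follows the flows above; the timeouts, pid handling, and function names are illustrative, and in practice any leftover subprocesses would also need to be cleaned up):

```python
# Hypothetical escalation helper: warm shutdown, then cold shutdown, then kill -9.
import os
import signal
import time

def process_alive(pid):
    try:
        os.kill(pid, 0)  # signal 0 only checks that the process still exists
        return True
    except OSError:
        return False

def wait_for_exit(pid, timeout):
    deadline = time.time() + timeout
    while time.time() < deadline:
        if not process_alive(pid):
            return True
        time.sleep(5)
    return False

def stop_worker(pid, warm_timeout=3600, cold_timeout=60):
    os.kill(pid, signal.SIGTERM)      # warm shutdown: let running tasks finish
    if wait_for_exit(pid, warm_timeout):
        return
    os.kill(pid, signal.SIGQUIT)      # cold shutdown: abort running tasks
    if wait_for_exit(pid, cold_timeout):
        return
    os.kill(pid, signal.SIGKILL)      # brutal kill as a last resort
                                      # (remaining subprocesses need the same)
```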
However, if we have tasks that take hours, we’ll be forced to kill them to restart the worker.
As Singular grew, this deployment method became unscalable. We needed a way to update code without interrupting existing tasks.
In the next section, we’ll see our first approach to achieving this.
Git updates + lazy imports
In Singular’s early days, before our data pipeline environment was dockerized, our way to update code on our servers consisted of simple git updates.
While not ideal, it did give us a benefit that is harder to achieve with a fully dockerized setup — lazy imports!
As discussed earlier, Celery’s primary concurrency mechanism is a prefork model. In this model, subprocesses are forked from Celery’s main process and are used to run tasks. Therefore, if a module is not imported in the main process, it will be imported only when the subprocess starts.
We therefore used the following strategy:
- We set max_tasks_per_child=1 in celeryconfig.py. This means every subprocess runs only a single task; when the task finishes, the subprocess terminates and a new one is spawned in its place. This is beneficial because any code we import lazily in the task will be re-imported in future tasks (and therefore contain updated code).
- We include as little code and as few imports as possible in the tasks.py module. We did this by creating another module, tasks_inner.py, which contains the actual task implementation and is imported lazily when the task starts to run.
- In the example below (fig 3 — the tasks_inner pattern), the tasks.py module is imported by the main process, but it doesn’t contain anything “interesting” and doesn’t import anything. The tasks_inner.py module, on the other hand, is only loaded when the task runs, so its code, and anything it imports, is lazily imported in each task with the most up-to-date code.
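The pattern looks roughly like this (a sketch; the task names and return values are illustrative):

```python
# tasks.py -- imported by Celery's main process at startup.
# Deliberately thin: no business logic and no heavy imports at module level.
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")  # broker URL is illustrative

@app.task
def scrape_source(source_id):
    # Imported lazily inside the (single-use) subprocess, so together with
    # max_tasks_per_child=1 every task run re-imports the latest code on disk.
    import tasks_inner
    return tasks_inner.scrape_source(source_id)


# tasks_inner.py -- the actual implementation. It, and anything it imports,
# is only loaded when the task runs.
def scrape_source(source_id):
    # Real business logic (and its heavy imports) would live here.
    return {"source_id": source_id, "status": "scraped"}
```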
This approach is handy because it’s very simple to implement and effectively allows you to update code without restarting Celery workers. A single worker can even be running 20 different tasks at the same time, each with a different version of the code!
It does have significant drawbacks though:
- Updating code with git means that different modules may originate from different versions of the code, which can lead to “deploy time races” (For example, module A relies on a constant in module B, but only A was lazy-loaded).
- 3rd party library updates are a pain — particularly because it’s hard to guarantee exactly when they are imported (and Celery itself may even use some).
- Time bombs — since workers aren’t restarted frequently in this approach, and some of the code is only updated when they do restart, bad deploys may surface issues only days or weeks later!
These issues were negligible initially, and the strategy above worked well for us for several years. At some point, though, almost every deployment caused many tasks to break and major incidents in some cases, and we realized we needed to change our architecture entirely.
Dockerization, Kubernetes, and Graceful Stop
The move to Kubernetes
It was clear to us that we couldn’t keep updating code with Git forever. Following more modern trends and technologies, we chose to dockerize our Celery environment. Every time we deploy code, our CI builds a Docker image that installs all the relevant requirements and contains the most recent version of the code.
Dockerizing our environment guarantees a consistent version across our code and dependencies.
We also employed Kubernetes for orchestration: each worker runs in a Kubernetes pod (a container running our Docker image), and we run many different pods across various nodes (servers) managed by our Kubernetes cluster.
Pods are deployed and managed in our system via statefulsets — which essentially represent a set of pods initialized from the same container spec — containing the specific docker image to use and various settings. Each statefulset refers to some kind of workload in our system (for example, monitoring task workers), and has a configured number of “replicas” (pods) it runs in the system.
Statefulsets handle code updates nicely — they support two types of update strategies:
- RollingUpdate (default) — gracefully terminates each pod (by sending it a TERM signal) and starts a new pod (with the updated Docker image) as soon as it terminates. The updates are done sequentially, in reverse order of creation, so each pod is only updated after the previous one has completed.
- OnDelete — pods are not automatically updated in this mode. Instead, pods must be deleted explicitly (per pod), and new pods with the updated spec are created once they terminate.
Because we have long-running tasks, we didn’t want deploys to wait for each pod to finish before updating the next one. So we did the following:
- We defined an OnDelete strategy.
- After updating our statefulsets, we delete all the pods. This results in a TERM signal being sent to the pods, which we forward to Celery (causing a warm shutdown). After a timeout of 24 hours, we brutally kill the pod.
- We don’t wait for the pods to terminate (“--wait=false”). This means we can terminate all pods in parallel, and the deployment doesn’t block on their termination (so it’s fine if they take hours to finish their running tasks).
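For illustration only, here is roughly what that flow looks like using the kubernetes Python client (the namespace, labels, container name, and image tag are placeholders, and in practice this is driven by our CI rather than a script like this):

```python
# Sketch: update the statefulset image, then delete pods without waiting.
from kubernetes import client, config

config.load_kube_config()
apps = client.AppsV1Api()
core = client.CoreV1Api()

# Point the statefulset at the new image. With the OnDelete strategy this
# does not restart any pods by itself.
apps.patch_namespaced_stateful_set(
    name="monitoring-workers",
    namespace="pipeline",
    body={"spec": {"template": {"spec": {"containers": [
        {"name": "celery-worker", "image": "registry.example.com/pipeline:new-tag"}
    ]}}}},
)

# Delete the existing pods without waiting for them to terminate -- the
# equivalent of `kubectl delete pod ... --wait=false`. Each pod receives a
# TERM (forwarded to Celery for a warm shutdown), and a replacement pod with
# the updated image is created as soon as it exits.
pods = core.list_namespaced_pod("pipeline", label_selector="app=monitoring-workers")
for pod in pods.items:
    core.delete_namespaced_pod(
        pod.metadata.name,
        "pipeline",
        grace_period_seconds=24 * 60 * 60,  # brutal kill only after 24 hours
    )
```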
This solves the code update problem neatly and avoids the races mentioned in the previous section. Pods within the same statefulset can run various code versions, and each can terminate independently. On the other hand, we are guaranteed that the total number of running pods in the statefulset will remain the same at all times (controlled by the number of replicas).
We’re almost done, but not quite. We encountered another problem — it turns out that Celery’s graceful termination is not as graceful as we thought.
Celery graceful stop
Sending a TERM signal to Celery workers for graceful termination works well overall. However, in the time between the TERM signal and the final termination, the worker turns into a sort of “zombie”. While it does allow the currently running tasks to finish, it shuts down a few important services:
- Heartbeats — workers stop sending heartbeat events (which are important for monitoring purposes) as soon as they begin terminating.
- Inspect commands — these are commands that can be sent to the worker to get useful information (such as which tasks are currently running).
- Control commands — allow changing certain settings on a live worker (such as adding/removing consumed queues).
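As a quick illustration of what goes missing, these are the kinds of calls that stop being answered once a worker enters its terminating state (the queue and worker names are placeholders, and `app` is the Celery app from tasks.py):

```python
# Sketch of the inspect/control commands we rely on during long shutdowns.
from tasks import app

inspector = app.control.inspect()

# Inspect command: which tasks are currently running on each worker?
print(inspector.active())

# Control commands: make specific workers start/stop consuming a queue.
app.control.add_consumer("monitoring", destination=["celery@worker-1"])
app.control.cancel_consumer("monitoring", destination=["celery@worker-1"])
```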
Since our pods can spend hours in a terminating state, we wanted to have these services fully functioning. Fortunately, we found a nice open-source library called celery_graceful_stop which replaces the TERM signal implementation with a more friendly one that keeps these services up and running.
Unfortunately, this library was last updated in 2016, and a lot has happened in Celery since (in particular, it broke in Celery 4). After some research, we decided to implement our version — more on that in a future post!
Conclusion
In this post, we surveyed our different approaches to code deployments as our usage of Celery evolved over the years — from restarting workers on every deployment to our final approach of using Kubernetes statefulsets.
There is a tradeoff in these approaches between simplicity and robustness:
- Restarting workers is the simplest approach we’d probably take in a small/initial Celery deployment.
- Kubernetes statefulsets are the most robust, allowing multiple code versions to co-exist without the drawbacks of deploy-time races and time bombs.
- Git updates + lazy imports offer some middle ground by being simpler than statefulsets but prone to issues at scale.
There isn’t a single correct solution — we hope the information here will help other companies choose the best strategy for them!