Prefect: Production Workflows

Who am I?

I am Chris White, the Tech Lead at Prefect, a company building the next generation of workflow automation platforms for data engineers and data scientists. In this role, I am the core developer of our open source engine, which allows users to build, schedule, and execute robust workflows.

The problem I’m trying to solve

Most teams are responsible for maintaining production workflows that are critical to the team’s mission. Historically these workflows consisted largely of batch ETL jobs, but more recently they have come to include things such as deploying parametrized machine learning models, ad-hoc reporting, and handling event-driven processes.

Typically this means developers need a workflow system which can do things such as:

  • retry failed tasks
  • schedule jobs to run automatically
  • log detailed progress (and history) of the workflow
  • provide a dashboard / UI for inspecting system health
  • provide notification hooks for when things go wrong

among many other things. We at Prefect like to think of a workflow system as a technical insurance policy: you shouldn’t really notice it much when things are going well, but it should be maximally useful when things go wrong.
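As a rough illustration of the first two items above, here is a minimal sketch of declaring retries and a schedule with Prefect’s Python API (assuming Prefect’s 0.x interface; the task names and logic are purely hypothetical):

    from datetime import timedelta

    from prefect import task, Flow
    from prefect.schedules import IntervalSchedule


    @task(max_retries=3, retry_delay=timedelta(minutes=1))
    def extract():
        # hypothetical extract step, retried up to 3 times on failure
        return [1, 2, 3]


    @task
    def load(data):
        # hypothetical load step
        print(f"loading {len(data)} records")


    # run the flow every hour
    schedule = IntervalSchedule(interval=timedelta(hours=1))

    with Flow("example-etl", schedule=schedule) as flow:
        load(extract())

Calling flow.run() then executes the flow on that schedule, retrying the extract step automatically when it fails.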

Prefect’s goal is to build the next-generation workflow system. Older systems such as Airflow and Luigi are limited by their model of workflows as slow-moving, regularly scheduled, and with limited inter-task communication. Prefect, on the other hand, embraces this new reality and makes very few assumptions about the nature and requirements of workflows, thereby supporting more dynamic use cases in both data engineering and data science.

How Dask helps

Prefect was designed and built with Dask in mind. Historically, workflow systems such as Airflow handled all scheduling of both workflows and the individual tasks contained within those workflows. This pattern introduces a number of problems:

  • this puts an enormous burden on the central scheduler (it is scheduling every single action taken in the system)
  • it adds non-trivial latency to task runs
  • in practice, this limits the amount of dynamism workflows can have
  • it also tends to limit the amount of data tasks can share, as all information is routed through the central scheduler
  • it requires users to have an external scheduler service running to run their workflows at all!

Instead, Prefect handles the scheduling of workflows, and lets Dask handle the scheduling and resource management of tasks within each workflow. This provides a number of benefits out of the box (see the sketch after this list):

  • Task scheduling: Dask handles all task scheduling within a workflow, allowing Prefect to incentivize smaller tasks which Dask schedules with millisecond latency
  • “Dataflow”: because Dask handles serializing and communicating the appropriate information between Tasks, Prefect can support “dataflow” as a first-class pattern
  • Distributed computation: Dask handles allocating Tasks to workers in a cluster, allowing users to immediately realize the benefits of distributed computation with minimal overhead
  • Parallelism: whether running in a cluster or locally, Dask provides parallel Task execution off the shelf
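To make this division of labor concrete, the sketch below hands execution of a single flow’s tasks over to Dask; it assumes Prefect’s 0.x DaskExecutor and a Dask scheduler at a hypothetical address:

    from prefect import task, Flow
    from prefect.engine.executors import DaskExecutor


    @task
    def inc(x):
        return x + 1


    with Flow("dask-example") as flow:
        # 100 small mapped tasks -- Dask schedules each with millisecond latency
        results = inc.map(list(range(100)))

    # Prefect decides when the flow runs; Dask decides where each task runs
    flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))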

Additionally, because Dask is written in pure Python and has an active open source community, we can very easily get feedback on possible bugs, and even contribute to improving the software ourselves.

To run workflows with many tasks, we have found that Dask’s Futures interface serves us well. To support dynamic tasks (i.e., tasks which spawn other tasks), we rely on Dask worker clients. We have also occasionally experimented with Dask Queues to implement more complicated behavior such as future-sharing and resource throttling, but are not currently using them (mainly for design reasons).
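For the dynamic-task case, Dask’s worker client lets code that is already running on a worker submit new tasks back to the same cluster. A minimal sketch (the function bodies are hypothetical):

    from dask.distributed import Client, worker_client


    def process(item):
        return item * 2


    def spawn_more_work(items):
        # runs as a Dask task; worker_client() connects back to the scheduler
        # so this task can submit and wait on child tasks of its own
        with worker_client() as client:
            futures = client.map(process, items)
            return client.gather(futures)


    if __name__ == "__main__":
        client = Client()  # local cluster, purely for illustration
        print(client.submit(spawn_more_work, [1, 2, 3]).result())  # [2, 4, 6]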

Pain points when using Dask

Our biggest pain point in using Dask has largely revolved around the ability (or lack thereof) to share futures between clients. To provide a concrete example, suppose we start with a list of numbers and, using client.map twice, we proceed to compute x -> x + 1 -> x + 2 for each element of our list. When using only Dask primitives and a single client, these computations proceed asynchronously, meaning that the final computation of each branch can begin without waiting on the intermediate computations of other branches, as in this schematic:

[Figure: Depth-first execution]
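Concretely, the single-client version of that pipeline looks roughly like the following; because the intermediate futures are passed straight into the second client.map, each element’s second step can start as soon as its own first step finishes:

    from dask.distributed import Client


    def inc(x):
        return x + 1


    if __name__ == "__main__":
        client = Client()  # local cluster, purely for illustration

        stage1 = client.map(inc, [1, 2, 3])  # x -> x + 1
        stage2 = client.map(inc, stage1)     # x + 1 -> x + 2, futures passed directly
        print(client.gather(stage2))         # [3, 4, 5]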

However, in Prefect, we aren’t simply passing around Dask futures created from a single Client: when a map operation occurs, the Dask futures are actually created by a worker client and attached to a Prefect State object. Ideally, we would leave these futures unresolved at this stage so that computation can proceed as above. However, because it is non-trivial to share futures between clients, we must gather the futures with this same client, making our computation proceed in a “breadth-first” manner:

[Figure: Breadth-first execution]

This isn’t the worst thing, but for longer pipelines it would be very nice to have the faster branches of the pipeline proceed with execution so that final results are produced earlier for inspection.
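The pattern that forces this breadth-first behavior looks roughly like the sketch below; this is a simplification rather than Prefect’s actual internals, and the helper name is hypothetical:

    from dask.distributed import worker_client


    def mapped_stage(fn, items):
        # runs inside a Dask task, mimicking a single Prefect "map" operation
        with worker_client() as client:
            futures = client.map(fn, items)
            # these futures are tied to this short-lived client, so we must
            # gather them before returning -- every element of this stage
            # finishes before any element of the next stage can start
            return client.gather(futures)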

Technology we use around Dask

Our preferred deployment of Prefect Flows uses dask-kubernetes to spin up a short-lived Dask cluster in Kubernetes.
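A minimal sketch of that deployment pattern, assuming a worker pod spec in a file named worker-spec.yml (the file name and worker count are illustrative):

    from dask.distributed import Client
    from dask_kubernetes import KubeCluster

    # spin up a short-lived Dask cluster inside Kubernetes
    cluster = KubeCluster.from_yaml("worker-spec.yml")
    cluster.scale(10)  # ten worker pods for the duration of the flow run

    client = Client(cluster)
    # ... run the Prefect flow against this cluster ...

    client.close()
    cluster.close()  # tear the cluster down when the flow finishes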

Otherwise, the logic contained within Prefect Tasks can be essentially arbitrary; many tasks in the system interact with databases, GCP resources, AWS, etc.