engineering

December 17, 2020   |   13 min read

Apache Airflow 2.0 Tutorial

Apache Airflow is already a commonly used tool for scheduling data pipelines. But Airflow 2.0 is going to be an even bigger deal, as it introduces many new features.

A comic showing a story of engineers getting confused by data. Image source

This tutorial provides a step-by-step guide through all crucial concepts of Airflow 2.0 and possible use cases.

Airflow tutorial—overview

Apache Airflow is an open-source platform to run any type of workflow. And by any we mean…any! Airflow uses the Python programming language to define pipelines. Users can take full advantage of that by using for loops to define pipelines, executing bash commands, using external modules like pandas, sklearn, or GCP and AWS libraries to manage cloud services, and much, much more.

Airflow is a reliable solution trusted by many companies. Pinterest used to face performance and scalability issues and dealt with high maintenance costs. GoDaddy has many batch analytics and data teams that need an orchestration tool and ready-made operators for building ETL pipelines. DXC Technology delivered a client project that required massive data storage, and hence needed a stable orchestration engine. All these challenges have been worked out by implementing the right deployment of Airflow.

Apache Airflow use cases

The versatility of Airflow allows you to use it to schedule any type of workflow. Apache Airflow can run ad hoc workloads that are not related to any schedule or interval. However, it works best for pipelines:

  • that change slowly
  • that are related to a time interval
  • that are scheduled at specific times

By “changing slowly” we mean that the pipeline, once deployed, is expected to change only from time to time (over days or weeks rather than hours or minutes). This is related to the lack of versioning of Airflow pipelines. “Related to a time interval” means that Airflow is best suited for processing data intervals. That is also why Airflow works best when pipelines are scheduled to run at specific times, although it is possible to trigger pipelines manually or using external triggers (for example via the REST API).

Apache Airflow can be used to schedule:

  • ETL pipelines that extract data from multiple sources and run Spark jobs or any other data transformations
  • Training machine learning models
  • Report generation
  • Backups and similar DevOps operations

And much more! You can even write a pipeline to brew coffee every few hours. It will need some custom integrations, but that’s the biggest power of Airflow: it’s pure Python, and everything can be programmed.

If you would like to learn more about Airflow use cases, check out the Airflow Summit videos.

Core concepts of Apache Airflow 2.0

Let’s start with a few base concepts of Airflow!

Airflow DAG

Workflows are defined in Airflow by DAGs (Directed Acyclic Graphs) and are nothing more than Python files. A single DAG file may contain multiple DAG definitions, although it is recommended to keep one DAG per file.

Let’s take a look at an example DAG:

from airflow.models import DAG
from airflow.utils.dates import days_ago

with DAG(
    "etl_sales_daily",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    ...

First of all, a DAG is identified by its dag_id, which has to be unique across the whole Airflow deployment. Additionally, to create a DAG we need to specify:

  • schedule_interval—defines when the DAG should be run. It can be a timedelta object (for example timedelta(days=2)) or a cron expression string like * * * * *. It can also be None, in which case the DAG will not be scheduled by Airflow, but it can still be triggered manually or externally.
  • start_date—a date (a datetime object) from which the DAG will start running. This makes it possible to run a DAG for past dates. It is common to use the days_ago function to specify this value. If the date is in the future, you can still trigger the DAG manually.
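The interplay between start_date and schedule_interval can be illustrated with plain Python (no Airflow required): a run for a given interval is only scheduled once that interval has fully passed. This is a simplified sketch of the scheduling arithmetic, not Airflow’s actual implementation:

```python
from datetime import datetime, timedelta

def scheduled_runs(start_date, schedule_interval, now):
    """Yield the execution dates Airflow would schedule for a
    timedelta-based interval: one run per fully elapsed interval."""
    current = start_date
    while current + schedule_interval <= now:
        # The run for interval [current, current + interval)
        # starts only after that interval has ended.
        yield current
        current += schedule_interval

runs = list(scheduled_runs(
    start_date=datetime(2020, 12, 1),
    schedule_interval=timedelta(days=2),
    now=datetime(2020, 12, 8),
))
# Three intervals have fully elapsed (Dec 1-3, Dec 3-5, Dec 5-7),
# so there are three runs, with execution dates Dec 1, Dec 3 and Dec 5.
```

Note that the execution date of each run is the start of its data interval, which is a common source of confusion for new Airflow users.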

Once we have this baseline, we can start adding tasks to our DAG:

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator

with DAG(
    "etl_sales_daily",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    task_a = DummyOperator(task_id="task_a")
    task_b = DummyOperator(task_id="task_b")
    task_c = DummyOperator(task_id="task_c")
    task_d = DummyOperator(task_id="task_d")

    task_a >> [task_b, task_c]
    task_c >> task_d

Every task in an Airflow DAG is defined by an operator (we will dive into more details soon) and has its own task_id, which has to be unique within the DAG. Each task has a set of dependencies that define its relationships to other tasks. These include:

  • Upstream tasks—a set of tasks that will be executed before this particular task.
  • Downstream tasks—a set of tasks that will be executed after this task.

In our example, task_b and task_c are downstream of task_a, and respectively task_a is upstream of both task_b and task_c. A common way of specifying a relationship between tasks is using the >> operator, which works for tasks and collections of tasks (for example lists or sets).
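Under the hood this is simply a directed graph. A minimal sketch in plain Python (not Airflow’s internal representation) shows how the downstream edges declared with >> determine each task’s upstream set:

```python
# Downstream edges exactly as declared with >> in the DAG above.
downstream = {
    "task_a": {"task_b", "task_c"},
    "task_b": set(),
    "task_c": {"task_d"},
    "task_d": set(),
}

def upstream(task_id):
    """Tasks that have to be executed before task_id."""
    return {t for t, targets in downstream.items() if task_id in targets}

# task_a has no upstream tasks; task_d depends only on task_c.
assert upstream("task_a") == set()
assert upstream("task_d") == {"task_c"}
```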

This is what a graphical representation of this DAG looks like:

A graphical representation of what this DAG looks like.

Additionally, each task can specify a trigger_rule, which allows users to make the relationships between tasks even more complex. Examples of trigger rules are:

  • all_success—all upstream tasks of a task have to succeed before Airflow attempts to execute that task
  • one_success—one succeeded upstream task is enough to trigger a task with this rule
  • none_failed—every upstream task has to either succeed or be skipped; no failed upstream tasks are allowed to trigger this task
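The semantics of these three rules can be captured as simple predicates over the states of a task’s upstream tasks. This is a conceptual sketch, not Airflow’s implementation, which also handles additional states such as upstream_failed:

```python
def all_success(upstream_states):
    # Every upstream task succeeded.
    return all(state == "success" for state in upstream_states)

def one_success(upstream_states):
    # At least one upstream task succeeded.
    return any(state == "success" for state in upstream_states)

def none_failed(upstream_states):
    # Every upstream task either succeeded or was skipped.
    return all(state in ("success", "skipped") for state in upstream_states)

states = ["success", "skipped", "success"]
# all_success(states) is False (one task was skipped),
# while one_success(states) and none_failed(states) are both True.
```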

All of this allows users to define arbitrarily complex workflows in a really simple way.

An example of an arbitrarily complex workflow. The code used to generate this DAG is available as a gist.

Airflow operators, sensors and hooks

As mentioned already, each task in an Airflow DAG is defined by an operator. Every operator is a Python class that implements an execute method encapsulating the whole logic of what is executed. Operators can be split into three categories:

  • Action operators—for example, BashOperator (executes any bash command), PythonOperator (executes a python function) or TriggerDagRunOperator (triggers another DAG).
  • Transfer operators—designed to transfer data from one place to another, for example GCSToGCSOperator which copies data from one Google Cloud Storage bucket to another one. Those operators are a separate group because they are often stateful (the data is first downloaded from source storage and stored locally on a machine running Airflow and then uploaded to destination storage).
  • Sensors—operators inheriting from BaseSensorOperator that are designed to wait for an operation to complete. When implementing a sensor, users have to implement a poke method, which is then invoked by the special execute method of BaseSensorOperator. The poke method should return True or False. Airflow will keep executing the sensor until it returns True.
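The loop that BaseSensorOperator runs around poke can be sketched roughly as follows. This is a hypothetical simplified version: the real implementation also supports, for example, a reschedule mode that frees the worker slot between pokes:

```python
import time

def run_sensor(poke, poke_interval=60, timeout=600, sleep=time.sleep):
    """Call poke() until it returns True or timeout seconds have passed."""
    waited = 0
    while not poke():
        if waited >= timeout:
            raise TimeoutError("Sensor timed out")
        sleep(poke_interval)
        waited += poke_interval
    return True

# A toy poke that succeeds on the third attempt.
calls = {"count": 0}

def poke():
    calls["count"] += 1
    return calls["count"] >= 3

result = run_sensor(poke, poke_interval=1, sleep=lambda _: None)
# result is True and poke was called exactly three times.
```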

Airflow ships with a few built-in operators like the mentioned BashOperator. However, users can easily install additional operators using provider packages (for example, pip install apache-airflow-providers-google installs the Google integrations). In this way Airflow makes it easy to use hundreds of operators that allow users to easily integrate with external services like Google Cloud Platform and Amazon Web Services, or common data processing tools like Apache Spark.

Next to operators, Airflow has the concept of hooks. While operators are designed to provide a reusable way of defining tasks and authoring DAGs, hooks are designed to make creating new operators easy.

The main purpose of hooks is to implement a communication layer with an external service, together with common methods for authentication and error handling.

For example, let’s say we want to integrate with Google Cloud Storage and create an operator that creates buckets. First we should implement a GCSHook that implements a create_bucket method:

from airflow.hooks.base_hook import BaseHook

class GCSHook(BaseHook):
    def __init__(self, *args, **kwargs):
        super().__init__()

    def create_bucket(self, bucket_name: str):
        # Here goes the logic for creating GCS buckets
        ...

Then we create the GCSCreateBucketOperator, which creates an instance of GCSHook in its execute method and calls the proper method:

from airflow.models.baseoperator import BaseOperator

class GCSCreateBucketOperator(BaseOperator):
    def __init__(self, *, bucket_name: str, **kwargs):
        super().__init__(**kwargs)
        self.bucket_name = bucket_name

    def execute(self, context):
        hook = GCSHook()
        hook.create_bucket(self.bucket_name)

By doing this we achieve great reusability. First, the new hook can be reused in other operators (for example in transfer operators). Second, the operator can use the hook’s method but also add additional logic that is not strictly related to creating a bucket, like handling the situation when the bucket already exists.

In general, hooks’ methods should provide the smallest possible building blocks, from which we can build more complex logic that is then encapsulated in an operator.

Check out some tips and tricks on defining Airflow DAGs.

XCom

While the purpose of hooks is to implement a communication layer with external services, the purpose of XCom is to implement a communication mechanism that allows information to be passed between tasks in a DAG.

The fundamental part of XCom is the underlying metadatabase table (with the same name), which works as a key-value storage. The key is a tuple (dag_id, task_id, execution_date, key), where the key attribute is a configurable string (by default it’s return_value). The stored value has to be JSON-serializable and relatively small (48KB maximum).

This means that the purpose of XCom is to store metadata, not data. For example, if we have a DAG with task_a >> task_b and a big data frame has to be passed from task_a to task_b, then we have to store it somewhere persistent (a storage bucket, a database, etc.) between those tasks. task_a should upload the data to storage and write to the XCom table information about where this data can be found, for example a URI to a storage bucket or the name of a temporary table. Once this information is in the XCom table, task_b can access this value and retrieve the data from the external storage.
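The pattern is easy to see in a plain-Python sketch, where a dict keyed the same way as the XCom table stands in for the metadatabase. The bucket URI is made up for the example:

```python
# Stand-in for the XCom table, keyed by (dag_id, task_id, execution_date, key).
xcom_table = {}

def xcom_push(dag_id, task_id, execution_date, value, key="return_value"):
    xcom_table[(dag_id, task_id, execution_date, key)] = value

def xcom_pull(dag_id, task_id, execution_date, key="return_value"):
    return xcom_table[(dag_id, task_id, execution_date, key)]

# task_a uploads the data frame to a bucket and pushes only its URI.
xcom_push("etl_sales_daily", "task_a", "2020-12-17",
          "gs://example-bucket/sales/2020-12-17.parquet")

# task_b pulls the URI and would then download the data frame itself.
uri = xcom_pull("etl_sales_daily", "task_a", "2020-12-17")
```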

In many cases this may sound like a lot of additional logic for uploading and downloading data in operators. That’s true. But first of all, that’s where hooks come to the rescue—you can reuse the logic for storing data in many different places. Second, there’s a possibility to specify a custom XCom backend. In this way users can simply write a class that defines how to serialize data before it’s stored in the XCom table and how to deserialize it when it’s retrieved from the metadatabase. This, for example, allows users to automate the logic of persisting data frames as we described in this [article](link to dag authoring).

Deploying Airflow

Airflow as a distributed system

Although Airflow can be run on a single machine, it is fully designed to be deployed in a distributed manner. This is because Airflow consists of separate components:

  • Scheduler—this is the brain and heart of Airflow. The scheduler is responsible for parsing DAG files, managing database state and, as the name suggests, for scheduling tasks. Since Airflow 2.0, users can run multiple schedulers to ensure high availability of this crucial component.
  • Webserver—the web interface of Airflow, which allows users to manage and monitor DAGs.

DAGs screen

  • Worker—a Celery worker application that consumes and executes tasks scheduled by the scheduler when using a Celery-like executor (more details in the next section). It is possible to have many workers in different places (for example on separate VMs or in multiple Kubernetes pods).

All those components can be started using the airflow command, and all of them require access to the Airflow metadatabase, which is used to store all information about DAGs and tasks. Additionally, both the scheduler and the workers need access to exactly the same DAG files. We will discuss DAG distribution approaches later on, but first let’s learn about Airflow’s executors.

Executors

The executor is one of the crucial components of Airflow and can be configured by users. It defines where and how Airflow tasks should be executed. The executor should be chosen to fit your needs, as it determines many aspects of how Airflow should be deployed.

Currently Airflow supports the following executors:

  • LocalExecutor—executes tasks in separate processes on a single machine. It’s the only non-distributed executor that is production-ready. It works well in relatively small deployments.
  • CeleryExecutor—the most popular production executor, which under the hood uses the Celery queue system. When using this executor, users can deploy multiple workers that read tasks from the broker queue (Redis or RabbitMQ) where tasks are sent by the scheduler. This executor can be distributed across many machines, and users can take advantage of queues that allow them to specify where a given task should be executed. This is, for example, useful for routing compute-heavy tasks to more resourceful workers.
  • KubernetesExecutor—another widely used production-ready executor. As the name suggests, it requires a Kubernetes cluster. When using this executor, Airflow spawns a new pod with an Airflow instance to run each task. This creates additional overhead, which can be problematic for short-running tasks.
  • CeleryKubernetesExecutor—the name says it all: this executor uses both CeleryExecutor and KubernetesExecutor. When users select this executor, they can use a special kubernetes queue to specify that particular tasks have to be executed using the KubernetesExecutor; otherwise, tasks are routed to Celery workers. In this way users can take full advantage of horizontal autoscaling of worker pods and the possibility of delegating long-running or compute-heavy tasks to the KubernetesExecutor.
  • DebugExecutor—a debug executor. Its main purpose is to debug DAGs locally. It’s the only executor that uses a single process to execute all tasks. This makes it simple to use from an IDE, as described in the docs.
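Whichever executor you pick, it is configured in the core section of the Airflow configuration, either in airflow.cfg or via the corresponding environment variable:

```ini
# airflow.cfg
[core]
executor = CeleryExecutor

# Equivalent environment variable:
# AIRFLOW__CORE__EXECUTOR=CeleryExecutor
```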

Docker image and helm chart

One of the easiest ways to start a journey with Airflow is to use the official Apache Airflow Docker image. This will allow you to play with Airflow locally and find out more about it. It also simplifies deploying Airflow. You can either use the vanilla image or bake your own customized one by:

  • using Airflow image as a base image in your Dockerfile,
  • using the Airflow Breeze tool to build an optimized custom image; check the documentation to learn about customizing production deployments.
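For the first option, a minimal Dockerfile that extends the official image could look like this (the image tag and the extra provider package are just examples of possible customizations):

```dockerfile
# Start from the official Apache Airflow image.
FROM apache/airflow:2.0.0

# Add extra Python dependencies on top of it.
RUN pip install --no-cache-dir apache-airflow-providers-google
```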

The Docker image serves as a base for the production-grade Helm chart, which allows you to deploy Airflow on a Kubernetes cluster. It deploys every component of Airflow, together with out-of-the-box support for horizontal autoscaling of workers using KEDA. So, if you are planning to use Kubernetes to deploy Airflow, this is the recommended way to go!

Airflow DAG distribution

When we discussed Airflow as a distributed system, we underlined that both the scheduler and the workers need access to the DAG files. The scheduler needs them to create the database representation of the DAGs and to schedule them. The workers, on the other hand, need to read a DAG in order to execute it.

It’s definitely inconvenient to upload DAGs manually to your deployment every time there’s a change in their definitions. That’s why you need to design a DAG distribution process.

It usually starts with a repository where you store the DAGs. In this way you have a history of changes, and cooperation between team members is easy. Then you take advantage of a CI/CD system to upload the DAGs to your deployment.

One possibility is to use a shared file system (for example AWS Elastic File System) that can be accessed by every Airflow component. While this can work for small deployments, this type of DAG distribution is known to impact the performance of Airflow (DAG parsing times, inter-task latency).

The other, recommended, approach is to deliver DAGs to each component’s volume. This can be achieved using tools like git-sync, which synchronizes a local repository with a remote one, or by using solutions like gsutil rsync or gcsfuse.

Another way to distribute DAGs is to bake them into your custom Docker image. However, this implies that you need to rebuild it on every DAG change (or on a schedule), and you need to be able to easily change the image used by each of the Airflow components.

We have more useful content about Airflow 2.0! For example, check out this article about Airflow providers.

Tomek Urbaszek

Software Engineer

Did you enjoy the read?

If you have any questions, don’t hesitate to ask!