December 17, 2020 | 13min read
Apache Airflow 2.0 Tutorial
Apache Airflow is already a commonly used tool for scheduling data pipelines. But the upcoming Airflow 2.0 is going to be an even bigger deal, as it implements many new features.
This tutorial provides a step-by-step guide through all crucial concepts of Airflow 2.0 and possible use cases.
Apache Airflow is an open-source platform to run any type of workflow. And by any we mean…any! Airflow uses the Python programming language to define pipelines. Users can take full advantage of that by using for loops to define pipelines, executing bash commands, using external modules like pandas, sklearn, or GCP or AWS libraries to manage cloud services, and much, much more.
Airflow is a reliable solution trusted by many companies. Pinterest used to face performance and scalability issues and dealt with high maintenance costs. GoDaddy has many batch analytics and data teams that need an orchestration tool and ready-made operators for building ETL pipelines. DXC Technology delivered a client’s project that required massive data storage, and hence needed a stable orchestration engine. All these challenges were worked out by implementing the right deployment of Airflow.
The versatility of Airflow allows you to use it to schedule any type of workflow. Apache Airflow can run ad hoc workloads that are not related to any schedule or interval. However, it works best for pipelines:
- that change slowly
- related to the time interval
- scheduled on time
By “changing slowly” we mean that the pipeline, once deployed, is expected to change only from time to time (over days or weeks rather than hours or minutes). This is connected to the lack of versioning for Airflow pipelines. “Related to the time interval” means that Airflow is best suited for processing data intervals. That’s also why Airflow works best when pipelines are scheduled to run at specific times, although it is possible to trigger pipelines manually or using external triggers (for example via the REST API).
Apache Airflow can be used to schedule:
- ETL pipelines that extract data from multiple sources and run Spark jobs or any other data transformations
- Training machine learning models
- Report generation
- Backups and similar DevOps operations
And much more! You could even write a pipeline to brew coffee every few hours. It would need some custom integrations, but that’s the biggest power of Airflow: it’s pure Python and everything can be programmed.
If you would like to learn more about Airflow use cases, check the following Airflow Summit videos:
- Keynote: How large companies use Airflow for ML and ETL pipelines
- Data flow with Airflow @ PayPal
- Scheduler as a service - Apache Airflow at EA Digital Platform
- Airflow at Société Générale : An open source orchestration solution in a banking environment
Let’s start with a few base concepts of Airflow!
Workflows are defined in Airflow by DAGs (Directed Acyclic Graphs) and are nothing more than a Python file. A single DAG file may contain multiple DAG definitions, although it is recommended to keep one DAG per file.
Let’s take a look at an example DAG:
```python
from airflow.models import DAG
from airflow.utils.dates import days_ago

with DAG(
    "etl_sales_daily",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    ...
```
First of all, a DAG is identified by a `dag_id`, which has to be unique across the whole Airflow deployment. Additionally, to create a DAG we need to specify:

- `schedule_interval`—defines when the DAG should be run. It can be a `timedelta` object, for example `timedelta(days=2)`, or a string cron expression like `* * * * *`. It can also be `None`, in which case the DAG will not be scheduled by Airflow, but it can still be triggered manually or externally.
- `start_date`—a date (a `datetime` object) from which the DAG will start running. This helps with running a DAG for past dates. It is common to use the `days_ago` function to specify this value. If the date is in the future you can still trigger the DAG manually.
Once we have this baseline, we can start adding tasks to our DAG:
```python
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

with DAG(
    "etl_sales_daily",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    task_a = DummyOperator(task_id="task_a")
    task_b = DummyOperator(task_id="task_b")
    task_c = DummyOperator(task_id="task_c")
    task_d = DummyOperator(task_id="task_d")

    task_a >> [task_b, task_c]
    task_c >> task_d
```
Every task in an Airflow DAG is defined by an operator (we will dive into more details soon) and has its own `task_id` that has to be unique within a DAG. Each task has a set of dependencies that define its relationships to other tasks. These include:

- Upstream tasks—a set of tasks that will be executed before this particular task.
- Downstream tasks—a set of tasks that will be executed after this task.

In our example `task_b` and `task_c` are downstream of `task_a`. And respectively `task_a` is upstream of both `task_b` and `task_c`. A common way of specifying a relation between tasks is using the `>>` operator, which works for tasks and collections of tasks (for example lists or sets).
This is what a graphical representation of this DAG looks like:
Additionally, each task can specify a `trigger_rule`, which allows users to make the relations between tasks even more complex. Examples of trigger rules are:

- `all_success`—all upstream tasks of a task have to succeed before Airflow attempts to execute this task
- `one_success`—one succeeded upstream task is enough to trigger a task with this rule
- `none_failed`—each upstream task has to either succeed or be skipped; no failed upstream tasks are allowed to trigger this task
All of this allows users to define arbitrarily complex workflows in a really simple way.
Code used to generate this DAG is available on gist
As mentioned already, each task in an Airflow DAG is defined by an operator. Every operator is a Python class that implements an `execute` method that encapsulates the whole logic of what is executed. Operators can be split into three categories:
- Action operators—for example, BashOperator (executes any bash command), PythonOperator (executes a python function) or TriggerDagRunOperator (triggers another DAG).
- Transfer operators—designed to transfer data from one place to another, for example GCSToGCSOperator which copies data from one Google Cloud Storage bucket to another one. Those operators are a separate group because they are often stateful (the data is first downloaded from source storage and stored locally on a machine running Airflow and then uploaded to destination storage).
- Sensors—operator classes inheriting from `BaseSensorOperator` that are designed to wait for an operation to complete. When implementing a sensor, users have to implement a `poke` method, which is invoked repeatedly by Airflow and should return True or False. The sensor will be executed by Airflow until it returns True.
Airflow ships with a few built-in operators, like the mentioned `BashOperator`. However, users can easily install additional operators using provider packages. In this way Airflow makes it easy to use hundreds of operators that allow users to easily integrate with external services like Google Cloud Platform, Amazon Web Services or common data processing tools like Apache Spark.
Next to operators Airflow has a concept of hooks. While operators are designed to provide a reusable way of defining tasks and authoring DAGs, hooks are designed to make creating new operators easy.
The main purpose of hooks is to implement the communication layer with an external service, together with common methods for authentication and error handling.
For example, let’s consider that we want to integrate with Google Cloud Storage and we want to create an operator for creating buckets. First we should implement a GCSHook with a `create_bucket` method:
```python
from airflow.hooks.base_hook import BaseHook


class GCSHook(BaseHook):
    def __init__(self, *args, **kwargs):
        super().__init__()

    def create_bucket(self, bucket_name: str):
        # Here goes logic for creating GCS buckets
        ...
```
Then we create the GCSCreateBucketOperator, which creates an instance of GCSHook in its `execute` method and calls the proper hook method:
```python
from airflow.models.baseoperator import BaseOperator


class GCSCreateBucketOperator(BaseOperator):
    def __init__(self, *, bucket_name: str, **kwargs):
        super().__init__(**kwargs)
        self.bucket_name = bucket_name

    def execute(self, context):
        hook = GCSHook()
        hook.create_bucket(self.bucket_name)
```
By doing this we achieve great reusability. First, the new hook can be reused in other operators (for example in transfer operators). Then, the operator can use the hook’s method but also add additional logic which is not strictly related to creating a bucket, like handling the situation when the bucket already exists.

In general, hooks’ methods should provide the smallest possible building blocks from which we can build more complex logic, which is then encapsulated in an operator.
While the purpose of hooks is to implement the communication layer with external services, the purpose of XCom is to implement a communication mechanism that allows information to be passed between tasks in a DAG.
The fundamental part of XCom is the underlying metadatabase table (with the same name) which works as a key-value storage. The key is a tuple of the DAG id, task id, execution date and a `key` attribute, where the `key` attribute is a configurable string (by default it’s `return_value`). The stored value has to be JSON-serializable and relatively small (a maximum of 48KB is allowed).
This means that the purpose of XCom is to store metadata, not the data itself. For example, if we have a DAG with `task_a >> task_b` and a big data frame has to be passed from `task_a` to `task_b`, then we have to store it in a persistent place (a storage bucket, a database, etc.) between those tasks. `task_a` should upload the data to the storage and write to the XCom table information about where this data can be found, for example a URI of a storage bucket or the name of a temporary table. Once this information is in the XCom table, `task_b` can access this value and retrieve the data from the external storage.
In many cases this may sound like a lot of additional logic for uploading and downloading data in operators. That’s true. But first of all, that’s where hooks come to the rescue—you can reuse the logic for storing data in many different places. Second, there’s a possibility to specify a custom XCom backend. In this way users can simply write a class that defines how to serialize data before it’s stored in the XCom table and how to deserialize it when it’s retrieved from the metadatabase. This, for example, allows users to automate the logic of persisting data frames as we described in this [article](link to dag authoring).
Although Airflow can be run on a single machine, it is fully designed to be deployed in a distributed manner. This is because Airflow consists of separate parts:
- Scheduler—the brain and heart of Airflow. The scheduler is responsible for parsing DAG files, managing the database state and, as the name suggests, for scheduling tasks. Since Airflow 2.0 users can run multiple schedulers to ensure high availability of this crucial component.
- Webserver—the web interface of Airflow which allows users to manage and monitor DAGs.
- Worker—a Celery worker application that consumes and executes tasks scheduled by the scheduler when a Celery-like executor is used (more details in the next section). It is possible to have many workers in different places (for example using separate VMs or multiple Kubernetes pods).
All these components can be started using the `airflow` command, and all of them require access to the Airflow metadatabase, which is used to store all information about DAGs and tasks. Additionally, both the scheduler and workers need access to exactly the same DAG files. That’s why we will discuss DAG distribution approaches later on, but first let’s learn about Airflow’s executors.
The executor is one of the crucial components of Airflow and it can be configured by users. It defines where and how Airflow tasks should be executed. The executor should be chosen to fit your needs, as it defines many aspects of how Airflow should be deployed.
Currently Airflow supports the following executors:

- LocalExecutor—executes tasks in separate processes on a single machine. It’s the only non-distributed executor which is production-ready. It works well in relatively small deployments.
- CeleryExecutor—the most popular production executor, which uses the Celery queue system under the hood. When using this executor, users can deploy multiple workers that read tasks from the broker queue (Redis or RabbitMQ) where tasks are sent by the scheduler. This executor can be distributed across many machines, and users can take advantage of queues that allow them to specify which task should be executed where. This is, for example, useful for routing compute-heavy tasks to more resourceful workers.
- KubernetesExecutor—another widely used production-ready executor. As the name suggests, it requires a Kubernetes cluster. When using this executor, Airflow spawns a new pod with an Airflow instance to run each task. This creates additional overhead which can be problematic for short-running tasks.
- CeleryKubernetesExecutor—the name says it all: this executor uses both CeleryExecutor and KubernetesExecutor. When users select this executor, they can use a special `kubernetes` queue to specify that particular tasks have to be executed using the KubernetesExecutor; all other tasks are routed to Celery workers. In this way users can take full advantage of horizontal auto-scaling of worker pods as well as the possibility of delegating long-running or compute-heavy tasks to the KubernetesExecutor.
- DebugExecutor—this is a debug executor. Its main purpose is to debug DAGs locally. It’s the only executor that uses a single process to execute all tasks. By doing so, it’s simple to use from the IDE level, as described in the docs.
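The executor is selected through Airflow’s configuration, for example in `airflow.cfg` (the same value can also be set via the `AIRFLOW__CORE__EXECUTOR` environment variable):

```ini
# airflow.cfg
[core]
# One of: SequentialExecutor, LocalExecutor, CeleryExecutor,
# KubernetesExecutor, CeleryKubernetesExecutor, DebugExecutor
executor = CeleryExecutor
```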
One of the easiest ways to start a journey with Airflow is using the official Apache Airflow docker image. This will allow you to play with Airflow locally and find out more about it. It also simplifies deploying Airflow. You can either use vanilla image or bake your own customized one by:
- using Airflow image as a base image in your Dockerfile,
- using the Airflow Breeze tool to build optimized custom image, check documentation to learn about customizing production deployments.
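A customized image built on top of the official one can be as simple as this sketch (the version tag, extra packages and `dags/` path are just examples):

```dockerfile
# Extend the official image with extra providers / Python dependencies
FROM apache/airflow:2.0.0
RUN pip install --no-cache-dir apache-airflow-providers-google pandas
# Bake DAGs into the image (the in-image path is the image's default DAGs folder)
COPY dags/ /opt/airflow/dags/
```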
The Docker image serves as a base for the production-grade Helm chart which allows you to deploy Airflow on a Kubernetes cluster. It deploys every component of Airflow together with out-of-the-box support for horizontal auto-scaling of workers using KEDA. So, if you are planning to use Kubernetes to deploy Airflow, then this is the recommended way to go!
When we discussed Airflow as a distributed system, we underlined that both the scheduler and the workers need access to the DAG files. The scheduler needs them to create the database representation of a DAG and schedule it. Workers, on the other hand, need to read a DAG to execute it.
It’s definitely inconvenient to upload DAGs manually to your deployment every time there’s a change in their definitions. That’s why you need to design a DAG distribution process.
It usually starts with a repository where you store the DAGs. In this way you have a history of changes and cooperation between team members is easy. Then you take advantage of the CI/CD system to upload the DAGs to your deployment.
One possibility may be to use a shared file system (for example AWS Elastic File System) which can be accessed by every Airflow component. While this can work for small deployments, this type of DAG distribution is known to impact performance of Airflow (DAG parsing times, inter task latency).
The other—recommended—approach is to deliver DAGs to each component’s volume. This can be achieved using tools like git-sync, which synchronizes a local repository with a remote one, or by using solutions like gsutil rsync or GCS FUSE.
Another way to distribute DAGs is to bake them into your custom Docker image. However, this implies that you need to rebuild it for every DAG change (or on a schedule), and you need to be able to easily change the image used by each of the Airflow components.
We have more useful content about Airflow 2.0! For example, check out this article about Airflow providers.