Apache Airflow is one of the latest open-source
projects to have generated great interest in the developer community. So much
so that Google has integrated it into
Google Cloud’s stack as the de facto tool for orchestrating its services.
What makes this project so special and why has it been so well received? In this post we will go over its evolution and discuss its main characteristics.
History
Apache Airflow was created in October 2014 by Maxime Beauchemin
within the data engineering team of Airbnb, the famous vacation rental
platform. Since the moment of its inception it was conceived as open-source software. It was officially
published in June 2015 and made
available to everyone on GitHub.
Airflow was welcomed into the Apache Software Foundation’s incubation
programme in March 2016, thus following in the footsteps of other major
open-source software projects within the data sphere like Hadoop or Spark. Today it is used in production by more than
200 companies around the world, such as PayPal, Spotify and Twitter.
Apache Airflow’s latest big boost has come from Google. In May 2018 Google announced Google Cloud Composer, a managed Apache Airflow service that is fully integrated in the Google Cloud platform and has thus become one of the cornerstones for orchestrating managed services in Google Cloud.
DAGs and Operators
But what actually is Apache Airflow?
It is a platform on which we can
programmatically create workflows and also schedule and monitor them in a
centralized manner.
A workflow in Airflow can be defined as a sequence of tasks, triggered by an event or a schedule, that is
typically used to handle data pipelines.
An example of a very simple workflow would be the one
made up of the following tasks:
- Download certain data from a data source (e.g. a database).
- Send the data to a processing system (e.g. a Spark cluster).
- Monitor the progress of the processing.
- Generate a report on the results of the processing.
- Email the report.
Airflow models these workflows or sets of tasks as directed acyclic graphs
(DAGs). These graphs have the peculiarity that they fulfil two conditions:
- They are directed: the links between the different nodes have a direction.
- They are acyclic: we cannot create cycles and therefore return to a node we have already passed through.
This type of graph is also used by other highly relevant software projects, such as Spark and TensorFlow, to model their execution.
Each of the tasks that make up an Airflow DAG is an Operator in Airflow.
Therefore, to define a DAG we need to define all necessary Operators and
establish the relationships and dependencies among them. This can easily be
done with Python.
There are many predefined Operators, although we can also write our own if
necessary (a sketch follows the list below). These are some of the most interesting predefined Operators:
- BashOperator: it executes a bash command.
- PythonOperator: it invokes a Python function.
- EmailOperator: it sends an email.
- SimpleHttpOperator: it makes an HTTP request.
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc.: they run an SQL query.
- Sensor: it waits for an amount of time, a file, a database row, an object in S3…
In Airflow’s official documentation there is a lot of information about all the ‘official’ Operators.
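As a quick illustration of extending Airflow with our own Operator (mentioned above), here is a minimal sketch, assuming the Airflow 1.x import paths used throughout this post; the class and its name parameter are purely illustrative and not part of Airflow:
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class GreetingOperator(BaseOperator):
    """Hypothetical Operator that logs a greeting for a given name."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(GreetingOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # execute() is what Airflow calls when the task runs; its return
        # value is pushed to XCom automatically.
        message = 'Hello, {}! Execution date: {}'.format(self.name, context['ds'])
        self.log.info(message)
        return message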
A real example
The following would be an example of a very simple DAG:
We have defined three tasks in this workflow:
- The first one, print_date, will display the current date. We will refer to it as t1 in the code. Once it has been run, the two other tasks will be triggered concurrently:
- 'sleep', which will sleep for 5 seconds. We will call it t2 in the code, and
- 'templated', which will display certain information based on a predefined template. We will call it t3.
The code that is needed to program this workflow is described below.
First of all, we will make all necessary imports, among them, that of BashOperator, which is the type of Operator that we will use in this example:
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
Secondly, we will define the default arguments that we will use to instantiate the DAG. Here we will configure important aspects, such as the retry policy.
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'adhoc': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'trigger_rule': u'all_success'
}
We will instantiate the DAG by giving it the default arguments and the execution schedule; in this case, once a day.
To schedule the execution we can also use cron-type notation, which is usually the most convenient (see the example after the code below):
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
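For instance, a hypothetical variant of the same DAG scheduled with a cron expression (every day at 06:00) would be instantiated like this:
# Illustrative variant, not part of the tutorial DAG: same defaults,
# but the schedule is expressed in cron notation (every day at 06:00).
dag_cron = DAG(
    'tutorial_cron',
    default_args=default_args,
    description='A simple tutorial DAG with a cron schedule',
    schedule_interval='0 6 * * *',
)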
Next, we will define the three Operators. In this case, since they are BashOperator instances, we will link them to our DAG and specify in the bash_command parameter the bash command they must execute:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
dag=dag,
)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
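For reference, the Jinja variables used in templated_command expand roughly as follows:
# Rough meaning of the template variables used in templated_command:
#   {{ ds }}                   -> the execution date as a YYYY-MM-DD string
#   {{ macros.ds_add(ds, 7) }} -> the execution date plus 7 days
#   {{ params.my_param }}      -> the value passed through the params dict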
Finally, we will define the dependency or relationship among our operators. In this case, we will use the bit-shift operator (>>), available since version 1.8, which simplifies the definition of relationships among Operators:
t1 >> [t2, t3]
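The same dependencies can also be written with the explicit helper methods, which is equivalent and still common in older code:
# Equivalent to t1 >> [t2, t3]
t1.set_downstream(t2)
t1.set_downstream(t3)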
Interesting features
So far we have described how Airflow works in general and how to program a
DAG. But why is Airflow special and what sets it apart from other tools?
Some of Airflow's most outstanding
features are as follows:
- Managing faults in data sources or sinks: integrated handling of errors when accessing data, with the possibility of defining a different behaviour for each case.
- Reprocessing of historical jobs: ordering past jobs to be reprocessed directly from the GUI in a very simple manner.
- Parameters between jobs: exchanging parameters between different tasks through an internal buffer (XComs), which allows us to pass state along a workflow (see the sketch after this list).
- Automatic retries: automatically managing retries based on how we set up each DAG.
- CI and CD support in workflows: easily integrating Airflow with our continuous integration or deployment systems by either using tools such as Jenkins or managing DAGs in GitHub.
- Many integrations: Airflow can be integrated with a great deal of third-party platforms and software via the Operators. Many of them are community contributions: Hive, Presto, Druid, AWS, Google Cloud, Azure, Databricks, Jenkins, Kubernetes, Mongo, Oracle, SSH, etc.
- Data sensors: a particular type of Operator that allows us to wait until a certain condition is met, e.g. that a file has been written to HDFS or S3.
- Job testing capabilities: it allows us to try out and validate our jobs before deployment.
- Logs and metadata from WebGUI: a central point from where we can access all logs and metadata associated with each task from the web interface.
- Task triggers: there are different ways in which we can trigger our workflows to run automatically based on a time schedule; we can also trigger them by hand.
- RT monitoring and alerts: we can monitor the execution status of our workflows in real time from the GUI.
- Big community support: Airflow is licensed under Apache 2.0 and has strong support from the community, with over 700 contributors and 6,000 commits.
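As an illustration of the parameter-passing feature mentioned in the list above, here is a minimal sketch using XComs; the function names and task ids are illustrative, and it assumes the same dag object and imports as the tutorial example:
# Minimal XCom sketch: one task pushes a value, another pulls it.
from airflow.operators.python_operator import PythonOperator

def produce_value(**context):
    # A value returned by a PythonOperator is automatically pushed to XCom.
    return 'report_ready'

def consume_value(**context):
    # Pull the value pushed by the upstream task.
    status = context['ti'].xcom_pull(task_ids='produce')
    print('Upstream reported: {}'.format(status))

produce = PythonOperator(
    task_id='produce',
    python_callable=produce_value,
    provide_context=True,
    dag=dag,
)

consume = PythonOperator(
    task_id='consume',
    python_callable=consume_value,
    provide_context=True,
    dag=dag,
)

produce >> consume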
Google Cloud Composer
As on previous occasions, Google relies on renowned open-source projects
with a large community and integrates them in its cloud platform as managed
services.
By following this strategy, Google does not have to start from scratch when building cloud services; at the same time, it strengthens and supports these communities by contributing to the development of the projects. Other similar cases are Apache Beam and Dataflow, or Kubernetes and GKE.
Cloud Composer is nothing but a version of Apache Airflow, but it has certain advantages since it is a
managed service (of course, it also comes with an additional cost). These
are some of the main benefits of using Cloud Composer:
- Simplicity:
  - One click to create a new Airflow environment.
  - Control integrated with Google Cloud SDK and the Google Developer Console.
  - Easy access to the Airflow web UI.
  - Python package dependency management (pip).
- Security:
  - Identity and Access Management (IAM): credentials, permissions, policies.
- Scalability:
  - Easily scalable on Google Cloud infrastructure.
- Monitoring:
  - Logging and monitoring integrated with Stackdriver.
- Total integration with GCP: Big Data, Machine Learning…
Another interesting aspect is that, with Cloud Composer, our files describing DAGs are stored in Cloud Storage.
To deploy a new DAG, we simply have to upload it to Cloud Storage either via the web interface of the Google Cloud console or by using the tools of Google Cloud SDK (gcloud).
Conclusions
We can wrap things up by saying that Apache
Airflow is a consolidated open-source project that has a big, active community
behind it and the support of major companies such as Airbnb and Google.
Many companies are now using Airflow in production to orchestrate their
data workflows and implement their data quality and governance policies.
Airflow allows us to govern our data pipelines in a
centralized manner and provides us with a modern system to achieve
homogeneous logging and monitoring.
Thanks to Cloud Composer, we can
have a production Airflow environment up and running in a very fast and easy
way and enjoy all the comforts of a service that is managed and integrated
with the entire Google Cloud platform.
Thus, we will reduce our efforts in infrastructure deployment and
maintenance; in exchange, we will have to deal with the costs associated with
the managed cloud service.