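Airflow is an open-source tool for creating, scheduling, and monitoring data processing pipelines. Airflow can be installed via
conda install -c conda-forge airflow or
pip install apache-airflow (the PyPI package has been named apache-airflow, not airflow, since version 1.8.1). Before running Airflow, we need to initialize its metadata database; in Airflow 1.x the command is
airflow initdb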
Then we can start the Airflow webserver, which is a Python Flask app providing the Airflow UI. The default port of the webserver is 8080:
airflow webserver -p 8080. When we open 0.0.0.0:8080 in the browser, we can see a list of the example DAGs that ship with Airflow.
DAGs (directed acyclic graphs) represent workflows made up of multiple tasks and the dependencies among them. The Graph View of a DAG usually shows the logic of the workflow clearly.
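To make the idea of a DAG concrete, here is a minimal sketch in plain Python. It does not use Airflow's API; it relies on the standard library's graphlib module (Python 3.9+), and the task names are hypothetical. It shows how a set of dependencies determines a valid run order, and why cycles are not allowed.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "fetch": set(),
    "clean": {"fetch"},
    "count": {"fetch"},
    "report": {"clean", "count"},
}


def execution_order(deps):
    """Return one valid run order in which every task follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())


def is_acyclic(deps):
    """Return True if the dependency graph has no cycles."""
    try:
        TopologicalSorter(deps).prepare()  # raises CycleError when a cycle exists
    except CycleError:
        return False
    return True


order = execution_order(dependencies)
print(order)
print(is_acyclic(dependencies))
```

"fetch" always comes first and "report" last, while "clean" and "count" may appear in either order because neither depends on the other. A scheduler such as Airflow can run independent tasks like these in parallel.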
In the example below, I work with a BigQuery public dataset, the San Francisco 311 service requests data, and count how many requests were reported to each agency each day. Of course, we could run this query manually in BigQuery. With Airflow, however, the daily counts are computed and saved automatically on a schedule. Here I save each day’s output to a CSV file. Many more steps could be added to the daily schedule; this post demonstrates only this simple daily aggregation task.
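The aggregation itself is small. The sketch below shows the counting and CSV-writing logic in plain Python, independent of BigQuery and Airflow. The sample rows, agency names, and column names are made up for illustration and are not taken from the real dataset.

```python
import csv
import io
from collections import Counter

# Made-up sample rows standing in for query results: (date, agency).
sample_requests = [
    ("2019-01-01", "Agency A"),
    ("2019-01-01", "Agency A"),
    ("2019-01-01", "Agency B"),
    ("2019-01-02", "Agency A"),
]


def count_by_day_and_agency(rows):
    """Count requests for each (date, agency) pair."""
    return Counter(rows)


def to_csv(counts):
    """Render the counts as CSV text, sorted by date and then agency."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["date", "agency", "request_count"])  # hypothetical column names
    for (day, agency), n in sorted(counts.items()):
        writer.writerow([day, agency, n])
    return buffer.getvalue()


counts = count_by_day_and_agency(sample_requests)
csv_text = to_csv(counts)
print(csv_text)
```

In the scheduled version, the counting would typically be done by the query itself (for example with a GROUP BY), and only the result would be written to disk.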
To start, let’s import some useful python libraries.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
import os, errno
from os.path import dirname, exists
from os import listdir, makedirs
Then, we need to decide where all our output files should go. I defined my output path as a list of directory names:
output_path = ['','Users', 'YOUR USER NAME','airflow','dags','example','output']
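How the list is turned into a directory is not shown at this point, so the following is only a sketch of one possible approach. The helper names build_path and ensure_dir are hypothetical. Note that os.path.join('', 'Users') returns 'Users' and drops the empty first element, so the sketch joins with os.sep instead, which turns the leading '' into a root separator. The demonstration writes into a temporary directory rather than a real user folder.

```python
import errno
import os
import tempfile


def build_path(parts):
    """Join path components with the OS separator.

    A leading '' element produces a path that starts at the root,
    e.g. ['', 'Users', 'me'] becomes '/Users/me' on macOS or Linux.
    """
    return os.sep.join(parts)


def ensure_dir(path):
    """Create the directory and any missing parents; do nothing if it exists."""
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise  # re-raise anything other than "already exists"


# Demonstration inside a temporary directory (an assumption made for safety).
base = tempfile.mkdtemp()
demo_dir = build_path([base, "airflow", "dags", "example", "output"])
ensure_dir(demo_dir)
ensure_dir(demo_dir)  # the second call is a no-op
print(demo_dir)
```

On Python 3.2 and later, os.makedirs(path, exist_ok=True) achieves the same effect without the errno check.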
One thing I like to do is send failure messages to Slack: whenever Airflow fails to run a given step, I’d like it to post a failure message to Slack. Full credit to Christopher Flynn; I adapted his code for this step. First, I defined the base URL for the web GUI. On a local computer, the base URL is usually
http://0.0.0.0:8080. If you are running Airflow on a server, change the URL accordingly. Next, I defined a send_slack function, which assembles the information I’d like to send in the Slack message. Note that you will need to go to the Slack website and get a…