DAG Runs
Note: This article is for my own reference.
A DAG (Directed Acyclic Graph) is a conceptual representation of a series of activities or, in other words, a mathematical abstraction of a data pipeline.
A DAG (or a pipeline) defines the sequence in which execution stages run; because the graph is acyclic, no stage ever loops back to an earlier one.
There are several ways to define the interval at which a DAG runs, but the most widely used is a cron expression.
A cron expression is a string that represents a recurring schedule. It consists of five fields separated by spaces which, from left to right, specify the minute, the hour, the day of the month, the month, and the day of the week.
Explaining each field in detail is beyond the scope of this article, but there are numerous online resources for learning cron expressions, such as the website Cronitor.
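To make the five-field layout concrete, here is a minimal pure-Python sketch that checks whether a given minute matches a cron expression. It is not how Airflow evaluates schedules. It only supports *, single numbers, ranges (9-17), comma lists and steps (*/15), and it ignores day and month names, presets and the alternative day-of-week value 7. The names FIELDS, expand_field and cron_matches are hypothetical.

```python
from datetime import datetime

# Standard five-field cron layout, left to right, with each field's allowed range.
FIELDS = [
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day_of_month", 1, 31),
    ("month", 1, 12),
    ("day_of_week", 0, 6),  # 0 = Sunday
]


def expand_field(field: str, low: int, high: int) -> set:
    """Expand one field such as '*', '5', '1,15', '9-17' or '*/15' into the values it allows."""
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        if start < low or end > high or step < 1:
            raise ValueError(f"invalid cron field: {field!r}")
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if `when` (to the minute) is a tick of the cron expression `expr`."""
    parts = expr.split()
    if len(parts) != 5:
        raise ValueError("a cron expression has exactly five space-separated fields")
    minutes, hours, days, months, weekdays = (
        expand_field(part, low, high) for part, (_, low, high) in zip(parts, FIELDS)
    )
    cron_weekday = (when.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
    dom_ok = when.day in days
    dow_ok = cron_weekday in weekdays
    # Classic cron rule: when both day fields are restricted, matching either one is enough.
    if not parts[2].startswith("*") and not parts[4].startswith("*"):
        day_ok = dom_ok or dow_ok
    else:
        day_ok = dom_ok and dow_ok
    return when.minute in minutes and when.hour in hours and when.month in months and day_ok


print(cron_matches("0 * * * *", datetime(2024, 1, 1, 13, 0)))   # True: minute 0 of an hour
print(cron_matches("0 * * * *", datetime(2024, 1, 1, 13, 30)))  # False: minute 30
```

For example, "*/15 9-17 * * 1-5" matches every quarter hour between 09:00 and 17:45 on weekdays.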
A DAG Run is an object representing an instantiation of the DAG in time.
Each DAG may or may not have a schedule, which determines how DAG Runs are created. The schedule is set through the schedule_interval DAG argument, which preferably receives a cron expression as a str, or a datetime.timedelta object. (In Airflow 2.4 and later, the equivalent schedule argument is preferred.)
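To show how a datetime.timedelta schedule turns into DAG Runs, here is a simplified pure-Python model, not Airflow code. It assumes Airflow's documented behaviour that the run covering an interval is created only after that interval has ended, and it ignores catchup settings, timezones and timetables. The helper due_intervals is hypothetical.

```python
from datetime import datetime, timedelta


def due_intervals(start: datetime, every: timedelta, now: datetime) -> list:
    """List the (interval_start, interval_end) pairs that have fully elapsed by `now`.

    Simplified model: a run for an interval is only created once that interval has ended.
    """
    if every <= timedelta(0):
        raise ValueError("the interval must be positive")
    intervals = []
    interval_start = start
    while interval_start + every <= now:
        intervals.append((interval_start, interval_start + every))
        interval_start += every
    return intervals


# Hourly schedule starting at midnight, checked at 03:30:
# three runs are due (00:00-01:00, 01:00-02:00, 02:00-03:00); 03:00-04:00 has not ended yet.
runs = due_intervals(datetime(2024, 1, 1), timedelta(hours=1), datetime(2024, 1, 1, 3, 30))
for interval_start, interval_end in runs:
    print(interval_start, "->", interval_end)
```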
Alternatively, you can use one of these cron presets:
preset | meaning | cron |
---|---|---|
None | Don’t schedule; use for exclusively “externally triggered” DAGs | |
@once | Schedule once and only once | |
@hourly | Run once an hour at the beginning of the hour | 0 * * * * |
@daily | Run once a day at midnight | 0 0 * * * |
@weekly | Run once a week at midnight on Sunday morning | 0 0 * * 0 |
@monthly | Run once a month at midnight of the first day of the month | 0 0 1 * * |
@yearly | Run once a year at midnight of January 1 | 0 0 1 1 * |
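Since each preset is just shorthand for a cron expression, the table can be expressed as a lookup. This is a small illustrative sketch, not Airflow's internal mapping; PRESETS and to_cron are hypothetical names, and any value that is not a preset is assumed to already be a cron string.

```python
from typing import Optional

# Cron equivalents of the presets in the table; None and @once have no cron form.
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def to_cron(schedule: Optional[str]) -> Optional[str]:
    """Translate a preset into its cron expression; return None for schedules with no cron form."""
    if schedule is None or schedule == "@once":
        return None
    # Anything that is not a known preset is assumed to already be a cron string.
    return PRESETS.get(schedule, schedule)


print(to_cron("@daily"))     # 0 0 * * *
print(to_cron("5 * * * *"))  # 5 * * * *
```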
The following DAG runs every hour at minute 0. More generally, "every hour at minute N" is written as N * * * *.
# Create the DAG object to run every hour at minute 0
from airflow import DAG
from airflow.utils.dates import days_ago  # deprecated in recent Airflow 2.x releases

dag = DAG(
    dag_id="car_factory_simulation",
    default_args={"owner": "airflow", "start_date": days_ago(2)},
    schedule_interval="0 * * * *",
)
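As a worked example of what "0 * * * *" (or N * * * *) means in practice, the sketch below computes the next tick strictly after a given time. It is a hypothetical helper using naive datetimes, so daylight-saving transitions are not handled, and it is not how the Airflow scheduler computes run times.

```python
from datetime import datetime, timedelta


def next_hourly_run(now: datetime, minute: int) -> datetime:
    """Next time strictly after `now` that matches the cron expression f"{minute} * * * *"."""
    if not 0 <= minute <= 59:
        raise ValueError("minute must be between 0 and 59")
    candidate = now.replace(minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # This hour's tick has already passed (or is right now), so move to the next hour.
        candidate += timedelta(hours=1)
    return candidate


print(next_hourly_run(datetime(2024, 1, 1, 10, 20), 0))   # 2024-01-01 11:00:00
print(next_hourly_run(datetime(2024, 1, 1, 10, 20), 30))  # 2024-01-01 10:30:00
```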
Airflow uses a Directed Acyclic Graph (DAG) to represent a pipeline. The graph's nodes indicate the tasks that need to be executed, while the directed connections between the nodes show the dependencies between tasks.
It is logical to depict a data pipeline as a DAG since some tasks must be completed before others can begin.
This is similar to an assembly line in a car manufacturing plant, where the product is built up step by step and each task depends on the completion of earlier ones.
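The "acyclic" part is what makes an execution order possible: if tasks depended on each other in a loop, none of them could ever start. One way to see this is Python's standard-library graphlib (3.9+), which refuses to order a cyclic graph. The task names below are borrowed from the fictional car-factory example; can_schedule and the looping edge are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def can_schedule(dependencies: dict) -> bool:
    """Return True if the task graph has a valid execution order, i.e. contains no cycle.

    `dependencies` maps each task to the set of tasks that must finish before it starts.
    """
    try:
        list(TopologicalSorter(dependencies).static_order())
        return True
    except CycleError:
        return False


pipeline = {"assemble_body": {"assemble_frame"}, "apply_paint": {"assemble_body"}}
# Hypothetical bad edge: the frame now waits for the paint, closing a loop.
looped = {**pipeline, "assemble_frame": {"apply_paint"}}

print(can_schedule(pipeline))  # True
print(can_schedule(looped))    # False
```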
A fictional DAG could look something like this:
graph LR;
    A[assemble_frame]:::foo
    A --> B[fix_tyres]:::bar
    A --> C[assemble_body]:::foobar
    C --> E[apply_paint]:::foo
    classDef foo stroke:#f00
    classDef bar stroke:#0f0
    classDef foobar stroke:#00f
First, the frame is assembled. The tires and the body can then be fitted independently of each other, and painting can begin once the body is assembled.
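The order described above can be derived mechanically with a topological sort. This sketch uses Python's standard-library graphlib (3.9+) rather than Airflow, and groups tasks into stages whose dependencies are all satisfied; it only illustrates the ordering, not how the Airflow scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Each task mapped to the tasks it depends on, mirroring the diagram.
car_factory = {
    "assemble_frame": set(),
    "fix_tyres": {"assemble_frame"},
    "assemble_body": {"assemble_frame"},
    "apply_paint": {"assemble_body"},
}

sorter = TopologicalSorter(car_factory)
sorter.prepare()
stages = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tasks whose dependencies are all done; sorted for stable output
    stages.append(ready)
    sorter.done(*ready)

print(stages)
# [['assemble_frame'], ['assemble_body', 'fix_tyres'], ['apply_paint']]
```

Tasks within the same stage do not depend on each other, so a scheduler is free to run them in parallel.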
Let's replicate these steps as an Airflow DAG.
# Imports (module paths as in Airflow 2.x)
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago  # deprecated in recent Airflow 2.x releases

# Create the DAG object
dag = DAG(
    dag_id="car_factory_simulation",
    default_args={"owner": "airflow", "start_date": days_ago(2)},
    schedule_interval="0 * * * *",  # run every hour at minute 0
)

# Task definitions: each task simply echoes the step it represents
assemble_frame = BashOperator(
    task_id="assemble_frame", bash_command='echo "Assembling frame"', dag=dag
)
fix_tyres = BashOperator(
    task_id="fix_tyres", bash_command='echo "Fixing tires"', dag=dag
)
assemble_body = BashOperator(
    task_id="assemble_body", bash_command='echo "Assembling body"', dag=dag
)
apply_paint = BashOperator(
    task_id="apply_paint", bash_command='echo "Applying paint"', dag=dag
)

# The downstream flow: frame first, then tyres and body, then paint after the body
assemble_frame.set_downstream(fix_tyres)
assemble_frame.set_downstream(assemble_body)
assemble_body.set_downstream(apply_paint)
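Airflow also lets you declare the same dependencies with the bitshift operator, for example assemble_frame >> [fix_tyres, assemble_body] and assemble_body >> apply_paint. To show how that shorthand maps onto set_downstream without needing an Airflow installation, here is a toy Python model. ToyTask is hypothetical: it only records edges and is not Airflow's BaseOperator.

```python
class ToyTask:
    """Hypothetical stand-in for an Airflow operator; it only records dependency edges."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        # Accept a single task or a list of tasks.
        others = other if isinstance(other, list) else [other]
        self.downstream.extend(others)
        return other

    def __rshift__(self, other):
        # `a >> b` is treated as `a.set_downstream(b)`; returning b lets `a >> b >> c` chain.
        return self.set_downstream(other)


frame, tyres, body, paint = (
    ToyTask(name) for name in ["assemble_frame", "fix_tyres", "assemble_body", "apply_paint"]
)
frame >> [tyres, body]
body >> paint

edges = [(t.task_id, d.task_id) for t in (frame, tyres, body, paint) for d in t.downstream]
print(edges)
# [('assemble_frame', 'fix_tyres'), ('assemble_frame', 'assemble_body'), ('assemble_body', 'apply_paint')]
```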