
DAG Runs

Note: This article is for my own reference.

A DAG is a Directed Acyclic Graph: a conceptual representation of a series of activities or, in other words, a mathematical abstraction of a data pipeline.

A DAG (or a pipeline) defines a sequence of execution stages in any non-recurring algorithm.

There are several ways to define how often a DAG runs. The most widely used is a cron expression.

A cron expression is a string that represents a schedule. It consists of five fields separated by spaces. From left to right, they describe the minute, the hour, the day of the month, the month, and the day of the week.
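
To make the field order concrete, here is a minimal sketch in plain Python that labels each part of an expression. The helper name split_cron is made up for illustration and is not part of Airflow or any cron library. It only splits the string and does not validate the values.

```python
# Field names in the order they appear in a standard five-field cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> dict:
    """Label each field of a five-field cron expression (hypothetical helper)."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


# "0 * * * *": minute is 0, every other field is a wildcard.
print(split_cron("0 * * * *"))
```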

Explaining each of these in detail is beyond the scope of this article, but there are numerous online resources to learn cron expressions, such as the website Cronitor.

A DAG Run is an object that instantiates the DAG in time.

Each DAG may or may not have a schedule, which determines how DAG Runs are created. schedule_interval is passed as a DAG argument and accepts a cron expression as a str (preferred) or a datetime.timedelta object.
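
As a rough illustration of a DAG being "instantiated in time", the sketch below lists evenly spaced timestamps from a start date and a datetime.timedelta. This is not how Airflow's scheduler is implemented. The function name run_dates and the start date are assumptions chosen for the example.

```python
from datetime import datetime, timedelta


def run_dates(start: datetime, interval: timedelta, count: int) -> list:
    """Return the first `count` evenly spaced timestamps beginning at `start`."""
    return [start + i * interval for i in range(count)]


# Hypothetical start date. An hourly timedelta lines up with the cron string
# "0 * * * *" only when the start falls exactly on the hour.
for moment in run_dates(datetime(2024, 1, 1, 0, 0), timedelta(hours=1), 3):
    print(moment.isoformat())
```

Each timestamp stands for one point in time at which a run of the same DAG would exist.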

Alternatively, you can use one of these cron presets:

| Preset | Meaning | Cron |
|---|---|---|
| None | Don’t schedule; use exclusively for “externally triggered” DAGs | |
| @once | Schedule once and only once | |
| @hourly | Run once an hour at the beginning of the hour | 0 * * * * |
| @daily | Run once a day at midnight | 0 0 * * * |
| @weekly | Run once a week at midnight on Sunday morning | 0 0 * * 0 |
| @monthly | Run once a month at midnight of the first day of the month | 0 0 1 * * |
| @yearly | Run once a year at midnight of January 1 | 0 0 1 1 * |
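
The presets that have a cron equivalent can be written as a simple lookup. The sketch below copies the mapping from the table. The names PRESETS and to_cron are made up for illustration, and this is not how Airflow resolves presets internally.

```python
# Preset-to-cron mapping copied from the table above.
# None and @once are left out because they have no cron equivalent.
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def to_cron(schedule: str) -> str:
    """Return the cron string for a known preset, or the input unchanged."""
    return PRESETS.get(schedule, schedule)


print(to_cron("@daily"))     # 0 0 * * *
print(to_cron("5 * * * *"))  # already a cron expression, returned as-is
```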

The following DAG needs to run every hour at minute 0. More generally, the expression for every hour at minute N is N * * * *.

# Imports assume Airflow 2.x
import airflow.utils.dates
from airflow import DAG

# Create the DAG object to run every hour at minute 0
dag = DAG(
    dag_id="car_factory_simulation",
    default_args={"owner": "airflow", "start_date": airflow.utils.dates.days_ago(2)},
    schedule_interval="0 * * * *",
)
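
To see what N * * * * means in practice, here is a small sketch in plain Python that finds the next matching time after a given moment. The function name next_hourly_run is an assumption for this example. Real cron parsing handles far more cases than this single pattern.

```python
from datetime import datetime, timedelta


def next_hourly_run(after: datetime, minute: int) -> datetime:
    """Return the first time strictly after `after` that matches 'minute * * * *'."""
    if not 0 <= minute <= 59:
        raise ValueError("minute must be between 0 and 59")
    # Try the matching minute within the current hour first.
    candidate = after.replace(minute=minute, second=0, microsecond=0)
    if candidate <= after:
        # That minute has already passed (or is now), so move to the next hour.
        candidate += timedelta(hours=1)
    return candidate


print(next_hourly_run(datetime(2024, 1, 1, 10, 15), 0))   # 2024-01-01 11:00:00
print(next_hourly_run(datetime(2024, 1, 1, 10, 15), 30))  # 2024-01-01 10:30:00
```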

Airflow uses a Directed Acyclic Graph (DAG) to represent a pipeline. The graph's nodes indicate the tasks that need to be executed, while the directed connections between the nodes show the dependencies between tasks.

It is logical to depict a data pipeline as a DAG since some tasks must be completed before others can begin.

This is similar to an assembly line in a car manufacturing plant, where each step builds on the completed work of earlier steps.

A fictional DAG could look something like this:

graph LR;
    A[assemble_frame]:::foo
    A --> B[fix_tyres]:::bar
    A --> C[assemble_body]:::foobar
    C --> E[apply_paint]:::foo
    classDef foo stroke:#f00
    classDef bar stroke:#0f0
    classDef foobar stroke:#00f

The frame is assembled first. Once it is done, the tyres can be fixed and the body assembled, since both depend only on the frame. Painting can begin as soon as the body is assembled.
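
The same dependency structure can be checked without Airflow. The sketch below uses graphlib from the Python standard library (Python 3.9+) to derive one valid execution order. It only illustrates the ordering idea and is not how Airflow schedules tasks.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key maps to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "assemble_frame": set(),
    "fix_tyres": {"assemble_frame"},
    "assemble_body": {"assemble_frame"},
    "apply_paint": {"assemble_body"},
}

# static_order() yields every task after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The relative order of fix_tyres and assemble_body is not fixed, because neither depends on the other. If the dependencies contained a cycle, graphlib would raise CycleError, which is the "acyclic" requirement in practice.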

Let's replicate these steps in code.

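# Imports assume Airflow 2.x
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator

# Create the DAG object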
dag = DAG(
    dag_id="car_factory_simulation",
    default_args={"owner": "airflow", "start_date": airflow.utils.dates.days_ago(2)},
    schedule_interval="0 * * * *",  # run on every hour at minute 0
)

# Task definitions
assemble_frame = BashOperator(
    task_id="assemble_frame", bash_command='echo "Assembling frame"', dag=dag
)
fix_tyres = BashOperator(
    task_id="fix_tyres", bash_command='echo "Fixing tires"', dag=dag
)
assemble_body = BashOperator(
    task_id="assemble_body", bash_command='echo "Assembling body"', dag=dag
)
apply_paint = BashOperator(
    task_id="apply_paint", bash_command='echo "Applying paint"', dag=dag
)

# Dependencies: the frame comes first, then tyres and body; paint follows the body
assemble_frame.set_downstream(fix_tyres)
assemble_frame.set_downstream(assemble_body)
assemble_body.set_downstream(apply_paint)
