Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions chapters/chapter06/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Chapter 6

Code accompanying Chapter 6 of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).

## Contents

This folder contains DAGs from Chapter 6.

## Usage

To get started with the code examples, start Airflow with Docker Compose with the following command:

```bash
docker-compose up -d
```

The webserver initializes a few things, so wait for a few seconds, and you should be able to access the
Airflow webserver at http://localhost:8080.

To stop running the examples, run the following command:

```bash
docker-compose down -v
```
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
dag_id="chapter6_couponing_app_figure612",
dag_id="figure_6_12",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 16 * * *",
description="A batch workflow for ingesting supermarket promotions data.",
Expand Down
109 changes: 109 additions & 0 deletions chapters/chapter06/dags/figure_6_17.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# ================================================ EXAMPLE 1 =================================================
# One scheduled DAG whose final task triggers a single unscheduled downstream DAG.

example_1_dag_1 = DAG(
    dag_id="figure_6_17_example_1_dag_1",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 0 * * *",
)
example_1_dag_2 = DAG(
    dag_id="figure_6_17_example_1_dag_2",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,
)

# Upstream: a placeholder ETL task followed by the trigger into dag 2.
example_1_etl = DummyOperator(task_id="etl", dag=example_1_dag_1)
example_1_trigger = TriggerDagRunOperator(
    task_id="trigger_dag2",
    trigger_dag_id="figure_6_17_example_1_dag_2",
    dag=example_1_dag_1,
)
example_1_etl >> example_1_trigger

# Downstream: the triggered DAG just emits a report.
PythonOperator(
    task_id="report", dag=example_1_dag_2, python_callable=lambda: print("hello")
)

# ================================================ EXAMPLE 2 =================================================
# Three independently scheduled DAGs each trigger the same unscheduled downstream DAG.

# Unpack into named module-level variables so Airflow's DAG discovery sees them.
example_2_dag_1, example_2_dag_2, example_2_dag_3 = (
    DAG(
        dag_id=f"figure_6_17_example_2_dag_{number}",
        start_date=airflow.utils.dates.days_ago(3),
        schedule_interval="0 0 * * *",
    )
    for number in (1, 2, 3)
)
example_2_dag_4 = DAG(
    dag_id="figure_6_17_example_2_dag_4",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,
)

# Each upstream DAG: placeholder ETL, then trigger the shared downstream DAG.
for upstream_dag in (example_2_dag_1, example_2_dag_2, example_2_dag_3):
    etl_task = DummyOperator(task_id="etl", dag=upstream_dag)
    trigger_task = TriggerDagRunOperator(
        task_id="trigger_dag4", trigger_dag_id="figure_6_17_example_2_dag_4", dag=upstream_dag
    )
    etl_task >> trigger_task

PythonOperator(
    task_id="report", dag=example_2_dag_4, python_callable=lambda: print("hello")
)

# ================================================ EXAMPLE 3 =================================================
# One scheduled DAG fans out, triggering three separate unscheduled downstream DAGs.

example_3_dag_1 = DAG(
    dag_id="figure_6_17_example_3_dag_1",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 0 * * *",
)
# Unpack into named module-level variables so Airflow's DAG discovery sees them.
example_3_dag_2, example_3_dag_3, example_3_dag_4 = (
    DAG(
        dag_id=f"figure_6_17_example_3_dag_{number}",
        start_date=airflow.utils.dates.days_ago(3),
        schedule_interval=None,
    )
    for number in (2, 3, 4)
)

# Fan out: one ETL task feeds a trigger per downstream DAG.
example_3_etl = DummyOperator(task_id="etl", dag=example_3_dag_1)
example_3_etl >> [
    TriggerDagRunOperator(
        task_id=f"trigger_dag{number}",
        trigger_dag_id=f"figure_6_17_example_3_dag_{number}",
        dag=example_3_dag_1,
    )
    for number in (2, 3, 4)
]

# Each triggered DAG just emits a report.
for downstream_dag in (example_3_dag_2, example_3_dag_3, example_3_dag_4):
    PythonOperator(
        task_id="report", dag=downstream_dag, python_callable=lambda: print("hello")
    )
54 changes: 54 additions & 0 deletions chapters/chapter06/dags/figure_6_19.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State

# Three scheduled producer DAGs; a fourth unscheduled DAG waits on all of them
# via ExternalTaskSensors before producing its report.

# Unpack into named module-level variables so Airflow's DAG discovery sees them.
dag1, dag2, dag3 = (
    DAG(
        dag_id=f"figure_6_19_dag_{number}",
        start_date=airflow.utils.dates.days_ago(3),
        schedule_interval="0 0 * * *",
    )
    for number in (1, 2, 3)
)
dag4 = DAG(
    dag_id="figure_6_19_dag_4",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,
)

# Placeholder ETL task in each producer DAG.
for producer_dag in (dag1, dag2, dag3):
    DummyOperator(task_id="etl", dag=producer_dag)

# One sensor per producer; all must succeed before the report runs.
etl_sensors = [
    ExternalTaskSensor(
        task_id=f"wait_for_etl_dag{number}",
        external_dag_id=f"figure_6_19_dag_{number}",
        external_task_id="etl",
        dag=dag4,
        allowed_states=State.task_states,
    )
    for number in (1, 2, 3)
]
etl_sensors >> PythonOperator(
    task_id="report", dag=dag4, python_callable=lambda: print("hello")
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dag1 = DAG(
dag_id="chapter6_ingest_supermarket_data",
dag_id="figure_6_20_dag_1",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 16 * * *",
)
dag2 = DAG(
dag_id="chapter6_create_metrics",
dag_id="figure_6_20_dag_2",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 18 * * *",
)
Expand All @@ -22,7 +22,7 @@

wait = ExternalTaskSensor(
task_id="wait_for_process_supermarket",
external_dag_id="chapter6_ingest_supermarket_data",
external_dag_id="figure_6_20_dag_1",
external_task_id="process_supermarket",
execution_delta=datetime.timedelta(hours=6),
dag=dag2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
dag_id="chapter6_couponing_app_filesensor",
dag_id="figure_6_5",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 16 * * *",
description="A batch workflow for ingesting supermarket promotions data, demonstrating the FileSensor.",
Expand Down
36 changes: 36 additions & 0 deletions chapters/chapter06/dags/figure_6_6.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from pathlib import Path

import airflow.utils.dates
from airflow import DAG
from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.operators.dummy_operator import DummyOperator

# Daily 16:00 batch DAG demonstrating the PythonSensor; depends_on_past keeps
# each run waiting for the previous run's tasks to finish.
dag = DAG(
    dag_id="figure_6_6",
    description=(
        "A batch workflow for ingesting supermarket promotions data, "
        "demonstrating the PythonSensor."
    ),
    default_args={"depends_on_past": True},
    schedule_interval="0 16 * * *",
    start_date=airflow.utils.dates.days_ago(3),
)

# Fan-in task: every supermarket's process task feeds into this one.
create_metrics = DummyOperator(task_id="create_metrics", dag=dag)


def _wait_for_supermarket(supermarket_id_):
supermarket_path = Path("/data/" + supermarket_id_)
data_files = supermarket_path.glob("data-*.csv")
success_file = supermarket_path / "_SUCCESS"
return data_files and success_file.exists()


# Build an identical wait -> copy -> process chain for supermarkets 1-4,
# all converging on the shared create_metrics task.
for market_number in range(1, 5):
    sensor = PythonSensor(
        task_id=f"wait_for_supermarket_{market_number}",
        python_callable=_wait_for_supermarket,
        op_kwargs={"supermarket_id_": f"supermarket{market_number}"},
        timeout=600,
        dag=dag,
    )
    copy_task = DummyOperator(
        task_id=f"copy_to_raw_supermarket_{market_number}", dag=dag
    )
    process_task = DummyOperator(
        task_id=f"process_supermarket_{market_number}", dag=dag
    )
    sensor >> copy_task >> process_task >> create_metrics
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from datetime import datetime
from pathlib import Path

import airflow.utils.dates
from airflow import DAG
from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
dag_id="chapter6_couponing_app_pythonsensor",
start_date=datetime(2019, 1, 1),
schedule_interval="*/2 * * * *",
concurrency=16,
dag_id="figure_6_9",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 16 * * *",
description="A batch workflow for ingesting supermarket promotions data, demonstrating the PythonSensor.",
default_args={"depends_on_past": True},
)

create_metrics = DummyOperator(task_id="create_metrics", dag=dag)
Expand Down
15 changes: 15 additions & 0 deletions chapters/chapter06/dags/listing_6_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import airflow.utils.dates
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor

# Daily 16:00 batch DAG demonstrating the FileSensor; depends_on_past keeps
# each run waiting for the previous run's tasks to finish.
dag = DAG(
    dag_id="listing_6_1",
    description=(
        "A batch workflow for ingesting supermarket promotions data, "
        "demonstrating the FileSensor."
    ),
    default_args={"depends_on_past": True},
    schedule_interval="0 16 * * *",
    start_date=airflow.utils.dates.days_ago(3),
)

# Poke until supermarket 1's data file appears on disk.
wait = FileSensor(
    task_id="wait_for_supermarket_1",
    filepath="/data/supermarket1/data.csv",
    dag=dag,
)
28 changes: 28 additions & 0 deletions chapters/chapter06/dags/listing_6_2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from pathlib import Path

import airflow.utils.dates
from airflow import DAG
from airflow.contrib.sensors.python_sensor import PythonSensor

# Daily 16:00 batch DAG; depends_on_past keeps each run waiting for the
# previous run's tasks to finish.
dag = DAG(
    dag_id="listing_6_2",
    description="A batch workflow for ingesting supermarket promotions data.",
    default_args={"depends_on_past": True},
    schedule_interval="0 16 * * *",
    start_date=airflow.utils.dates.days_ago(3),
)


def _wait_for_supermarket(supermarket_id_):
supermarket_path = Path("/data/" + supermarket_id_)
data_files = supermarket_path.glob("data-*.csv")
success_file = supermarket_path / "_SUCCESS"
return data_files and success_file.exists()


# Poke until supermarket 1's data (and its _SUCCESS marker) is present.
wait_for_supermarket_1 = PythonSensor(
    task_id="wait_for_supermarket_1",
    python_callable=_wait_for_supermarket,
    # BUG FIX: the callable's parameter is "supermarket_id_" (trailing
    # underscore); the old key "supermarket_id" raised a TypeError on every
    # poke. The corrected key matches the usage in figure_6_6.py.
    op_kwargs={"supermarket_id_": "supermarket1"},
    dag=dag,
)
12 changes: 12 additions & 0 deletions chapters/chapter06/dags/listing_6_3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Minimal daily DAG illustrating the `concurrency` DAG argument (max 50
# concurrently running task instances for this DAG).
dag = DAG(
    dag_id="listing_6_3",
    schedule_interval="@daily",
    start_date=airflow.utils.dates.days_ago(3),
    concurrency=50,
)

dummy = DummyOperator(task_id="dummy", dag=dag)
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
from airflow.operators.dummy_operator import DummyOperator

dag1 = DAG(
dag_id="chapter6_ingest_supermarket_data_triggerdagrunoperator",
dag_id="listing_6_4_dag1",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 16 * * *",
)
dag2 = DAG(
dag_id="chapter6_ingest_supermarket_data_triggerdagrunoperator_target",
dag_id="listing_6_4_dag2",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval=None,
)
Expand All @@ -36,7 +36,7 @@ def _wait_for_supermarket(supermarket_id_):
process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag1)
trigger_create_metrics_dag = TriggerDagRunOperator(
task_id=f"trigger_create_metrics_dag_supermarket_{supermarket_id}",
trigger_dag_id="create_metrics",
trigger_dag_id="listing_6_4_dag2",
dag=dag1,
)
wait >> copy >> process >> trigger_create_metrics_dag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from airflow.operators.python_operator import PythonOperator

dag = DAG(
dag_id="chapter6_print_dag_run_conf",
dag_id="listing_6_8",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval=None,
)
Expand Down
Loading