Skip to content

Commit a3b5b79

Browse files
authored
Merge pull request #31 from BasPH/feature/fix-ch6
Clean up Chapter 6
2 parents ffe0e81 + daeadb0 commit a3b5b79

20 files changed

+352
-233
lines changed

chapters/chapter06/README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Chapter 6
2+
3+
Code accompanying Chapter 6 of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).
4+
5+
## Contents
6+
7+
This folder contains DAGs from Chapter 6.
8+
9+
## Usage
10+
11+
To get started with the code examples, start Airflow with Docker Compose with the following command:
12+
13+
```bash
14+
docker-compose up -d
15+
```
16+
17+
The webserver initializes a few things, so wait for a few seconds, and you should be able to access the
18+
Airflow webserver at http://localhost:8080.
19+
20+
To stop running the examples, run the following command:
21+
22+
```bash
23+
docker-compose down -v
24+
```

chapters/chapter6/dags/couponing_app_figure612.py renamed to chapters/chapter06/dags/figure_6_12.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from airflow.operators.dummy_operator import DummyOperator
77

88
dag = DAG(
9-
dag_id="chapter6_couponing_app_figure612",
9+
dag_id="figure_6_12",
1010
start_date=airflow.utils.dates.days_ago(3),
1111
schedule_interval="0 16 * * *",
1212
description="A batch workflow for ingesting supermarket promotions data.",
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def _build_dag(dag_id, schedule_interval):
    """Return a DAG starting three days ago with the given schedule."""
    return DAG(
        dag_id=dag_id,
        start_date=airflow.utils.dates.days_ago(3),
        schedule_interval=schedule_interval,
    )


def _print_hello():
    """Stand-in for a real reporting step."""
    print("hello")


# ============ EXAMPLE 1: one scheduled DAG triggers one unscheduled DAG ============

example_1_dag_1 = _build_dag("figure_6_17_example_1_dag_1", "0 0 * * *")
example_1_dag_2 = _build_dag("figure_6_17_example_1_dag_2", None)

etl_example_1 = DummyOperator(task_id="etl", dag=example_1_dag_1)
trigger_example_1 = TriggerDagRunOperator(
    task_id="trigger_dag2",
    trigger_dag_id="figure_6_17_example_1_dag_2",
    dag=example_1_dag_1,
)
etl_example_1 >> trigger_example_1

PythonOperator(task_id="report", dag=example_1_dag_2, python_callable=_print_hello)

# ============ EXAMPLE 2: three scheduled DAGs each trigger the same DAG ============

example_2_dag_1 = _build_dag("figure_6_17_example_2_dag_1", "0 0 * * *")
example_2_dag_2 = _build_dag("figure_6_17_example_2_dag_2", "0 0 * * *")
example_2_dag_3 = _build_dag("figure_6_17_example_2_dag_3", "0 0 * * *")
example_2_dag_4 = _build_dag("figure_6_17_example_2_dag_4", None)

for upstream_dag in (example_2_dag_1, example_2_dag_2, example_2_dag_3):
    etl_task = DummyOperator(task_id="etl", dag=upstream_dag)
    etl_task >> TriggerDagRunOperator(
        task_id="trigger_dag4",
        trigger_dag_id="figure_6_17_example_2_dag_4",
        dag=upstream_dag,
    )

PythonOperator(task_id="report", dag=example_2_dag_4, python_callable=_print_hello)

# ============ EXAMPLE 3: one scheduled DAG fans out to three unscheduled DAGs ============

example_3_dag_1 = _build_dag("figure_6_17_example_3_dag_1", "0 0 * * *")
example_3_dag_2 = _build_dag("figure_6_17_example_3_dag_2", None)
example_3_dag_3 = _build_dag("figure_6_17_example_3_dag_3", None)
example_3_dag_4 = _build_dag("figure_6_17_example_3_dag_4", None)

fan_out = DummyOperator(task_id="etl", dag=example_3_dag_1)
fan_out >> [
    TriggerDagRunOperator(
        task_id=f"trigger_dag{n}",
        trigger_dag_id=f"figure_6_17_example_3_dag_{n}",
        dag=example_3_dag_1,
    )
    for n in (2, 3, 4)
]

for downstream_dag in (example_3_dag_2, example_3_dag_3, example_3_dag_4):
    PythonOperator(task_id="report", dag=downstream_dag, python_callable=_print_hello)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State


def _make_dag(dag_id, schedule_interval):
    """Return a DAG starting three days ago with the given schedule."""
    return DAG(
        dag_id=dag_id,
        start_date=airflow.utils.dates.days_ago(3),
        schedule_interval=schedule_interval,
    )


# Three daily upstream DAGs, each with a single "etl" task.
dag1 = _make_dag("figure_6_19_dag_1", "0 0 * * *")
dag2 = _make_dag("figure_6_19_dag_2", "0 0 * * *")
dag3 = _make_dag("figure_6_19_dag_3", "0 0 * * *")
# The fourth DAG has no schedule; it waits on the upstream "etl" tasks via sensors.
dag4 = _make_dag("figure_6_19_dag_4", None)

for upstream_dag in (dag1, dag2, dag3):
    DummyOperator(task_id="etl", dag=upstream_dag)

report = PythonOperator(
    task_id="report", dag=dag4, python_callable=lambda: print("hello")
)

# One sensor per upstream DAG; all three must complete before "report" runs.
[
    ExternalTaskSensor(
        task_id=f"wait_for_etl_dag{n}",
        external_dag_id=f"figure_6_19_dag_{n}",
        external_task_id="etl",
        dag=dag4,
        allowed_states=State.task_states,
    )
    for n in (1, 2, 3)
] >> report

chapters/chapter6/dags/figure620.py renamed to chapters/chapter06/dags/figure_6_20.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
from airflow.sensors.external_task_sensor import ExternalTaskSensor
77

88
dag1 = DAG(
9-
dag_id="chapter6_ingest_supermarket_data",
9+
dag_id="figure_6_20_dag_1",
1010
start_date=airflow.utils.dates.days_ago(3),
1111
schedule_interval="0 16 * * *",
1212
)
1313
dag2 = DAG(
14-
dag_id="chapter6_create_metrics",
14+
dag_id="figure_6_20_dag_2",
1515
start_date=airflow.utils.dates.days_ago(3),
1616
schedule_interval="0 18 * * *",
1717
)
@@ -22,7 +22,7 @@
2222

2323
wait = ExternalTaskSensor(
2424
task_id="wait_for_process_supermarket",
25-
external_dag_id="chapter6_ingest_supermarket_data",
25+
external_dag_id="figure_6_20_dag_1",
2626
external_task_id="process_supermarket",
2727
execution_delta=datetime.timedelta(hours=6),
2828
dag=dag2,

chapters/chapter6/dags/couponing_app_filesensor.py renamed to chapters/chapter06/dags/figure_6_5.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from airflow.operators.dummy_operator import DummyOperator
55

66
dag = DAG(
7-
dag_id="chapter6_couponing_app_filesensor",
7+
dag_id="figure_6_5",
88
start_date=airflow.utils.dates.days_ago(3),
99
schedule_interval="0 16 * * *",
1010
description="A batch workflow for ingesting supermarket promotions data, demonstrating the FileSensor.",
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from pathlib import Path

import airflow.utils.dates
from airflow import DAG
from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id="figure_6_6",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 16 * * *",
    description="A batch workflow for ingesting supermarket promotions data, demonstrating the PythonSensor.",
    default_args={"depends_on_past": True},
)

create_metrics = DummyOperator(task_id="create_metrics", dag=dag)


def _wait_for_supermarket(supermarket_id_):
    """Poke callable for the PythonSensor.

    Returns True once the supermarket's directory contains at least one
    data-*.csv file AND a _SUCCESS marker file.
    """
    supermarket_path = Path("/data/" + supermarket_id_)
    # Materialize the glob: Path.glob returns a lazy generator, which is
    # always truthy — the unfixed expression silently ignored whether any
    # data file actually matched and only checked the _SUCCESS marker.
    data_files = list(supermarket_path.glob("data-*.csv"))
    success_file = supermarket_path / "_SUCCESS"
    return bool(data_files) and success_file.exists()


# One wait -> copy -> process chain per supermarket, all fanning in to
# a single create_metrics task.
for supermarket_id in range(1, 5):
    wait = PythonSensor(
        task_id=f"wait_for_supermarket_{supermarket_id}",
        python_callable=_wait_for_supermarket,
        op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"},
        timeout=600,
        dag=dag,
    )
    copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag)
    process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag)
    wait >> copy >> process >> create_metrics

chapters/chapter6/dags/couponing_app_pythonsensor.py renamed to chapters/chapter06/dags/figure_6_9.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
from datetime import datetime
21
from pathlib import Path
32

3+
import airflow.utils.dates
44
from airflow import DAG
55
from airflow.contrib.sensors.python_sensor import PythonSensor
66
from airflow.operators.dummy_operator import DummyOperator
77

88
dag = DAG(
9-
dag_id="chapter6_couponing_app_pythonsensor",
10-
start_date=datetime(2019, 1, 1),
11-
schedule_interval="*/2 * * * *",
12-
concurrency=16,
9+
dag_id="figure_6_9",
10+
start_date=airflow.utils.dates.days_ago(3),
11+
schedule_interval="0 16 * * *",
1312
description="A batch workflow for ingesting supermarket promotions data, demonstrating the PythonSensor.",
13+
default_args={"depends_on_past": True},
1414
)
1515

1616
create_metrics = DummyOperator(task_id="create_metrics", dag=dag)
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import airflow.utils.dates
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor

# Daily 16:00 batch that waits for a supermarket's data file to land.
dag = DAG(
    dag_id="listing_6_1",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 16 * * *",
    description=(
        "A batch workflow for ingesting supermarket promotions data, "
        "demonstrating the FileSensor."
    ),
    default_args={"depends_on_past": True},
)

# FileSensor pokes until the given filepath exists.
wait = FileSensor(
    task_id="wait_for_supermarket_1",
    filepath="/data/supermarket1/data.csv",
    dag=dag,
)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from pathlib import Path

import airflow.utils.dates
from airflow import DAG
from airflow.contrib.sensors.python_sensor import PythonSensor

dag = DAG(
    dag_id="listing_6_2",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="0 16 * * *",
    description="A batch workflow for ingesting supermarket promotions data.",
    default_args={"depends_on_past": True},
)


def _wait_for_supermarket(supermarket_id_):
    """Poke callable for the PythonSensor.

    Returns True once the supermarket's directory contains at least one
    data-*.csv file AND a _SUCCESS marker file.
    """
    supermarket_path = Path("/data/" + supermarket_id_)
    # Materialize the glob: Path.glob returns a lazy generator, which is
    # always truthy — the unfixed expression silently ignored whether any
    # data file actually matched and only checked the _SUCCESS marker.
    data_files = list(supermarket_path.glob("data-*.csv"))
    success_file = supermarket_path / "_SUCCESS"
    return bool(data_files) and success_file.exists()


wait_for_supermarket_1 = PythonSensor(
    task_id="wait_for_supermarket_1",
    python_callable=_wait_for_supermarket,
    # The kwarg key must match the callable's parameter name
    # ("supermarket_id_", with trailing underscore); the original key
    # "supermarket_id" raised a TypeError on every poke.
    op_kwargs={"supermarket_id_": "supermarket1"},
    dag=dag,
)

0 commit comments

Comments
 (0)