1 change: 1 addition & 0 deletions .gitignore
@@ -143,6 +143,7 @@ dmypy.json

 # Data folders
 data
+logs
 
 # Pyenv
 .python-version
2 changes: 1 addition & 1 deletion README.md
@@ -13,7 +13,7 @@ Overall, this repository is structured as follows:
 ├── README.md          # This readme.
 ├── chapters           # Code examples for each of the Chapters.
 ├── docker             # Supporting Docker image (containing Airflow).
-└── environment.yml
+└── requirements.txt
 ```
 
 The most interesting parts are probably the *docker* directory and the *chapter* directories under *chapters*.
3 changes: 3 additions & 0 deletions chapters/chapter01/docker-compose.yml
@@ -33,6 +33,8 @@ services:
       - postgres
     ports:
       - "8080:8080"
+    volumes:
+      - ./logs:/opt/airflow/logs
     environment: *airflow_environment
     command: webserver
   scheduler:
@@ -42,5 +44,6 @@ services:
       - postgres
     volumes:
       - ./dags:/opt/airflow/dags
+      - ./logs:/opt/airflow/logs
     environment: *airflow_environment
     command: scheduler
File renamed without changes.
File renamed without changes.
@@ -7,16 +7,14 @@
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter3_01_unscheduled",
-    start_date=datetime(2015, 6, 1),
-    schedule_interval=None,
+    dag_id="01_unscheduled", start_date=datetime(2015, 6, 1), schedule_interval=None
 )
 
 fetch_events = BashOperator(
     task_id="fetch_events",
     bash_command=(
         "mkdir -p /data/events && "
-        "curl -o /data/events.json https://events_api:5000/events"
+        "curl -o /data/events.json http://events_api:5000/events"
     ),
     dag=dag,
 )
@@ -7,16 +7,17 @@
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter3_02_daily_schedule",
+    dag_id="02_daily_schedule",
     start_date=datetime(2019, 1, 1),
+    end_date=datetime(2019, 1, 5),
     schedule_interval="@daily",
 )
 
 fetch_events = BashOperator(
     task_id="fetch_events",
     bash_command=(
         "mkdir -p /data/events && "
-        "curl -o /data/events.json https://events_api:5000/events"
+        "curl -o /data/events.json http://events_api:5000/events"
     ),
     dag=dag,
 )
@@ -35,7 +36,7 @@ def _calculate_stats(input_path, output_path):
 calculate_stats = PythonOperator(
     task_id="calculate_stats",
     python_callable=_calculate_stats,
-    op_kwargs={"input_path": "data/events.json", "output_path": "data/stats.csv"},
+    op_kwargs={"input_path": "/data/events.json", "output_path": "/data/stats.csv"},
     dag=dag,
 )

@@ -7,7 +7,7 @@
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter3_03_different_start_date",
+    dag_id="03_different_start_date",
     schedule_interval="@daily",
     start_date=dt.datetime(year=2019, month=1, day=1),
 )
@@ -16,7 +16,7 @@
     task_id="fetch_events",
     bash_command=(
         "mkdir -p /data/events && "
-        "curl -o /data/events.json https://events_api:5000/events"
+        "curl -o /data/events.json http://events_api:5000/events"
     ),
     dag=dag,
 )
@@ -35,7 +35,7 @@ def _calculate_stats(input_path, output_path):
 calculate_stats = PythonOperator(
     task_id="calculate_stats",
     python_callable=_calculate_stats,
-    op_kwargs={"input_path": "data/events.json", "output_path": "data/stats.csv"},
+    op_kwargs={"input_path": "/data/events.json", "output_path": "/data/stats.csv"},
     dag=dag,
 )

@@ -7,7 +7,7 @@
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter3_04_with_end_date",
+    dag_id="04_with_end_date",
     schedule_interval="@daily",
     start_date=dt.datetime(year=2019, month=1, day=1),
     end_date=dt.datetime(year=2019, month=1, day=5),
@@ -17,7 +17,7 @@
     task_id="fetch_events",
     bash_command=(
         "mkdir -p /data/events && "
-        "curl -o /data/events.json https://events_api:5000/events"
+        "curl -o /data/events.json http://events_api:5000/events"
     ),
     dag=dag,
 )
@@ -36,7 +36,7 @@ def _calculate_stats(input_path, output_path):
 calculate_stats = PythonOperator(
     task_id="calculate_stats",
     python_callable=_calculate_stats,
-    op_kwargs={"input_path": "data/events.json", "output_path": "data/stats.csv"},
+    op_kwargs={"input_path": "/data/events.json", "output_path": "/data/stats.csv"},
     dag=dag,
 )

@@ -8,7 +8,7 @@
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter3_05_time_delta_schedule",
+    dag_id="05_time_delta_schedule",
     schedule_interval=timedelta(days=3),
     start_date=dt.datetime(year=2019, month=1, day=1),
     end_date=dt.datetime(year=2019, month=1, day=5),
@@ -18,7 +18,7 @@
     task_id="fetch_events",
     bash_command=(
         "mkdir -p /data/events && "
-        "curl -o /data/events.json https://events_api:5000/events"
+        "curl -o /data/events.json http://events_api:5000/events"
     ),
     dag=dag,
 )
@@ -37,7 +37,7 @@ def _calculate_stats(input_path, output_path):
 calculate_stats = PythonOperator(
     task_id="calculate_stats",
     python_callable=_calculate_stats,
-    op_kwargs={"input_path": "data/events.json", "output_path": "data/stats.csv"},
+    op_kwargs={"input_path": "/data/events.json", "output_path": "/data/stats.csv"},
     dag=dag,
 )

@@ -8,7 +8,7 @@
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter3_06_query_with_dates",
+    dag_id="06_query_with_dates",
     schedule_interval=timedelta(days=3),
     start_date=dt.datetime(year=2019, month=1, day=1),
     end_date=dt.datetime(year=2019, month=1, day=5),
@@ -40,7 +40,7 @@ def _calculate_stats(input_path, output_path):
 calculate_stats = PythonOperator(
     task_id="calculate_stats",
     python_callable=_calculate_stats,
-    op_kwargs={"input_path": "data/events.json", "output_path": "data/stats.csv"},
+    op_kwargs={"input_path": "/data/events.json", "output_path": "/data/stats.csv"},
     dag=dag,
 )

@@ -8,7 +8,7 @@
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter3_07_templated_query",
+    dag_id="07_templated_query",
     schedule_interval=timedelta(days=3),
     start_date=dt.datetime(year=2019, month=1, day=1),
     end_date=dt.datetime(year=2019, month=1, day=5),
@@ -40,7 +40,7 @@ def _calculate_stats(input_path, output_path):
 calculate_stats = PythonOperator(
     task_id="calculate_stats",
     python_callable=_calculate_stats,
-    op_kwargs={"input_path": "data/events.json", "output_path": "data/stats.csv"},
+    op_kwargs={"input_path": "/data/events.json", "output_path": "/data/stats.csv"},
     dag=dag,
 )

@@ -8,7 +8,7 @@
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter3_08_templated_query_ds",
+    dag_id="08_templated_query_ds",
     schedule_interval=timedelta(days=3),
     start_date=dt.datetime(year=2019, month=1, day=1),
     end_date=dt.datetime(year=2019, month=1, day=5),
@@ -40,7 +40,7 @@ def _calculate_stats(input_path, output_path):
 calculate_stats = PythonOperator(
     task_id="calculate_stats",
     python_callable=_calculate_stats,
-    op_kwargs={"input_path": "data/events.json", "output_path": "data/stats.csv"},
+    op_kwargs={"input_path": "/data/events.json", "output_path": "/data/stats.csv"},
     dag=dag,
 )

@@ -8,7 +8,7 @@
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter3_09_templated_path",
+    dag_id="09_templated_path",
     schedule_interval=timedelta(days=3),
     start_date=dt.datetime(year=2019, month=1, day=1),
     end_date=dt.datetime(year=2019, month=1, day=5),
@@ -43,8 +43,8 @@ def _calculate_stats(**context):
     task_id="calculate_stats",
     python_callable=_calculate_stats,
     templates_dict={
-        "input_path": "data/events/{{ds}}.json",
-        "output_path": "data/stats/{{ds}}.csv",
+        "input_path": "/data/events/{{ds}}.json",
+        "output_path": "/data/stats/{{ds}}.csv",
     },
     provide_context=True,
     dag=dag,
@@ -8,7 +8,7 @@
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter3_10_full_example",
+    dag_id="10_full_example",
     schedule_interval=timedelta(days=3),
     start_date=dt.datetime(year=2019, month=1, day=1),
     end_date=dt.datetime(year=2019, month=1, day=5),
55 changes: 55 additions & 0 deletions chapters/chapter03/docker-compose.yml
@@ -0,0 +1,55 @@
version: '3.7'
# ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
x-environment: &airflow_environment
  - AIRFLOW__CORE__EXECUTOR=LocalExecutor
  - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
  - AIRFLOW__CORE__LOAD_EXAMPLES=False
  - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
  - AIRFLOW__CORE__STORE_DAG_CODE=True
  - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
  - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
  - AIRFLOW__WEBSERVER__RBAC=False
x-airflow-image: &airflow_image apache/airflow:1.10.12-python3.8
# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ======================================
services:
  postgres:
    image: postgres:12-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"
  init:
    image: *airflow_image
    depends_on:
      - postgres
    environment: *airflow_environment
    command: upgradedb
  webserver:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    ports:
      - "8080:8080"
    volumes:
      - ./logs:/opt/airflow/logs
    environment: *airflow_environment
    command: webserver
  scheduler:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./data:/data
    environment: *airflow_environment
    command: scheduler
  events_api:
    build: ./api
    image: manning-airflow-events-api:latest
    ports:
      - "5000:5000"
30 changes: 30 additions & 0 deletions chapters/chapter03/readme.md
@@ -0,0 +1,30 @@
# Chapter 3

Code accompanying Chapter 3 of the book 'Data Pipelines with Apache Airflow'.

## Contents

This code example contains the following DAGs:

- 01_unscheduled.py - Initial DAG without schedule.
- 02_daily_schedule.py - Same DAG following a daily schedule.
- 03_different_start_date.py - DAG with adjusted start date.
- 04_with_end_date.py - Modified DAG with an end date.
- 05_time_delta_schedule.py - DAG that uses a timedelta for the schedule interval.
- 06_query_with_dates.py - DAG including hard-coded dates in the query.
- 07_templated_query.py - Replaces hard-coded dates with templated execution dates.
- 08_templated_query_ds.py - Uses shorthands for the templated execution dates.
- 09_templated_path.py - Uses templating for the file paths as well (see the sketch below).
- 10_full_example.py - Fully completed example, including 'sending' of statistics.
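
For reference, the templated-path pattern from 09_templated_path.py boils down to the following sketch. The DAG arguments, paths, and operator parameters are taken from the diff above; the body of `_calculate_stats` (including the pandas aggregation) is an illustrative stand-in, since the full function is not shown in this diff:

```python
import datetime as dt
from datetime import timedelta

import pandas as pd

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="09_templated_path",
    schedule_interval=timedelta(days=3),
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
)


def _calculate_stats(**context):
    """Compute event statistics; paths arrive pre-rendered via templates_dict."""
    input_path = context["templates_dict"]["input_path"]
    output_path = context["templates_dict"]["output_path"]

    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()  # illustrative aggregation
    stats.to_csv(output_path, index=False)


calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    templates_dict={
        "input_path": "/data/events/{{ds}}.json",
        "output_path": "/data/stats/{{ds}}.csv",
    },
    provide_context=True,  # needed on Airflow 1.10 for the context to reach the callable
    dag=dag,
)
```

Because `templates_dict` is a templated field, `{{ds}}` is rendered to the execution date (YYYY-MM-DD) before the callable runs, so each run reads and writes its own dated file.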

## Usage

To get started with the code examples, start Airflow in Docker using the following command:

    docker-compose up -d --build

Wait for a few seconds and you should be able to access the examples at http://localhost:8080/.
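
The events API is also published on port 5000 by the compose file above, so, assuming the default port mapping, you should be able to hit the same endpoint the DAGs fetch:

    curl http://localhost:5000/events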

To stop running the examples, run the following command:

    docker-compose down
22 changes: 0 additions & 22 deletions chapters/chapter3/docker-compose.yml

This file was deleted.

30 changes: 0 additions & 30 deletions chapters/chapter3/readme.md

This file was deleted.