
Commit 375f55f

Merge pull request #25 from BasPH/refactor-ch2
Refactor Chapter 2 after feedback
2 parents 7197942 + 9c3be53

File tree

11 files changed (+230, -47 lines)


.flake8

Lines changed: 2 additions & 2 deletions
```diff
@@ -1,5 +1,5 @@
 [flake8]
-max-line-length = 110
-ignore = E203
+max-line-length = 88
+ignore = E203,E501
 per-file-ignores =
     chapters/chapter13/gcp/dags/gcp.py:W503
```

chapters/chapter02/README.md

Lines changed: 25 additions & 0 deletions
# Chapter 2

Code accompanying Chapter 2 of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).

## Contents

This folder contains the DAGs from Chapter 2. The filenames and DAG ids follow the listing ids in the book. The final DAG is given in `listing_2_10.py`.

## Usage

To get started with the code examples, start Airflow with Docker Compose using the following command:

```bash
docker-compose up -d
```

The webserver needs a few seconds to initialize; after that, it is reachable at http://localhost:8080.

To stop running the examples, run the following command:

```bash
docker-compose down -v
```

chapters/chapter2/dags/chapter2_download_rocket_launches.py renamed to chapters/chapter02/dags/listing_2_10.py

Lines changed: 15 additions & 10 deletions
```diff
@@ -1,22 +1,22 @@
 import json
 import pathlib
 
-import airflow.utils.dates
+import airflow
 import requests
+import requests.exceptions as requests_exceptions
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter2_download_rocket_launches",
-    description="Download rocket pictures of recently launched rockets.",
+    dag_id="listing_2_10",
     start_date=airflow.utils.dates.days_ago(14),
     schedule_interval="@daily",
 )
 
 download_launches = BashOperator(
     task_id="download_launches",
-    bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'",
+    bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'",  # noqa: E501
     dag=dag,
 )
 
@@ -30,12 +30,17 @@ def _get_pictures():
         launches = json.load(f)
     image_urls = [launch["rocket"]["imageURL"] for launch in launches["launches"]]
     for image_url in image_urls:
-        response = requests.get(image_url)
-        image_filename = image_url.split("/")[-1]
-        target_file = f"/tmp/images/{image_filename}"
-        with open(target_file, "wb") as f:
-            f.write(response.content)
-        print(f"Downloaded {image_url} to {target_file}")
+        try:
+            response = requests.get(image_url)
+            image_filename = image_url.split("/")[-1]
+            target_file = f"/tmp/images/{image_filename}"
+            with open(target_file, "wb") as f:
+                f.write(response.content)
+            print(f"Downloaded {image_url} to {target_file}")
+        except requests_exceptions.MissingSchema:
+            print(f"{image_url} appears to be an invalid URL.")
+        except requests_exceptions.ConnectionError:
+            print(f"Could not connect to {image_url}.")
 
 
 get_pictures = PythonOperator(
```
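The substantive change in this file is the try/except around each download, so that a single invalid or unreachable `imageURL` no longer fails the whole task. A minimal standalone sketch of the same pattern (the URLs below are hypothetical, chosen only to trigger each handler):

```python
import requests
import requests.exceptions as requests_exceptions

# Hypothetical URLs: the first has no scheme (raises MissingSchema), the
# second points at an unresolvable host (raises ConnectionError).
for image_url in ["not-a-url", "https://host.invalid/rocket.png"]:
    try:
        response = requests.get(image_url)
        print(f"Fetched {image_url} ({response.status_code})")
    except requests_exceptions.MissingSchema:
        print(f"{image_url} appears to be an invalid URL.")
    except requests_exceptions.ConnectionError:
        print(f"Could not connect to {image_url}.")
```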
chapters/chapter02/dags/listing_2_2.py (new file)

Lines changed: 56 additions & 0 deletions

```python
import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="listing_2_2",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'",  # noqa: E501
    dag=dag,
)


def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
    image_urls = [launch["rocket"]["imageURL"] for launch in launches["launches"]]
    for image_url in image_urls:
        try:
            response = requests.get(image_url)
            image_filename = image_url.split("/")[-1]
            target_file = f"/tmp/images/{image_filename}"
            with open(target_file, "wb") as f:
                f.write(response.content)
            print(f"Downloaded {image_url} to {target_file}")
        except requests_exceptions.MissingSchema:
            print(f"{image_url} appears to be an invalid URL.")
        except requests_exceptions.ConnectionError:
            print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
```
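The final line chains the three tasks with Airflow's `>>` dependency shorthand. For reference, the same dependencies can be set with explicit `set_downstream` calls (an equivalent sketch using the operators defined above):

```python
# Equivalent to: download_launches >> get_pictures >> notify
download_launches.set_downstream(get_pictures)
get_pictures.set_downstream(notify)
```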
chapters/chapter02/dags/listing_2_3.py (new file)

Lines changed: 8 additions & 0 deletions

```python
import airflow
from airflow import DAG

dag = DAG(
    dag_id="listing_2_3",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)
```
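With `schedule_interval=None`, this DAG runs only when triggered manually. Airflow also supports declaring a DAG as a context manager, in which case operators instantiated inside the `with` block attach to the DAG automatically; a sketch of the equivalent declaration:

```python
import airflow
from airflow import DAG

# Equivalent declaration: no dag=dag argument is needed for operators
# created inside the "with" block.
with DAG(
    dag_id="listing_2_3",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
) as dag:
    pass
```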
chapters/chapter02/dags/listing_2_4.py (new file)

Lines changed: 15 additions & 0 deletions

```python
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id="listing_2_4",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'",  # noqa: E501
    dag=dag,
)
```
chapters/chapter02/dags/listing_2_6.py (new file)

Lines changed: 50 additions & 0 deletions

```python
import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="listing_2_6",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'",  # noqa: E501
    dag=dag,
)


def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
    image_urls = [launch["rocket"]["imageURL"] for launch in launches["launches"]]
    for image_url in image_urls:
        try:
            response = requests.get(image_url)
            image_filename = image_url.split("/")[-1]
            target_file = f"/tmp/images/{image_filename}"
            with open(target_file, "wb") as f:
                f.write(response.content)
            print(f"Downloaded {image_url} to {target_file}")
        except requests_exceptions.MissingSchema:
            print(f"{image_url} appears to be an invalid URL.")
        except requests_exceptions.ConnectionError:
            print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

download_launches >> get_pictures
```
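Since `_get_pictures` is a plain Python function, it can be exercised without running Airflow at all. A sketch, assuming the function is in scope (e.g. the module is run directly) and using a hypothetical minimal `/tmp/launches.json`:

```python
import json

# Hypothetical minimal input matching the structure _get_pictures expects;
# the unresolvable host exercises the ConnectionError branch.
stub = {"launches": [{"rocket": {"imageURL": "https://host.invalid/rocket.png"}}]}
with open("/tmp/launches.json", "w") as f:
    json.dump(stub, f)

_get_pictures()  # prints "Could not connect to https://host.invalid/rocket.png."
```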
chapters/chapter02/docker-compose.yml (new file)

Lines changed: 59 additions & 0 deletions

```yaml
version: '3.7'

# ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
x-environment: &airflow_environment
  - AIRFLOW__CORE__EXECUTOR=LocalExecutor
  - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
  - AIRFLOW__CORE__LOAD_EXAMPLES=False
  - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
  - AIRFLOW__CORE__STORE_DAG_CODE=True
  - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
  - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
  - AIRFLOW__WEBSERVER__RBAC=False

x-airflow-image: &airflow_image apache/airflow:1.10.12-python3.8
# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ======================================

services:
  postgres:
    image: postgres:12-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  init:
    image: *airflow_image
    depends_on:
      - postgres
    environment: *airflow_environment
    entrypoint: /bin/bash
    command: -c 'airflow upgradedb && sleep 5 && airflow create_user --username admin --password admin --firstname John --lastname Smith --role Admin --email [email protected]'

  webserver:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    ports:
      - "8080:8080"
    volumes:
      - logs:/opt/airflow/logs
    environment: *airflow_environment
    command: webserver

  scheduler:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    volumes:
      - logs:/opt/airflow/logs
      - ./dags:/opt/airflow/dags
    environment: *airflow_environment
    command: scheduler

volumes:
  logs:
```

chapters/chapter2/docker-compose.yml

Lines changed: 0 additions & 14 deletions
This file was deleted.
