4 changes: 2 additions & 2 deletions .flake8
@@ -1,5 +1,5 @@
 [flake8]
-max-line-length = 110
-ignore = E203
+max-line-length = 88
+ignore = E203,E501
 per-file-ignores =
     chapters/chapter13/gcp/dags/gcp.py:W503
25 changes: 25 additions & 0 deletions chapters/chapter02/README.md
@@ -0,0 +1,25 @@
# Chapter 2

Code accompanying Chapter 2 of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).

## Contents

This folder contains DAGs from Chapter 2. The filenames and DAG ids follow the listing ids in the book. The
final DAG is given in `listing_2_10.py`.

## Usage

To get started with the code examples, start Airflow using Docker Compose:

```bash
docker-compose up -d
```

The webserver takes a few seconds to initialize, after which you should be able to access the
Airflow webserver at http://localhost:8080.
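
If you prefer not to guess, you can poll the webserver until it responds — a minimal sketch, assuming the default `8080:8080` port mapping from the compose file and that this Airflow version exposes the `/health` endpoint:

```bash
# Poll the webserver's health endpoint until it answers (assumes localhost:8080).
until curl --silent --fail http://localhost:8080/health > /dev/null; do
  echo "Waiting for the Airflow webserver..."
  sleep 5
done
echo "Webserver is up."
```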

To stop the examples and clean up, run the following command:

```bash
docker-compose down -v
```
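
The `-v` flag also removes the named volumes declared in the compose file (here, the `logs` volume), so each run starts from a clean slate. To confirm that nothing was left behind — a quick check, assuming the standard Docker CLI:

```bash
# The project should show no running containers...
docker-compose ps
# ...and the chapter's "logs" volume should no longer appear here.
docker volume ls
```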
chapters/chapter02/dags/listing_2_10.py
@@ -1,22 +1,22 @@
 import json
 import pathlib
 
-import airflow.utils.dates
+import airflow
 import requests
 import requests.exceptions as requests_exceptions
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
 
 dag = DAG(
-    dag_id="chapter2_download_rocket_launches",
-    description="Download rocket pictures of recently launched rockets.",
+    dag_id="listing_2_10",
     start_date=airflow.utils.dates.days_ago(14),
     schedule_interval="@daily",
 )
 
 download_launches = BashOperator(
     task_id="download_launches",
-    bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'",
+    bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'",  # noqa: E501
     dag=dag,
 )

@@ -30,12 +30,17 @@ def _get_pictures():
         launches = json.load(f)
         image_urls = [launch["rocket"]["imageURL"] for launch in launches["launches"]]
         for image_url in image_urls:
-            response = requests.get(image_url)
-            image_filename = image_url.split("/")[-1]
-            target_file = f"/tmp/images/{image_filename}"
-            with open(target_file, "wb") as f:
-                f.write(response.content)
-            print(f"Downloaded {image_url} to {target_file}")
+            try:
+                response = requests.get(image_url)
+                image_filename = image_url.split("/")[-1]
+                target_file = f"/tmp/images/{image_filename}"
+                with open(target_file, "wb") as f:
+                    f.write(response.content)
+                print(f"Downloaded {image_url} to {target_file}")
+            except requests_exceptions.MissingSchema:
+                print(f"{image_url} appears to be an invalid URL.")
+            except requests_exceptions.ConnectionError:
+                print(f"Could not connect to {image_url}.")
 
 
 get_pictures = PythonOperator(
56 changes: 56 additions & 0 deletions chapters/chapter02/dags/listing_2_2.py
@@ -0,0 +1,56 @@
import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="listing_2_2",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'",  # noqa: E501
    dag=dag,
)


def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["rocket"]["imageURL"] for launch in launches["launches"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
8 changes: 8 additions & 0 deletions chapters/chapter02/dags/listing_2_3.py
@@ -0,0 +1,8 @@
import airflow
from airflow import DAG

dag = DAG(
    dag_id="listing_2_3",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)
15 changes: 15 additions & 0 deletions chapters/chapter02/dags/listing_2_4.py
@@ -0,0 +1,15 @@
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id="listing_2_4",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'",  # noqa: E501
    dag=dag,
)
50 changes: 50 additions & 0 deletions chapters/chapter02/dags/listing_2_6.py
@@ -0,0 +1,50 @@
import json
import pathlib

import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="listing_2_6",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval=None,
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'",  # noqa: E501
    dag=dag,
)


def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["rocket"]["imageURL"] for launch in launches["launches"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

download_launches >> get_pictures
59 changes: 59 additions & 0 deletions chapters/chapter02/docker-compose.yml
@@ -0,0 +1,59 @@
version: '3.7'

# ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
x-environment: &airflow_environment
  - AIRFLOW__CORE__EXECUTOR=LocalExecutor
  - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
  - AIRFLOW__CORE__LOAD_EXAMPLES=False
  - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
  - AIRFLOW__CORE__STORE_DAG_CODE=True
  - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
  - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
  - AIRFLOW__WEBSERVER__RBAC=False

x-airflow-image: &airflow_image apache/airflow:1.10.12-python3.8
# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ======================================

services:
  postgres:
    image: postgres:12-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  init:
    image: *airflow_image
    depends_on:
      - postgres
    environment: *airflow_environment
    entrypoint: /bin/bash
    command: -c 'airflow upgradedb && sleep 5 && airflow create_user --username admin --password admin --firstname John --lastname Smith --role Admin --email [email protected]'

  webserver:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    ports:
      - "8080:8080"
    volumes:
      - logs:/opt/airflow/logs
    environment: *airflow_environment
    command: webserver

  scheduler:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    volumes:
      - logs:/opt/airflow/logs
      - ./dags:/opt/airflow/dags
    environment: *airflow_environment
    command: scheduler

volumes:
  logs:
14 changes: 0 additions & 14 deletions chapters/chapter2/docker-compose.yml

This file was deleted.

21 changes: 0 additions & 21 deletions chapters/chapter2/readme.md

This file was deleted.