35 changes: 35 additions & 0 deletions chapters/chapter04/README.md
@@ -0,0 +1,35 @@
# Chapter 4

Code accompanying Chapter 4 of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).

## Contents

This folder contains DAGs from Chapter 4. The filenames and DAG ids follow the listing ids in the book. Near
the end of the chapter, we demonstrate usage of the PostgresOperator. The Docker Compose example in this
folder creates a second Postgres database, so you don't have to set anything up yourself when running the
example. If you like, you can access it:

- Host: `localhost`
- Port: `5433`
- Username: `airflow`
- Password: `airflow`
- Database: `airflow`

This database is initialized with the `pageview_counts` table as shown in the book.
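
If you want to inspect the table contents yourself, here is a minimal sketch (assuming `psycopg2`
is installed on the host; any Postgres client works just as well):

```python
import psycopg2  # assumption: any Postgres client would do

# Connection details from the list above (the second database, on port 5433):
conn = psycopg2.connect(
    host="localhost", port=5433, user="airflow", password="airflow", dbname="airflow"
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT * FROM pageview_counts LIMIT 5;")
    print(cur.fetchall())
```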

## Usage

To get started with the code examples, start Airflow with Docker Compose with the following command:

```bash
docker-compose up -d
```

The webserver takes a few seconds to initialize, so wait a moment and you should be able to access the
Airflow webserver at http://localhost:8080.

To stop running the examples, run the following command:

```bash
docker-compose down -v
```
@@ -3,7 +3,7 @@
from airflow.operators.bash_operator import BashOperator

dag = DAG(
dag_id="chapter4_stocksense_bashoperator",
dag_id="listing_4_1",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@hourly",
)
34 changes: 34 additions & 0 deletions chapters/chapter04/dags/listing_4_13.py
@@ -0,0 +1,34 @@
from urllib import request

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
dag_id="listing_4_13",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
)


def _get_data(year, month, day, hour, output_path, **_):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)


get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
provide_context=True,
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)
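
The quoted `op_kwargs` values are Jinja templates; Airflow renders them against the task context
before calling `_get_data`, so the callable receives plain strings. A minimal sketch of that
rendering step, using `jinja2` directly (the context dict here is a handcrafted stand-in, not
Airflow's real context):

```python
import datetime

import jinja2

# Handcrafted stand-in for the Airflow task context:
context = {"execution_date": datetime.datetime(2019, 1, 1, 14)}
rendered = jinja2.Template("{{ execution_date.year }}").render(context)
print(rendered)  # -> "2019"
```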
62 changes: 62 additions & 0 deletions chapters/chapter04/dags/listing_4_15.py
@@ -0,0 +1,62 @@
from urllib import request

import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
dag_id="listing_4_15",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
max_active_runs=1,
)


def _get_data(year, month, day, hour, output_path, **_):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)


get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
provide_context=True,
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)

extract_gz = BashOperator(
task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag
)


def _fetch_pageviews(pagenames):
result = dict.fromkeys(pagenames, 0)
with open("/tmp/wikipageviews", "r") as f:
for line in f:
domain_code, page_title, view_counts, _ = line.split(" ")
if domain_code == "en" and page_title in pagenames:
result[page_title] = view_counts

print(result)
# Prints e.g. "{'Facebook': '778', 'Apple': '20', 'Google': '451', 'Amazon': '9', 'Microsoft': '119'}"


fetch_pageviews = PythonOperator(
task_id="fetch_pageviews",
python_callable=_fetch_pageviews,
op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
dag=dag,
)

get_data >> extract_gz >> fetch_pageviews
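
`_fetch_pageviews` splits each line on single spaces because the Wikimedia pageviews format has
four space-separated fields; note the counts stay strings, which matches the example output in the
comment above. A self-contained sketch of the parsing (the sample line is made up):

```python
# Format: <domain_code> <page_title> <view_counts> <total_response_size>
sample = "en Google 451 0"  # made-up sample line
domain_code, page_title, view_counts, _ = sample.split(" ")
print(domain_code, page_title, view_counts)  # -> en Google 451
```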
@@ -1,32 +1,24 @@
# """
# Documentation of pageview format: https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Traffic/Pageviews
# """

import pathlib
from urllib import request

import airflow
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
dag_id="chapter4_stocksense",
start_date=airflow.utils.dates.days_ago(30),
dag_id="listing_4_18",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
template_searchpath="/tmp",
max_active_runs=1,
)


def _get_data(year, month, day, hour, ts_nodash, **_):
output_dir = f"/tmp/wikipageviews/{ts_nodash}"
pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
def _get_data(year, month, day, hour, output_path, **_):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, f"{output_dir}/wikipageviews.gz")
request.urlretrieve(url, output_path)


get_data = PythonOperator(
@@ -38,29 +30,26 @@ def _get_data(year, month, day, hour, ts_nodash, **_):
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
depends_on_past=True,
)


extract_gz = BashOperator(
task_id="extract_gz",
bash_command="gunzip --force /tmp/wikipageviews/{{ ts_nodash }}/wikipageviews.gz",
dag=dag,
task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag
)


def _fetch_pageviews(pagenames, ts_nodash, execution_date, **_):
def _fetch_pageviews(pagenames, execution_date, **_):
result = dict.fromkeys(pagenames, 0)
with open(f"/tmp/wikipageviews/{ts_nodash}/wikipageviews", "r") as f:
with open("/tmp/wikipageviews", "r") as f:
for line in f:
domain_code, page_title, view_counts, _ = line.split(" ")
if domain_code == "en" and page_title in pagenames:
result[page_title] = view_counts

with open(
f"/tmp/wikipageviews/{ts_nodash}/postgres_query-{ts_nodash}.sql", "w"
) as f:
with open("/tmp/postgres_query.sql", "w") as f:
for pagename, pageviewcount in result.items():
f.write(
"INSERT INTO pageview_counts VALUES ("
@@ -77,11 +66,4 @@ def _fetch_pageviews(pagenames, ts_nodash, execution_date, **_):
dag=dag,
)

write_to_postgres = PostgresOperator(
task_id="write_to_postgres",
postgres_conn_id="my_postgres",
sql="postgres_query-{{ ts_nodash }}.sql",
dag=dag,
)

get_data >> extract_gz >> fetch_pageviews >> write_to_postgres
get_data >> extract_gz >> fetch_pageviews
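
The tail of the `f.write` call is collapsed in this diff, but the loop above writes one `INSERT`
statement per page to `/tmp/postgres_query.sql`. A self-contained sketch of that step (the exact
`VALUES` layout is an assumption, since the rest of the call is folded here):

```python
# Stand-ins for the values _fetch_pageviews collects:
result = {"Google": "451", "Apple": "20"}
execution_date = "2019-01-01T00:00:00"

with open("/tmp/postgres_query.sql", "w") as f:
    for pagename, pageviewcount in result.items():
        # Assumed column order: pagename, pageviewcount, datetime
        f.write(
            f"INSERT INTO pageview_counts VALUES "
            f"('{pagename}', {pageviewcount}, '{execution_date}');\n"
        )
```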
@@ -1,18 +1,21 @@
# """
# Documentation of pageview format: https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Traffic/Pageviews
# """

from urllib import request

import airflow
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
dag_id="chapter4_pythonoperator_args3",
start_date=airflow.utils.dates.days_ago(3),
dag_id="listing_4_20",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
max_active_runs=1,
default_args={"depends_on_past": True},
template_searchpath="/tmp",
max_active_runs=1,
)


@@ -38,6 +41,7 @@ def _get_data(year, month, day, hour, output_path, **_):
dag=dag,
)


extract_gz = BashOperator(
task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag
)
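
The rest of this file is collapsed above, but the added `PostgresOperator` import and
`template_searchpath="/tmp"` suggest a write task along these lines — a sketch only, reusing the
`my_postgres` connection id from the task removed in the previous file (which the README's second
database is meant to back):

```python
# Sketch: with template_searchpath="/tmp", a relative sql filename like
# "postgres_query.sql" is resolved against /tmp at render time.
write_to_postgres = PostgresOperator(
    task_id="write_to_postgres",
    postgres_conn_id="my_postgres",  # assumed connection id (see README database)
    sql="postgres_query.sql",
    dag=dag,
)
```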
21 changes: 21 additions & 0 deletions chapters/chapter04/dags/listing_4_3.py
@@ -0,0 +1,21 @@
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
dag_id="listing_4_3",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)


def _print_context(**kwargs):
print(kwargs)


print_context = PythonOperator(
task_id="print_context",
python_callable=_print_context,
provide_context=True,
dag=dag,
)
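
With `provide_context=True`, Airflow passes the task context (execution dates, the task instance,
and so on) into the callable as keyword arguments. A quick, Airflow-free sketch of the mechanics
(the context keys here are handpicked stand-ins for two of the real ones):

```python
import datetime


def _print_context(**kwargs):
    print(kwargs)


# Simulate what Airflow passes when provide_context=True:
_print_context(execution_date=datetime.datetime(2019, 1, 1), ds="2019-01-01")
# -> {'execution_date': datetime.datetime(2019, 1, 1, 0, 0), 'ds': '2019-01-01'}
```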
26 changes: 26 additions & 0 deletions chapters/chapter04/dags/listing_4_5.py
@@ -0,0 +1,26 @@
from urllib import request

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
dag_id="listing_4_5",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
)


def _get_data(execution_date, **_):
year, month, day, hour, *_ = execution_date.timetuple()
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
output_path = "/tmp/wikipageviews.gz"
request.urlretrieve(url, output_path)


get_data = PythonOperator(
task_id="get_data", python_callable=_get_data, provide_context=True, dag=dag
)
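
The `timetuple()` unpacking above works because `time.struct_time` begins with year, month, day,
and hour, so the starred assignment picks off exactly the fields the URL needs. A plain-Python
sketch:

```python
import datetime

# struct_time starts with (year, month, day, hour, ...):
year, month, day, hour, *_ = datetime.datetime(2019, 1, 1, 14).timetuple()
print(year, month, day, hour)  # -> 2019 1 1 14
```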
21 changes: 21 additions & 0 deletions chapters/chapter04/dags/listing_4_7.py
@@ -0,0 +1,21 @@
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
dag_id="listing_4_7",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)


def _print_context(**context):
print(context)


print_context = PythonOperator(
task_id="print_context",
python_callable=_print_context,
provide_context=True,
dag=dag,
)
@@ -1,18 +1,18 @@
import datetime
from pprint import pprint

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
dag_id="chapter4_print_context",
start_date=datetime.datetime(2018, 12, 10),
dag_id="listing_4_7",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)


def _print_context(**context):
pprint(context)
start = context["execution_date"]
end = context["next_execution_date"]
print(f"Start: {start}, end: {end}")


# Prints e.g.: