Skip to content

Commit 7197942

Browse files
Julian de Ruiter and Julian de Ruiter authored
Update chapter 7 for final review (becomes chapter 8). (#20)
* Updated chapter 7, renamed to chapter 8.
* Rename src to avoid confusion.
* Update compose to use official Airflow image.
* Fix linting issues.

Co-authored-by: Julian de Ruiter <[email protected]>
1 parent 01e64c0 commit 7197942

23 files changed

+166
-98
lines changed

chapters/chapter03/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,6 @@ services:
5050
command: scheduler
5151
events_api:
5252
build: ./api
53-
image: manning-airflow-events-api:latest
53+
image: manning-airflow/events-api:latest
5454
ports:
5555
- "5000:5000"

chapters/chapter08/api/Dockerfile

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Image for the ratings API: installs the Python dependencies, bakes a
# ratings snapshot into the image and runs app.py.
FROM python:3.8-slim

# Install dependencies before copying the application code so this layer is
# only rebuilt when requirements.txt changes. --no-cache-dir keeps pip's
# download cache out of the image layer.
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt && rm -f /tmp/requirements.txt

# Add the API and the download script, then fetch the ratings at build time
# so that /ratings.csv already exists when the container starts.
COPY app.py fetch_ratings.py /
RUN python /fetch_ratings.py --output_path /ratings.csv

# NOTE(review): presumably the port app.py listens on - confirm against app.py.
EXPOSE 5000

# Equivalent to running `python /app.py`; CMD can be overridden to run
# another script with the same interpreter.
ENTRYPOINT ["python"]
CMD ["/app.py"]

chapters/chapter7/api/app.py renamed to chapters/chapter08/api/app.py

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
import datetime as dt
21
import os
32
import time
43

5-
import numpy as np
64
import pandas as pd
75

86
from flask import Flask, jsonify, request
@@ -12,43 +10,20 @@
1210
DEFAULT_ITEMS_PER_PAGE = 100
1311

1412

15-
def _read_ratings(file_path, shift_ts=True):
13+
def _read_ratings(file_path):
1614
ratings = pd.read_csv(file_path)
1715

1816
# Subsample dataset.
19-
ratings = ratings.sample(n=100000)
17+
ratings = ratings.sample(n=100000, random_state=0)
2018

2119
# Sort by ts, user, movie for convenience.
2220
ratings = ratings.sort_values(by=["timestamp", "userId", "movieId"])
2321

24-
# Replace timestamps with timestamps from
25-
# within the last month.
26-
if shift_ts:
27-
today = dt.datetime.now().date()
28-
29-
ratings["timestamp"] = _random_timestamps(
30-
start_date=today + dt.timedelta(days=-30),
31-
end_date=today,
32-
size=ratings.shape[0],
33-
)
34-
3522
return ratings
3623

3724

38-
def _random_timestamps(start_date=None, end_date=None, size=100):
39-
"""Generates random timestamps (in seconds) between given start/end dates."""
40-
41-
def _date_to_datetime(date):
42-
return dt.datetime.combine(date, dt.datetime.min.time())
43-
44-
start_ts = int(_date_to_datetime(start_date).timestamp())
45-
end_ts = int(_date_to_datetime(end_date).timestamp())
46-
47-
return np.random.randint(low=start_ts, high=end_ts, size=size)
48-
49-
5025
app = Flask(__name__)
51-
app.config["ratings"] = _read_ratings("/ratings.csv", shift_ts=True)
26+
app.config["ratings"] = _read_ratings("/ratings.csv")
5227

5328
auth = HTTPBasicAuth()
5429
users = {os.environ["API_USER"]: generate_password_hash(os.environ["API_PASSWORD"])}
chapters/chapter08/api/fetch_ratings.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import logging
2+
from pathlib import Path
3+
import tempfile
4+
from urllib.request import urlretrieve
5+
import zipfile
6+
7+
import click
8+
import pandas as pd
9+
10+
logging.basicConfig(
11+
format="[%(asctime)-15s] %(levelname)s - %(message)s", level=logging.INFO
12+
)
13+
14+
15+
@click.command()
@click.option("--start_date", default="2019-01-01", type=click.DateTime())
@click.option("--end_date", default="2020-01-01", type=click.DateTime())
@click.option("--output_path", required=True)
def main(start_date, end_date, output_path):
    """Script for fetching movielens ratings within a given date range."""
    # NOTE: the docstring above doubles as the command's --help text, so it
    # is user-facing output rather than plain documentation.

    logging.info("Fetching ratings...")
    all_ratings = fetch_ratings()

    logging.info(f"Filtering for dates {start_date} - {end_date}...")
    # The timestamp column is converted from epoch seconds so it can be
    # compared against the datetimes parsed by click.
    rated_at = pd.to_datetime(all_ratings["timestamp"], unit="s")
    # Half-open interval: start_date is included, end_date is excluded.
    in_range = (rated_at >= start_date) & (rated_at < end_date)
    selected = all_ratings.loc[in_range]

    logging.info(f"Writing ratings to '{output_path}'...")
    selected.to_csv(output_path, index=False)
32+
33+
34+
def fetch_ratings(
    url="http://files.grouplens.org/datasets/movielens/ml-25m.zip",
    member="ml-25m/ratings.csv",
):
    """Fetches ratings from the given URL.

    Downloads the zip archive at ``url`` into a temporary directory (removed
    again before returning) and loads one CSV member of it with pandas.

    Args:
        url: Location of the zip archive to download. Defaults to the
            ml-25m archive that was previously hard-coded.
        member: Path of the CSV file inside the archive. Defaults to
            ``ml-25m/ratings.csv``.

    Returns:
        A pandas DataFrame with the contents of ``member``.

    Raises:
        KeyError: If ``member`` is not present in the archive.
        zipfile.BadZipFile: If the downloaded file is not a valid zip file.
    """

    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = Path(tmp_dir, "download.zip")
        logging.info(f"Downloading zip file from {url}")
        urlretrieve(url, tmp_path)

        with zipfile.ZipFile(tmp_path) as zip_:
            logging.info(f"Downloaded zip file with contents: {zip_.namelist()}")

            logging.info(f"Reading {member} from zip file")
            with zip_.open(member) as file_:
                # Fully materialise the frame here: the archive lives in the
                # temporary directory and is gone once the outer block exits.
                ratings = pd.read_csv(file_)

    return ratings
52+
53+
54+
if __name__ == "__main__":
55+
main()
chapters/chapter08/api/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
Flask==1.1.1
22
pandas==0.25.2
33
Flask-HTTPAuth==3.3.0
4+
click

chapters/chapter7/dags/movielens_python_operator.py renamed to chapters/chapter08/dags/01_movielens_python_operator.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
import datetime as dt
12
import logging
23
import json
34
import os
45

56
import pandas as pd
67
import requests
78

8-
from airflow import DAG, utils as airflow_utils
9+
from airflow import DAG
910
from airflow.operators.python_operator import PythonOperator
1011

1112
from custom.ranking import rank_movies_by_rating
@@ -71,9 +72,10 @@ def _get_with_pagination(session, url, params, batch_size=100):
7172

7273

7374
with DAG(
74-
dag_id="chapter7_movielens_python_operator",
75+
dag_id="01_movielens_python_operator",
7576
description="Fetches ratings from the Movielens API using the Python Operator.",
76-
start_date=airflow_utils.dates.days_ago(7),
77+
start_date=dt.datetime(2019, 1, 1),
78+
end_date=dt.datetime(2019, 1, 10),
7779
schedule_interval="@daily",
7880
) as dag:
7981

@@ -117,8 +119,12 @@ def _rank_movies(templates_dict, min_ratings=2, **_):
117119
output_path = templates_dict["output_path"]
118120

119121
ratings = pd.read_json(input_path)
120-
121122
ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings)
123+
124+
# Make sure output directory exists.
125+
output_dir = os.path.dirname(output_path)
126+
os.makedirs(output_dir, exist_ok=True)
127+
122128
ranking.to_csv(output_path, index=True)
123129

124130
rank_movies = PythonOperator(

chapters/chapter7/dags/movielens_custom_hook.py renamed to chapters/chapter08/dags/02_movielens_custom_hook.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
1+
import datetime as dt
12
import logging
23
import json
34
import os
45

5-
from airflow import DAG, utils as airflow_utils
6+
from airflow import DAG
67
from airflow.operators.python_operator import PythonOperator
78

89
from custom.hooks import MovielensHook
910

1011

1112
with DAG(
12-
dag_id="chapter7_movielens_custom_hook",
13+
dag_id="02_movielens_custom_hook",
1314
description="Fetches ratings from the Movielens API using a custom hook.",
14-
start_date=airflow_utils.dates.days_ago(7),
15+
start_date=dt.datetime(2019, 1, 1),
16+
end_date=dt.datetime(2019, 1, 10),
1517
schedule_interval="@daily",
1618
) as dag:
1719

chapters/chapter7/dags/movielens_custom_operator.py renamed to chapters/chapter08/dags/03_movielens_custom_operator.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
from airflow import DAG, utils as airflow_utils
1+
import datetime as dt
2+
3+
from airflow import DAG
24

35
from custom.operators import MovielensFetchRatingsOperator
46

57

68
with DAG(
7-
dag_id="chapter7_movielens_custom_operator",
9+
dag_id="03_movielens_custom_operator",
810
description="Fetches ratings from the Movielens API using a custom operator.",
9-
start_date=airflow_utils.dates.days_ago(7),
11+
start_date=dt.datetime(2019, 1, 1),
12+
end_date=dt.datetime(2019, 1, 10),
1013
schedule_interval="@daily",
1114
) as dag:
1215
MovielensFetchRatingsOperator(

chapters/chapter7/dags/movielens_custom_sensor.py renamed to chapters/chapter08/dags/04_movielens_custom_sensor.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
from airflow import DAG, utils as airflow_utils
1+
import datetime as dt
2+
3+
from airflow import DAG
24

35
from custom.operators import MovielensFetchRatingsOperator
46
from custom.sensors import MovielensRatingsSensor
57

68
with DAG(
7-
dag_id="chapter7_movielens_sensor",
9+
dag_id="04_movielens_sensor",
810
description="Fetches ratings from the Movielens API, with a custom sensor.",
9-
start_date=airflow_utils.dates.days_ago(7),
11+
start_date=dt.datetime(2019, 1, 1),
12+
end_date=dt.datetime(2019, 1, 10),
1013
schedule_interval="@daily",
1114
) as dag:
1215
wait_for_ratings = MovielensRatingsSensor(

0 commit comments

Comments
 (0)