diff --git a/.flake8 b/.flake8 index 598366b2..9e4c003c 100644 --- a/.flake8 +++ b/.flake8 @@ -1,5 +1,5 @@ [flake8] -max-line-length = 110 -ignore = E203 +max-line-length = 88 +ignore = E203,E501 per-file-ignores = chapters/chapter13/gcp/dags/gcp.py:W503 diff --git a/chapters/chapter02/README.md b/chapters/chapter02/README.md new file mode 100644 index 00000000..f8b27eb8 --- /dev/null +++ b/chapters/chapter02/README.md @@ -0,0 +1,25 @@ +# Chapter 2 + +Code accompanying Chapter 2 of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow). + +## Contents + +This folder contains DAGs from Chapter 2. The filenames and DAG ids follow the listing ids in the book. The +final DAG is given in `listing_2_10.py`. + +## Usage + +To get started with the code examples, start Airflow with Docker Compose with the following command: + +```bash +docker-compose up -d +``` + +The webserver initializes a few things, so wait for a few seconds, and you should be able to access the +Airflow webserver at http://localhost:8080. + +To stop running the examples, run the following command: + +```bash +docker-compose down -v +``` diff --git a/chapters/chapter2/dags/chapter2_download_rocket_launches.py b/chapters/chapter02/dags/listing_2_10.py similarity index 59% rename from chapters/chapter2/dags/chapter2_download_rocket_launches.py rename to chapters/chapter02/dags/listing_2_10.py index 2d8b4795..b0c56caa 100644 --- a/chapters/chapter2/dags/chapter2_download_rocket_launches.py +++ b/chapters/chapter02/dags/listing_2_10.py @@ -1,22 +1,22 @@ import json import pathlib -import airflow.utils.dates +import airflow import requests +import requests.exceptions as requests_exceptions from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator dag = DAG( - dag_id="chapter2_download_rocket_launches", - description="Download rocket pictures of recently launched rockets.", + dag_id="listing_2_10", start_date=airflow.utils.dates.days_ago(14), schedule_interval="@daily", ) download_launches = BashOperator( task_id="download_launches", - bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'", + bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'", # noqa: E501 dag=dag, ) @@ -30,12 +30,17 @@ def _get_pictures(): launches = json.load(f) image_urls = [launch["rocket"]["imageURL"] for launch in launches["launches"]] for image_url in image_urls: - response = requests.get(image_url) - image_filename = image_url.split("/")[-1] - target_file = f"/tmp/images/{image_filename}" - with open(target_file, "wb") as f: - f.write(response.content) - print(f"Downloaded {image_url} to {target_file}") + try: + response = requests.get(image_url) + image_filename = image_url.split("/")[-1] + target_file = f"/tmp/images/{image_filename}" + with open(target_file, "wb") as f: + f.write(response.content) + print(f"Downloaded {image_url} to {target_file}") + except requests_exceptions.MissingSchema: + print(f"{image_url} appears to be an invalid URL.") + except requests_exceptions.ConnectionError: + print(f"Could not connect to {image_url}.") get_pictures = PythonOperator( diff --git a/chapters/chapter02/dags/listing_2_2.py b/chapters/chapter02/dags/listing_2_2.py new file mode 100644 index 00000000..42d6d609 --- /dev/null +++ b/chapters/chapter02/dags/listing_2_2.py @@ -0,0 +1,56 @@ +import json +import pathlib + +import airflow +import requests +import requests.exceptions as requests_exceptions +from airflow import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python_operator import PythonOperator + +dag = DAG( + dag_id="listing_2_2", + start_date=airflow.utils.dates.days_ago(14), + schedule_interval=None, +) + +download_launches = BashOperator( + task_id="download_launches", + bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'", # noqa: E501 + dag=dag, +) + + +def _get_pictures(): + # Ensure directory exists + pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True) + + # Download all pictures in launches.json + with open("/tmp/launches.json") as f: + launches = json.load(f) + image_urls = [launch["rocket"]["imageURL"] for launch in launches["launches"]] + for image_url in image_urls: + try: + response = requests.get(image_url) + image_filename = image_url.split("/")[-1] + target_file = f"/tmp/images/{image_filename}" + with open(target_file, "wb") as f: + f.write(response.content) + print(f"Downloaded {image_url} to {target_file}") + except requests_exceptions.MissingSchema: + print(f"{image_url} appears to be an invalid URL.") + except requests_exceptions.ConnectionError: + print(f"Could not connect to {image_url}.") + + +get_pictures = PythonOperator( + task_id="get_pictures", python_callable=_get_pictures, dag=dag +) + +notify = BashOperator( + task_id="notify", + bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."', + dag=dag, +) + +download_launches >> get_pictures >> notify diff --git a/chapters/chapter02/dags/listing_2_3.py b/chapters/chapter02/dags/listing_2_3.py new file mode 100644 index 00000000..809b0b94 --- /dev/null +++ b/chapters/chapter02/dags/listing_2_3.py @@ -0,0 +1,8 @@ +import airflow +from airflow import DAG + +dag = DAG( + dag_id="listing_2_3", + start_date=airflow.utils.dates.days_ago(14), + schedule_interval=None, +) diff --git a/chapters/chapter02/dags/listing_2_4.py b/chapters/chapter02/dags/listing_2_4.py new file mode 100644 index 00000000..740954f0 --- /dev/null +++ b/chapters/chapter02/dags/listing_2_4.py @@ -0,0 +1,15 @@ +import airflow +from airflow import DAG +from airflow.operators.bash_operator import BashOperator + +dag = DAG( + dag_id="listing_2_4", + start_date=airflow.utils.dates.days_ago(14), + schedule_interval=None, +) + +download_launches = BashOperator( + task_id="download_launches", + bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'", # noqa: E501 + dag=dag, +) diff --git a/chapters/chapter02/dags/listing_2_6.py b/chapters/chapter02/dags/listing_2_6.py new file mode 100644 index 00000000..02bcd7f2 --- /dev/null +++ b/chapters/chapter02/dags/listing_2_6.py @@ -0,0 +1,50 @@ +import json +import pathlib + +import airflow +import requests +import requests.exceptions as requests_exceptions +from airflow import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python_operator import PythonOperator + +dag = DAG( + dag_id="listing_2_6", + start_date=airflow.utils.dates.days_ago(14), + schedule_interval=None, +) + +download_launches = BashOperator( + task_id="download_launches", + bash_command="curl -o /tmp/launches.json 'https://launchlibrary.net/1.4/launch?next=5&mode=verbose'", # noqa: E501 + dag=dag, +) + + +def _get_pictures(): + # Ensure directory exists + pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True) + + # Download all pictures in launches.json + with open("/tmp/launches.json") as f: + launches = json.load(f) + image_urls = [launch["rocket"]["imageURL"] for launch in launches["launches"]] + for image_url in image_urls: + try: + response = requests.get(image_url) + image_filename = image_url.split("/")[-1] + target_file = f"/tmp/images/{image_filename}" + with open(target_file, "wb") as f: + f.write(response.content) + print(f"Downloaded {image_url} to {target_file}") + except requests_exceptions.MissingSchema: + print(f"{image_url} appears to be an invalid URL.") + except requests_exceptions.ConnectionError: + print(f"Could not connect to {image_url}.") + + +get_pictures = PythonOperator( + task_id="get_pictures", python_callable=_get_pictures, dag=dag +) + +download_launches >> get_pictures diff --git a/chapters/chapter02/docker-compose.yml b/chapters/chapter02/docker-compose.yml new file mode 100644 index 00000000..50edd6a4 --- /dev/null +++ b/chapters/chapter02/docker-compose.yml @@ -0,0 +1,59 @@ +version: '3.7' + +# ====================================== AIRFLOW ENVIRONMENT VARIABLES ======================================= +x-environment: &airflow_environment + - AIRFLOW__CORE__EXECUTOR=LocalExecutor + - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False + - AIRFLOW__CORE__LOAD_EXAMPLES=False + - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow + - AIRFLOW__CORE__STORE_DAG_CODE=True + - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True + - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True + - AIRFLOW__WEBSERVER__RBAC=False + +x-airflow-image: &airflow_image apache/airflow:1.10.12-python3.8 +# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ====================================== + +services: + postgres: + image: postgres:12-alpine + environment: + - POSTGRES_USER=airflow + - POSTGRES_PASSWORD=airflow + - POSTGRES_DB=airflow + ports: + - "5432:5432" + + init: + image: *airflow_image + depends_on: + - postgres + environment: *airflow_environment + entrypoint: /bin/bash + command: -c 'airflow upgradedb && sleep 5 && airflow create_user --username admin --password admin --firstname John --lastname Smith --role Admin --email admin@example.org' + + webserver: + image: *airflow_image + restart: always + depends_on: + - postgres + ports: + - "8080:8080" + volumes: + - logs:/opt/airflow/logs + environment: *airflow_environment + command: webserver + + scheduler: + image: *airflow_image + restart: always + depends_on: + - postgres + volumes: + - logs:/opt/airflow/logs + - ./dags:/opt/airflow/dags + environment: *airflow_environment + command: scheduler + +volumes: + logs: diff --git a/chapters/chapter2/example/launchlibrary_snippet.json b/chapters/chapter02/example/launchlibrary_snippet.json similarity index 100% rename from chapters/chapter2/example/launchlibrary_snippet.json rename to chapters/chapter02/example/launchlibrary_snippet.json diff --git a/chapters/chapter2/docker-compose.yml b/chapters/chapter2/docker-compose.yml deleted file mode 100644 index 7719b3b6..00000000 --- a/chapters/chapter2/docker-compose.yml +++ /dev/null @@ -1,14 +0,0 @@ -version: "3.7" -services: - airflow: - build: ../../docker - image: manning-airflow:latest - ports: - - "8080:8080" - networks: - - airflow - volumes: - - ./dags:/root/airflow/dags - -networks: - airflow: diff --git a/chapters/chapter2/readme.md b/chapters/chapter2/readme.md deleted file mode 100644 index 28451629..00000000 --- a/chapters/chapter2/readme.md +++ /dev/null @@ -1,21 +0,0 @@ -# Chapter 2 - -Code accompanying Chapter 2 of the book 'Data pipelines with Apache Airflow'. - -## Contents - -This code example contains the following DAGs: - -- chapter2_download_rocket_launches.py - Small DAG for fetching rocket launches. - -## Usage - -To get started with the code examples, start Airflow in docker using the following command: - - docker-compose up -d --build - -Wait for a few seconds and you should be able to access the examples at http://localhost:8080/. - -To stop running the examples, run the following command: - - docker-compose down