Streamlined ETL Process: Unleashing Airflow, BigQuery, Looker, Polars, Soda, DuckDB and YData Profiling
Contact:
https://www.linkedin.com/in/matheusrc/
This ETL (Extract, Transform, Load) project employs several Python libraries, including Polars, Airflow, Soda, YData Profiling, DuckDB, Requests, BeautifulSoup, Loguru, and the Google Cloud Services BigQuery and Looker Studio to streamline the extraction, transformation, and loading of CSV dataset from the Chicago Sidewalk Cafe Permits.
- Extracting CSV data from an API about operating permits for Chicago cafes with the Requests library
- Data transformation using Polars Libraries, DuckDB, and Astro Python SDK
- Uploading local data to BigQuery
- Data visualization with Looker Studio
- Data quality at each stage was done using the Soda library
- Orchestration of the ETL pipeline using Airflow
- Continuous integration with GitHub Actions (CI)
- The project allowed the generation of insights into data such as the largest permit holders, number of permits per year, and number of expired permits
You can check the dataset table at: https://data.cityofchicago.org/widgets/qnjv-hj2q?mobile_redirect=true
- Architecture overview
- Architecture of continuous integration with GitHub Actions
- BigQuery table
- Looker dashboard
- Workflow with Airflow
- Project structure
- Running this project
CREATE OR REPLACE VIEW chicago-cafe-permits.cafe_permits.vw_report
OPTIONS(
description='Report view for Looker Studio dashboard',
labels=[('legal_name', 'total_permits')],
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 60 DAY)
) AS
SELECT
legal_name,
doing_business_as_name,
street_type,
city,
state,
latitude,
longitude,
issued_date,
expiration_date,
payment_date,
site_number,
COUNT(DISTINCT(permit_number)) AS total_permits,
COUNT(site_number) AS total_sites,
ST_GEOGPOINT(latitude, longitude) AS geo,
CASE
WHEN expiration_date > issued_date THEN 'TRUE'
ELSE 'FALSE'
END AS expired
FROM `chicago-cafe-permits.cafe_permits.cafe_permits`
GROUP BY legal_name, doing_business_as_name, street_type, city,state, issued_date, expiration_date, payment_date,site_number, latitude, longitude, expired;
SELECT * FROM `chicago-cafe-permits.cafe_permits.vw_report`;
- I created a concurrency of 1 using the
BashOperator
to avoid two or more executions against DuckDB as allowing two or more calls to DuckDB would cause an error - I loaded the CSV file using an HTTP call by leveraging the Astro Python SDK
astro.sql.load_file
function and the DuckDB connection that I created in AirflowAdmin/Connections
- Then, I create a task to check raw data quality using Soda
- Next, I created a task to generate a data profiling
- Finally, I create a transform task using the Astro Python SDK
astro.sql.dataframe
operator to apply the following transformations: lower column name, remove duplicated rows, remove missing values, and drop a row if all values are null
- After the transformation of data I used Soda to check data quality to ensure that data was transformed as expected
- Next, I created a task to create a data profiling
- Finally, I created a task to create a data profiling comparing the raw data with the transformed data
- I used the
BigQueryCreateEmptyDatasetOperator
operator to create a new empty dataset in BigQuery - Then, I used Astro Python SDK
BigqueryDatabase.load_pandas_dataframe_to_table
function to load data into BigQuery - Finally, I used the Astro Python SDK
astro.sql.cleanup()
function to clean up all tables
As mentioned earlier, Soda can play a pivotal role in ensuring that our data meets all requirements and aligns with our expectations. Here, I will briefly outline the four key data quality features that Soda can effectively address:
├── .devcontainer # VS Code development container
| └── devcontainer.json
├── .github # GitHub Actions for continuous integration (CI)
| └── workflows
| └── main.yml # GitHub Actions configurations
├── Dockerfile
├── LICENSE
├── Makefile # Makefile with some helpful commands
├── README.md
├── airflow_settings.yaml
├── dags
│ ├── __init__.py
│ ├── etl_chicago_cafe_permits_dag.py
│ ├── example_etl.py
│ └── utils
│ ├── __init__.py
│ ├── drop_duplicates.py # Function to remove duplicates
│ ├── drop_full_null_columns.py # Function to drop columns if all values are null
│ ├── drop_full_null_rows.py # Function to drop rows if all values in a row are null
│ ├── drop_missing.py # Function to drop rows with missing values
│ ├── format_url.py # Function to format the URL
│ ├── get_time_period.py # Function to get the current period
│ ├── modify_file_name.py # Function to create a formatted file name
│ └── rename_columns.py # Function to rename DataFrame columns name
├── format.sh # Bash script to format code with ruff
├── include
│ ├── data
│ │ ├── chicago_sidewalk_cafe_permits_2023_11.csv
│ │ └── jobs_nyc_postings_2023_10.csv
│ ├── my_local_ducks.db
│ ├── my_local_ducks.db.wal
│ ├── reports # Directory with reports
│ │ ├── chicago_comparison_2023_11.html
│ │ ├── chicago_raw_profiling_report_2023_11.html
│ │ └── chicago_transformed_profiling_report_2023_11.html
│ └── soda # Directory with SODA files
│ ├── check_function.py # Helpful function for running SODA data quality checks
│ ├── checks # Directory containing data quality rules YML files
│ │ ├── sources
│ │ │ └── raw.yml # Soda data quality check for raw data
│ │ └── transform
│ │ └── transformed.yml # Soda data quality check for transformed data
│ └── configuration.yml # Configurations to connect Soda to a data source (DuckDB)
├── lint.sh # Bash script to format code with ruff
├── notebooks # COLAB notebooks
│ └── gov_etl.ipynb
├── packages.txt
├── plugins
├── requirements.txt
├── setup_data_folders.sh # Bash script to create some directories
├── source_env_linux.sh # Bash script to create a Python virtual environment in linux
├── source_env_windows.sh # Bash script to create a Python virtual environment in windows
├── test.sh # Bash script to test code with pytest
└── tests # Diretory for Python test files
├── __init__.py
├── dags
│ └── test_dag_example.py
└── utils
├── __init__.py
├── test_drop_duplicates.py
├── test_drop_full_null_columns.py
├── test_drop_full_null_rows.py
├── test_drop_missing.py
├── test_format_url.py
├── test_modify_file_name.py
├── test_rename_columns.py
└── test_rename_columns_name.py
- The Astro CLI installed. You can find installation instructions in this link Astro CLI
- Docker Desktop
- Google Cloud account
- Make utility*
- Airflow DuckDB connection (See Creating a connection to DuckDB section below)
- Google Cloud connection (See Creating a connection to Google Cloud section below)
*Optional
First things first, we need to create a Google Cloud Account with a BigQuery Admin Role:
You can find a tutorial at directory docs/google_cloud.md or click here
After finishing the tutorial is time to start the project, you can use one of the following commands on Terminal:
- Run the following command on Terminal
astro dev start
- Use the Makefile command
astro-start
onTerminal
. So that you know, you might need to install the Makefile utility on your machine.
astro-start
Now you can visit the Airflow Webserver at http://localhost:8080 and trigger the ETL workflow or run the Astro command astro dev ps
to see running containers
astro dev ps
Output
Before triggering the Dag you must create the following connections in the Admin
tab.
Next, execute the etl_chicago_cafe_permits
dag