diff --git a/airflow-dbt-transformation/Makefile b/airflow-dbt-transformation/Makefile new file mode 100644 index 0000000..3cf1576 --- /dev/null +++ b/airflow-dbt-transformation/Makefile @@ -0,0 +1,25 @@ +usage: ## Shows usage for this Makefile + @cat Makefile | grep -E '^[a-zA-Z_-]+:.*?## .*$$' | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-15s\033[0m %s\n", $$1, $$2}' + +install: ## Install dependencies + which awslocal || pip install 'awscli-local[ver1]' + +init: ## Initialize the Airflow environment in LocalStack MWAA + awslocal s3 mb s3://snowflake-airflow + awslocal mwaa create-environment --dag-s3-path /dags \ + --execution-role-arn arn:aws:iam::000000000000:role/airflow-role \ + --network-configuration {} \ + --source-bucket-arn arn:aws:s3:::snowflake-airflow \ + --airflow-configuration-options agent.code=007,agent.name=bond \ + --name my-mwaa-env \ + --endpoint-url http://localhost.localstack.cloud:4566 + +deploy: ## Deploy the DAG to the local Airflow instance + awslocal s3 cp requirements.txt s3://snowflake-airflow/ + awslocal s3 cp packages.yml s3://snowflake-airflow/dags/ + awslocal s3 cp dbt_project.yml s3://snowflake-airflow/dags/ + awslocal s3 cp --recursive models s3://snowflake-airflow/dags/models + awslocal s3 cp --recursive seeds s3://snowflake-airflow/dags/seeds + awslocal s3 cp airflow_dag.py s3://snowflake-airflow/dags/ + +.PHONY: usage install deploy start diff --git a/airflow-dbt-transformation/README.md b/airflow-dbt-transformation/README.md new file mode 100644 index 0000000..9717931 --- /dev/null +++ b/airflow-dbt-transformation/README.md @@ -0,0 +1,60 @@ +# LocalStack Demo: Data Engineering with Apache Airflow, Snowflake, Snowpark, dbt & Cosmos + +This project illustrates how to use the LocalStack Snowflake+MWAA to run a data transformation pipeline entirely on your local machine. + +The code is based on the Snowflake Guide for [Data Engineering with Apache Airflow, Snowflake, Snowpark, dbt & Cosmos](https://quickstarts.snowflake.com/guide/data_engineering_with_apache_airflow). + +## Prerequisites + +- [`localstack` CLI](https://docs.localstack.cloud/getting-started/installation/#localstack-cli) with [`LOCALSTACK_AUTH_TOKEN`](https://docs.localstack.cloud/getting-started/auth-token/) environment variable set +- [`awslocal` CLI](https://docs.localstack.cloud/user-guide/integrations/aws-cli/#localstack-aws-cli-awslocal) +- [LocalStack Snowflake emulator](https://snowflake.localstack.cloud/getting-started/installation/) + +## Instructions + +### Start LocalStack + +Start the LocalStack Snowflake emulator using the following command: + +```bash +DOCKER_FLAGS='-e SF_LOG=trace' \ + IMAGE_NAME=localstack/snowflake \ + DEBUG=1 \ + localstack start +``` + +### Deploy the app + +The sample application provides Makefile targets to simplify the setup process. + +Run the following command to initialize the Airflow environment in LocalStack (this may take a couple of seconds): +``` +make init +``` + +After deploying the Airflow environment, you should be able to request its details, and extract the webserver URL: +``` +awslocal mwaa get-environment --name my-mwaa-env +... + "Status": "AVAILABLE", + "WebserverUrl": "http://localhost.localstack.cloud:4510" +... +``` + +Now use the following command to deploy the Airflow DAG with our dbt transformation logic locally: +``` +make deploy +``` + +### Use the Airflow UI to trigger a DAG run + +Once the Airflow environment has spun up, and the DAG has been successfully deployed, you should be able to access the Airflow UI under http://localhost.localstack.cloud:4510/home +(Note that the port number may be different - make sure to copy the `WebserverUrl` from the output further above.) + +You can now trigger a DAG run from the UI. If all goes well, the DAG execution result should look something similar to this: + + + +## License + +The code in this project is licensed under the Apache 2.0 License. diff --git a/airflow-dbt-transformation/airflow_dag.py b/airflow-dbt-transformation/airflow_dag.py new file mode 100644 index 0000000..f020b67 --- /dev/null +++ b/airflow-dbt-transformation/airflow_dag.py @@ -0,0 +1,99 @@ +import os +from datetime import datetime +from pathlib import Path + +from airflow import settings +from airflow.models import Connection +from airflow.operators.dummy_operator import DummyOperator +from airflow.decorators import dag, task +from cosmos import DbtTaskGroup, DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig +from cosmos.profiles.snowflake.user_pass import SnowflakeUserPasswordProfileMapping +from snowflake import connector + +dbt_project_path = Path("/opt/airflow/dags") + +# patch Cosmos Snowflake Airflow connector, which currently doesn't support custom host yet :/ +# see https://github.com/astronomer/astronomer-cosmos/blob/9420404ad9b9ad0bb4a4ffb73b50a67e4e1d077c/cosmos/profiles/snowflake/user_pass.py#L35 + +SnowflakeUserPasswordProfileMapping.airflow_param_mapping["host"] = "extra.host" +SnowflakeUserPasswordProfileMapping.airflow_param_mapping["port"] = "extra.port" + +snowflake_connection_params = { + "user": "test", + "password": "test", + "host": "snowflake.localhost.localstack.cloud", + "port": 4566, + "account": "test", + "database": "test", + "schema": "public", +} + + +def create_snowflake_connection(): + conn = Connection( + conn_id="snowflake_local", + conn_type="snowflake", + login="test", + password="test", + description="LocalStack Snowflake", + extra=snowflake_connection_params + ) + session = settings.Session() + conn_name = session.query(Connection).filter(Connection.conn_id == conn.conn_id).first() + + if str(conn_name) == str(conn.conn_id): + return None + + session.add(conn) + session.commit() + return conn + + +create_snowflake_connection() +credentials = SnowflakeUserPasswordProfileMapping( + conn_id="snowflake_local", + profile_args={"database": "test", "schema": "public"}) + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=credentials) + +dbt_executable = f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt" + + +@dag(schedule_interval="@hourly", + start_date=datetime(2024, 6, 10), + catchup=False, + dag_id="dbt_snowpark", +) +def dbt_snowpark_dag(): + transform_data = DbtTaskGroup( + group_id="transform_data", + project_config=ProjectConfig(dbt_project_path), + profile_config=profile_config, + execution_config=ExecutionConfig(dbt_executable_path=dbt_executable), + operator_args={"install_deps": True}, + ) + + intermediate = DummyOperator(task_id='intermediate') + + @task + def query_result_data(): + connection = connector.connect(**snowflake_connection_params) + # select rows from `PREPPED_DATA` view created by DBT transformation + result = connection.cursor().execute("SELECT * FROM PREPPED_DATA") + result = list(result) + print("-----") + print(f"Query result ({len(result)} rows):") + for row in result: + print(row) + print("-----") + result = str(result) + return result + + query_result = query_result_data() + transform_data >> intermediate >> query_result + + +dbt_snowpark_dag = dbt_snowpark_dag() diff --git a/airflow-dbt-transformation/dbt_project.yml b/airflow-dbt-transformation/dbt_project.yml new file mode 100644 index 0000000..7b7ea4f --- /dev/null +++ b/airflow-dbt-transformation/dbt_project.yml @@ -0,0 +1,18 @@ +name: test_dbt +models: + my_project: + # Applies to all files under models/example/ + transform: + schema: transform + materialized: view + analysis: + schema: analysis + materialized: view + +seeds: + bookings_1: + enabled: true + bookings_2: + enabled: true + customers: + enabled: true diff --git a/airflow-dbt-transformation/etc/airflow-screenshot.png b/airflow-dbt-transformation/etc/airflow-screenshot.png new file mode 100644 index 0000000..56b3e13 Binary files /dev/null and b/airflow-dbt-transformation/etc/airflow-screenshot.png differ diff --git a/airflow-dbt-transformation/macros/custom_demo_macros.sql b/airflow-dbt-transformation/macros/custom_demo_macros.sql new file mode 100644 index 0000000..3c8bc15 --- /dev/null +++ b/airflow-dbt-transformation/macros/custom_demo_macros.sql @@ -0,0 +1,20 @@ +{% macro generate_schema_name(custom_schema_name, node) -%} + {%- set default_schema = target.schema -%} + {%- if custom_schema_name is none -%} + {{ default_schema }} + {%- else -%} + {{ custom_schema_name | trim }} + {%- endif -%} +{%- endmacro %} + + +{% macro set_query_tag() -%} + {% set new_query_tag = model.name %} {# always use model name #} + {% if new_query_tag %} + {% set original_query_tag = get_current_query_tag() %} + {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }} + {% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %} + {{ return(original_query_tag)}} + {% endif %} + {{ return(none)}} +{% endmacro %} diff --git a/airflow-dbt-transformation/models/analysis/hotel_count_by_day.sql b/airflow-dbt-transformation/models/analysis/hotel_count_by_day.sql new file mode 100644 index 0000000..147077d --- /dev/null +++ b/airflow-dbt-transformation/models/analysis/hotel_count_by_day.sql @@ -0,0 +1,8 @@ +SELECT + BOOKING_DATE, + HOTEL, + COUNT(ID) as count_bookings +FROM {{ ref('prepped_data') }} +GROUP BY + BOOKING_DATE, + HOTEL \ No newline at end of file diff --git a/airflow-dbt-transformation/models/analysis/thirty_day_avg_cost.sql b/airflow-dbt-transformation/models/analysis/thirty_day_avg_cost.sql new file mode 100644 index 0000000..d08ebbe --- /dev/null +++ b/airflow-dbt-transformation/models/analysis/thirty_day_avg_cost.sql @@ -0,0 +1,11 @@ +SELECT + BOOKING_DATE, + HOTEL, + COST, + AVG(COST) OVER ( + ORDER BY BOOKING_DATE ROWS BETWEEN 29 PRECEDING AND CURRENT ROW + ) as "30_DAY_AVG_COST", + COST - AVG(COST) OVER ( + ORDER BY BOOKING_DATE ROWS BETWEEN 29 PRECEDING AND CURRENT ROW + ) as "DIFF_BTW_ACTUAL_AVG" +FROM {{ ref('prepped_data') }} \ No newline at end of file diff --git a/airflow-dbt-transformation/models/transform/combined_bookings.sql b/airflow-dbt-transformation/models/transform/combined_bookings.sql new file mode 100644 index 0000000..93a3be5 --- /dev/null +++ b/airflow-dbt-transformation/models/transform/combined_bookings.sql @@ -0,0 +1,3 @@ +{{ dbt_utils.union_relations( + relations=[ref('bookings_1'), ref('bookings_2')] +) }} diff --git a/airflow-dbt-transformation/models/transform/customer.sql b/airflow-dbt-transformation/models/transform/customer.sql new file mode 100644 index 0000000..c98d0dc --- /dev/null +++ b/airflow-dbt-transformation/models/transform/customer.sql @@ -0,0 +1,5 @@ +SELECT ID + , FIRST_NAME + , LAST_NAME + , birthdate +FROM {{ ref('customers') }} \ No newline at end of file diff --git a/airflow-dbt-transformation/models/transform/prepped_data.sql b/airflow-dbt-transformation/models/transform/prepped_data.sql new file mode 100644 index 0000000..3516dc8 --- /dev/null +++ b/airflow-dbt-transformation/models/transform/prepped_data.sql @@ -0,0 +1,11 @@ +SELECT A.ID + , FIRST_NAME + , LAST_NAME + , birthdate + , BOOKING_REFERENCE + , HOTEL + , BOOKING_DATE + , COST +FROM {{ref('customer')}} A +JOIN {{ref('combined_bookings')}} B +on A.ID = B.ID diff --git a/airflow-dbt-transformation/packages.yml b/airflow-dbt-transformation/packages.yml new file mode 100644 index 0000000..cd6b686 --- /dev/null +++ b/airflow-dbt-transformation/packages.yml @@ -0,0 +1,3 @@ +packages: + - package: dbt-labs/dbt_utils + version: [">=1.0.0", "<2.0.0"] diff --git a/airflow-dbt-transformation/requirements.txt b/airflow-dbt-transformation/requirements.txt new file mode 100644 index 0000000..b8c42f7 --- /dev/null +++ b/airflow-dbt-transformation/requirements.txt @@ -0,0 +1,4 @@ +apache-airflow-providers-snowflake +astronomer-cosmos +dbt-snowflake +snowflake-connector-python diff --git a/airflow-dbt-transformation/seeds/bookings_1.csv b/airflow-dbt-transformation/seeds/bookings_1.csv new file mode 100644 index 0000000..0eba90d --- /dev/null +++ b/airflow-dbt-transformation/seeds/bookings_1.csv @@ -0,0 +1,8 @@ +id,booking_reference,hotel,booking_date,cost +1,232323231,Pan Pacific,2021-03-19,100 +1,232323232,Fullerton,2021-03-20,200 +1,232323233,Fullerton,2021-04-20,300 +1,232323234,Jackson Square,2021-03-21,400 +1,232323235,Mayflower,2021-06-20,500 +1,232323236,Suncity,2021-03-19,600 +1,232323237,Fullerton,2021-08-20,700 \ No newline at end of file diff --git a/airflow-dbt-transformation/seeds/bookings_2.csv b/airflow-dbt-transformation/seeds/bookings_2.csv new file mode 100644 index 0000000..eb36cfa --- /dev/null +++ b/airflow-dbt-transformation/seeds/bookings_2.csv @@ -0,0 +1,8 @@ +id,booking_reference,hotel,booking_date,cost +2,332323231,Fullerton,2021-03-19,100 +2,332323232,Jackson Square,2021-03-20,300 +2,332323233,Suncity,2021-03-20,300 +2,332323234,Jackson Square,2021-03-21,300 +2,332323235,Fullerton,2021-06-20,300 +2,332323236,Suncity,2021-03-19,300 +2,332323237,Berkly,2021-05-20,200 \ No newline at end of file diff --git a/airflow-dbt-transformation/seeds/customers.csv b/airflow-dbt-transformation/seeds/customers.csv new file mode 100644 index 0000000..8c578a6 --- /dev/null +++ b/airflow-dbt-transformation/seeds/customers.csv @@ -0,0 +1,3 @@ +id,first_name,last_name,birthdate,membership_no +1,jim,jone,1989-03-19,12334 +2,adrian,lee,1990-03-10,12323 \ No newline at end of file