Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
__pycache__
benchmark/pre-process/key.json
dbt/logs
dbt/altered_jaffle_shop/.user.yml
8 changes: 7 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ RUN pip install "dbt-core<1.10"
RUN pip install dbt-bigquery
RUN pip install dbt-postgres
RUN pip install dbt-snowflake
RUN pydantic==2.11.0
RUN pip install pydantic==2.11.0

#ENV OPENLINEAGE_DISABLED=True
ENV AIRFLOW__CORE__LOAD_EXAMPLES=False
ENV AIRFLOW__CORE__TEST_CONNECTION=Enabled
# In async DAG getting error
# sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush.
# To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: Can't flush None value found in collection DatasetModel.aliases (Background on this error at: https://sqlalche.me/e/14/7s2a)
ENV AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS=False

USER astro

RUN airflow db migrate
Comment thread
pankajastro marked this conversation as resolved.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ This project contains the following files and folders:
* 2 seeds
* 52 sources
* 185 models
* [altered_jaffle_shop](https://github.com/astronomer/cosmos-benchmark/tree/main/dbt/altered_jaffle_shop): A custom dbt project designed to pre-compile dbt project and focus on specific custom models without any dbt-generated metadata dependencies. This project is based on a copy of the jaffle-shop project, with an additional four models that require significant time to run, enabling measurement of async DAG performance.
* The project includes one addition seed file (model_params.csv) — you can increase model transformation time by updating the values in this seed.
* More models can be generated automatically by running the bash command: `./benchmark/auto_generate_models.sh 2` (Here, input 2 means generate 2 additional models for each custom model, i.e., generate 8 additional models).
- **Dockerfile**: This file contains a versioned Astro Runtime Docker image that provides a differentiated Airflow experience.
- **include**: This folder contains any additional files that you want to include as part of your project. In this particular case, it contains configuration files.
- **packages.txt**: Install OS-level packages needed for your project by adding them to this file. It is empty by default.
Expand Down
1 change: 1 addition & 0 deletions benchmark/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ WORKDIR /app

# Copy project files into the container
COPY dbt/fhir-dbt-analytics /app
COPY dbt/altered_jaffle_shop /app
COPY benchmark/pre-process/profiles.yml /app/profiles.yml
COPY benchmark/pre-process/key.json /app/key.json

Expand Down
17 changes: 17 additions & 0 deletions benchmark/auto_generate_models.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/usr/bin/env bash
set -euo pipefail

: "${1:?Usage: $0 <number_of_copies>}"

slow_models=(
Comment thread
pankajkoti marked this conversation as resolved.
"dbt/altered_jaffle_shop/models/customers_slow_query.sql"
Comment thread
pankajkoti marked this conversation as resolved.
"dbt/altered_jaffle_shop/models/long_model_cross_random.sql"
"dbt/altered_jaffle_shop/models/long_model_subquery_windows.sql"
"dbt/altered_jaffle_shop/models/long_model_text_processing.sql"
)

for file in "${slow_models[@]}"; do
for ((i = 1; i <= $1; i++)); do
cp -n "$file" "${file%.sql}${i}.sql"
done
done
28 changes: 28 additions & 0 deletions benchmark/experiment/airflow-test-cosmos-async.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
apiVersion: batch/v1
kind: Job
metadata:
name: airflow-test-cosmos-async
spec:
template:
spec:
containers:
- name: airflow
image: cosmos-benchmark:0.0.3
imagePullPolicy: Never
command: ["airflow", "dags", "test", "cosmos_bq_async"]
env:
- name: AIRFLOW_CONN_GCP_GS_CONN
valueFrom:
secretKeyRef:
name: gcp-credentials
key: airflow-conn
resources:
# Equivalent to Astro's A10 instance
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "2"
memory: "4Gi"
restartPolicy: Never
backoffLimit: 0
40 changes: 40 additions & 0 deletions dags/cosmos_async_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig
from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping
from include.constants import BIGQUERY_DATASET, DBT_ADAPTER_VERSION, GCP_PROJECT_ID

DBT_PROJECT_PATH = Path("/usr/local/airflow/dbt/altered_jaffle_shop")



profile_config = ProfileConfig(
profile_name="altered_jaffle_shop",
target_name="dev",
profile_mapping=GoogleCloudServiceAccountDictProfileMapping(
conn_id="gcp_gs_conn", profile_args={"dataset": BIGQUERY_DATASET, "project": GCP_PROJECT_ID}
),
)
Comment thread
pankajastro marked this conversation as resolved.


cosmos_bq_async = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(DBT_PROJECT_PATH),
profile_config=profile_config,
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.AIRFLOW_ASYNC,
async_py_requirements=[f"dbt-bigquery=={DBT_ADAPTER_VERSION}"],
),
# normal dag parameters
schedule=None,
start_date=datetime(2026, 1, 1),
catchup=False,
dag_id="cosmos_bq_async",
tags=["simple"],
operator_args={
"location": "US",
"install_deps": True,
"full_refresh": True,
},
)
4 changes: 4 additions & 0 deletions dbt/altered_jaffle_shop/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

target/
dbt_packages/
logs/
15 changes: 15 additions & 0 deletions dbt/altered_jaffle_shop/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Welcome to your new dbt project!

### Using the starter project

Try running the following commands:
- dbt run
- dbt test


### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
Empty file.
22 changes: 22 additions & 0 deletions dbt/altered_jaffle_shop/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'altered_jaffle_shop'
version: '1.0.0'

# This setting configures which "profile" dbt uses for this project.
profile: 'altered_jaffle_shop'

# These configurations specify where dbt should look for different types of files.
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"
Empty file.
71 changes: 71 additions & 0 deletions dbt/altered_jaffle_shop/models/customers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
{{ config(tags=["customers"]) }}

with customers as (

select * from {{ ref('stg_customers') }}

),

orders as (

select * from {{ ref('stg_orders') }}

),

payments as (

select * from {{ ref('stg_payments') }}

),

customer_orders as (

select
customer_id,

min(order_date) as first_order,
max(order_date) as most_recent_order,
count(order_id) as number_of_orders
from orders

group by customer_id

),

customer_payments as (

select
orders.customer_id,
sum(amount) as total_amount

from payments

left join orders on
payments.order_id = orders.order_id

group by orders.customer_id

),

final as (

select
customers.customer_id,
customers.first_name,
customers.last_name,
customer_orders.first_order,
customer_orders.most_recent_order,
customer_orders.number_of_orders,
customer_payments.total_amount as total_order_amount

from customers

left join customer_orders
on customers.customer_id = customer_orders.customer_id

left join customer_payments
on customers.customer_id = customer_payments.customer_id

)

select * from final
37 changes: 37 additions & 0 deletions dbt/altered_jaffle_shop/models/customers_slow_query.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{{ config(
materialized = "table"
) }}

WITH params AS (
SELECT array_x, array_y
FROM {{ ref('model_params') }}
WHERE model_name = 'customers_slow_query'
LIMIT 1
),
base AS (
SELECT * FROM {{ ref('customers') }}
),
expanded AS (
SELECT
b.*,
x AS extra_x,
y AS extra_y
FROM base b
CROSS JOIN params p
CROSS JOIN UNNEST(GENERATE_ARRAY(1, p.array_x)) AS x
CROSS JOIN UNNEST(GENERATE_ARRAY(1, p.array_y)) AS y
),
windowed AS (
SELECT
*,
AVG(LENGTH(CAST(customer_id AS STRING))) OVER (
PARTITION BY customer_id
ORDER BY extra_x, extra_y
) AS avg_len
FROM expanded
)
SELECT
customer_id,
SUM(avg_len) AS sum_len
FROM windowed
GROUP BY customer_id
14 changes: 14 additions & 0 deletions dbt/altered_jaffle_shop/models/docs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% docs orders_status %}

Orders can be one of the following statuses:

| status | description |
|----------------|------------------------------------------------------------------------------------------------------------------------|
| placed | The order has been placed but has not yet left the warehouse |
| shipped | The order has been shipped to the customer and is currently in transit |
| completed | The order has been received by the customer |
| return_pending | The customer has indicated that they would like to return the order, but it has not yet been received at the warehouse |
| returned | The order has been returned by the customer and received at the warehouse |


{% enddocs %}
30 changes: 30 additions & 0 deletions dbt/altered_jaffle_shop/models/long_model_cross_random.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{{ config(
materialized = "table"
) }}

WITH params AS (
SELECT array_x, array_y
FROM {{ ref('model_params') }}
WHERE model_name = 'long_model_cross_random'
LIMIT 1
),
base AS (
SELECT * FROM {{ ref('customers') }}
),
inflated AS (
SELECT
b.customer_id,
x AS x_val,
y AS y_val,
RAND() * x * y AS random_val
FROM base b
CROSS JOIN params p
CROSS JOIN UNNEST(GENERATE_ARRAY(1, p.array_x)) AS x
CROSS JOIN UNNEST(GENERATE_ARRAY(1, p.array_y)) AS y
)
SELECT
customer_id,
COUNT(*) AS row_count,
AVG(random_val) AS avg_val
FROM inflated
GROUP BY customer_id
55 changes: 55 additions & 0 deletions dbt/altered_jaffle_shop/models/long_model_subquery_windows.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{{ config(
materialized = "table"
) }}

WITH params AS (
SELECT array_x
FROM {{ ref('model_params') }}
WHERE model_name = 'long_model_subquery_windows'
LIMIT 1
),

-- Inflate base rows by duplicating each customer 100x
base AS (
SELECT
c.customer_id,
d AS duplication_id
FROM {{ ref('customers') }} c
CROSS JOIN UNNEST(GENERATE_ARRAY(1, 100)) AS d
),

-- Generate large expansion using params
expanded AS (
SELECT
b.customer_id,
x AS factor,
POW(x, 0.5) AS sqrt_x,
SAFE_DIVIDE(x, NULLIF(MOD(x, 10), 0)) AS ratio,
LOG(x + 1) + SIN(x) + COS(x) AS expensive_math
FROM base b
CROSS JOIN params p
CROSS JOIN UNNEST(GENERATE_ARRAY(1, p.array_x)) AS x
),

-- Complex windowing across a large range
windowed AS (
SELECT
*,
AVG(expensive_math) OVER (
PARTITION BY customer_id
ORDER BY factor
ROWS BETWEEN 10000 PRECEDING AND CURRENT ROW
) AS moving_avg
FROM expanded
),

aggregated AS (
SELECT
customer_id,
SUM(moving_avg) AS total_avg,
COUNT(*) AS cnt
FROM windowed
GROUP BY customer_id
)

SELECT * FROM aggregated
Loading