Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
de2ea0c
feat: iterate over node file path to create TaskGroups based on model…
maximilianoarcieri Feb 23, 2025
103cb3b
Merge branch 'astronomer:main' into feat/create-task-groups-by-dbt-mo…
maximilianoarcieri Feb 27, 2025
14c0b8e
feat: add original_file_path as a class variable of DbtNode
maximilianoarcieri Feb 27, 2025
c090c89
feat: replace class variable to iterate over original_file_path
maximilianoarcieri Feb 27, 2025
1f84c5c
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Feb 27, 2025
c679a1d
feat: separate new logic in a function
maximilianoarcieri Feb 27, 2025
5e9e356
feat: sync changes with remote repository
maximilianoarcieri Feb 27, 2025
6c4d923
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Feb 27, 2025
ecd8e15
feat: add class variable origin_file_path in tests
maximilianoarcieri Feb 27, 2025
c6d2ed3
merge: branch 'feat/create-task-groups-by-dbt-models' of github.com:m…
maximilianoarcieri Feb 27, 2025
950d95e
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Feb 27, 2025
ff1894e
fix: rename class variable original_vile_path
maximilianoarcieri Feb 27, 2025
cadadee
feat: add task_group_id on task_id of tasks
maximilianoarcieri Feb 27, 2025
2989277
feat: sync with remote branch
maximilianoarcieri Feb 27, 2025
cd78121
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Feb 27, 2025
50ae158
feat: add type annotation for task_groups variable
maximilianoarcieri Feb 28, 2025
ff881b6
feat: add original_file_path variable DbtNode creation
maximilianoarcieri Feb 28, 2025
d0131aa
fix: allow None as a valid task_group in generate_parent_task_group
maximilianoarcieri Feb 28, 2025
76e9133
fix: add missing package methodtools to pass tests
Mar 1, 2025
4029b7c
fix: add missing package methodtools to pass tests
maximilianoarcieri Mar 1, 2025
5ee537e
fix: change data type of expected variable original_file_path
Mar 1, 2025
d18bdaa
sync: branch 'main' of github.com:maximilianoarcieri/astronomer-cosmos
maximilianoarcieri Mar 1, 2025
727106a
merge: branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Mar 1, 2025
b2b95f5
feat: set enable_resource_grouping to True in RenderConfig
Mar 1, 2025
d772671
feat: add enable_resource_grouping variable o make the feature option…
Mar 1, 2025
c7c15c5
feat: add documentation about class variable enable_resource_grouping…
Mar 1, 2025
74cfa6a
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Mar 1, 2025
7cbc218
fix: change structure to decomplex; format code in tests
Mar 1, 2025
621d262
merge: branch 'feat/create-task-groups-by-dbt-models' of github.com:m…
Mar 1, 2025
84952d6
Merge branch 'main' into feat/create-task-groups-by-dbt-models
tatiana Mar 3, 2025
65fb5b8
merge: branch 'main' of github.com:maximilianoarcieri/astronomer-cosmos
maximilianoarcieri Mar 3, 2025
6a0c706
feat: add class variable original_file_path in test_selector.py
Mar 3, 2025
45b343b
Merge branch 'astronomer:main' into feat/create-task-groups-by-dbt-mo…
maximilianoarcieri Mar 3, 2025
1f6a8a4
Merge branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Mar 3, 2025
b3c6b0e
Merge branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Mar 4, 2025
d73f728
Merge branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Mar 4, 2025
9d95f39
Merge branch 'main' of github.com:maximilianoarcieri/astronomer-cosmos
maximilianoarcieri Apr 28, 2025
d40a31d
fix: merge with main and resolve conflicts
maximilianoarcieri Apr 28, 2025
c078082
Merge branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Apr 28, 2025
bde5062
fix: reorganize parameters
maximilianoarcieri Apr 28, 2025
0539a16
fix: add missing argument in test
maximilianoarcieri Apr 28, 2025
0d4ba13
Merge branch 'feat/create-task-groups-by-dbt-models' of github.com:ma…
maximilianoarcieri Apr 28, 2025
0eed825
Merge branch 'main' of github.com:maximilianoarcieri/astronomer-cosmos
maximilianoarcieri Apr 28, 2025
0b9c39d
Merge branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Apr 28, 2025
5221dbe
Merge branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Apr 28, 2025
8232147
Merge branch 'main' into feat/create-task-groups-by-dbt-models
tatiana Apr 30, 2025
b2eb75a
Merge branch 'main' into feat/create-task-groups-by-dbt-models
tatiana Mar 2, 2026
9894be9
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Mar 2, 2026
b925a20
Fix pre-commit issues post-conflict resolving with main
tatiana Mar 2, 2026
aa2989c
Avoid redundant original_file_path definition and get unit tests to pass
tatiana Mar 2, 2026
589df48
Rename enable_resource_grouping to group_nodes_by_folder
tatiana Mar 2, 2026
12c73f8
Fix breaking test test_graph.py::test_build_airflow_graph_after_all
tatiana Mar 2, 2026
4792651
Add multi_folder dbt project and example DAG to illustrate how it loo…
tatiana Mar 3, 2026
f12852d
Improve test coverage
tatiana Mar 3, 2026
17c6d03
Rename generate_resource_task_group to create_task_groups_based_on_fo…
tatiana Mar 3, 2026
a69d66e
Add a multi folder DAG using ExecutionMode.WATCHER
tatiana Mar 3, 2026
7c1a30d
Bump actions/download-artifact from 7 to 8 (#2425)
dependabot[bot] Mar 3, 2026
82a51a4
⬆ [pre-commit.ci] pre-commit autoupdate (#2422)
pre-commit-ci[bot] Mar 3, 2026
0a1add3
Introduce the concept of an `interceptor` in Cosmos tasks (#2419)
tatiana Mar 3, 2026
dcceb76
Revert virtualenv pin for hatch installation in CI (#2426)
pankajkoti Mar 3, 2026
4d25b3d
Change multi_folder_grouped_watcher_dag so it uses TestBehavior after…
tatiana Mar 3, 2026
f69b443
Remove multi folder example DAG and create individual tests
tatiana Mar 3, 2026
f35eccf
Fix integration tests
tatiana Mar 3, 2026
326f266
Merge branch 'main' into feat/create-task-groups-by-dbt-models
tatiana Mar 5, 2026
d7ded97
Remove update from old render-config
tatiana Mar 17, 2026
38d366c
Merge branch 'main' into feat/create-task-groups-by-dbt-models
tatiana Mar 17, 2026
201bec9
Update docs
tatiana Mar 17, 2026
ba646e3
Merge branch 'main' into feat/create-task-groups-by-dbt-models
tatiana Mar 18, 2026
0ecac5e
Fix merge commit issues
tatiana Mar 18, 2026
e7e644e
Remove @tatiana's contributions
tatiana Mar 18, 2026
7ccbf4c
Apply suggestion from @tatiana
tatiana Mar 18, 2026
7fea8aa
Apply suggestion from @tatiana
tatiana Mar 18, 2026
1c22749
Fix broken test when merging GH suggestion
tatiana Mar 18, 2026
407138d
Add integration tests to validate the TaskGroup per folder feature
tatiana Mar 18, 2026
2e6c709
Revert "Remove @tatiana's contributions"
tatiana Mar 18, 2026
4bd719c
Merge branch 'main' into feat/create-task-groups-by-dbt-models
tatiana Mar 18, 2026
db25dde
Add depedencies checks on tests
tatiana Mar 19, 2026
8ac42cb
Fix failing unit tests
tatiana Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dev/dags/dbt/multi_folder/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
target/
dbt_packages/
logs/
dbt_internal_packages/
23 changes: 23 additions & 0 deletions dev/dags/dbt/multi_folder/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: 'multi_folder'
version: '0.1.0'
config-version: 2

profile: 'default'

seed-paths: ["seeds"]
model-paths: ["models"]

test-paths: ["tests"]
target-path: "target"
clean-targets:
- "target"
- "logs"

require-dbt-version: [">=1.0.0", "<2.0.0"]

models:
multi_folder:
models_a:
+materialized: view
models_b:
+materialized: view
6 changes: 6 additions & 0 deletions dev/dags/dbt/multi_folder/models/models_a/dim_products.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Depends on model: models_a/stg_products.sql
select
product_id,
product_name,
upper(product_name) as product_name_upper
from {{ ref('stg_products') }}
5 changes: 5 additions & 0 deletions dev/dags/dbt/multi_folder/models/models_a/stg_products.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Depends on seed: seeds_a/products.csv
select
id as product_id,
name as product_name
from {{ ref('products') }}
7 changes: 7 additions & 0 deletions dev/dags/dbt/multi_folder/models/models_b/stg_regions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Depends on seeds: seeds_b/regions.csv and seeds_b/region_managers.csv
select
r.id as region_id,
r.name as region_name,
m.manager_name
from {{ ref('regions') }} r
left join {{ ref('region_managers') }} m on r.id = m.region_id
12 changes: 12 additions & 0 deletions dev/dags/dbt/multi_folder/profiles.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
default:
target: dev
outputs:
dev:
type: postgres
host: "{{ env_var('POSTGRES_HOST') }}"
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
port: "{{ env_var('POSTGRES_PORT') | int }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: "{{ env_var('POSTGRES_SCHEMA') }}"
threads: 4
4 changes: 4 additions & 0 deletions dev/dags/dbt/multi_folder/seeds/seeds_a/products.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id,name
1,Widget
2,Gadget
3,Gizmo
11 changes: 11 additions & 0 deletions dev/dags/dbt/multi_folder/seeds/seeds_b/region_managers.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
region_id,manager_name
1,Alice
2,Bob
3,Carol
4,Dave
5,Eve
6,Frank
7,Grace
8,Henry
9,Ivy
10,Jack
11 changes: 11 additions & 0 deletions dev/dags/dbt/multi_folder/seeds/seeds_b/regions.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
id,name
1,North
2,South
3,East
4,West
5,Central
6,Northeast
7,Northwest
8,Southeast
9,Southwest
10,Midwest
86 changes: 85 additions & 1 deletion tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
from airflow.hooks.subprocess import SubprocessResult
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context
from airflow.utils.state import DagRunState
from packaging import version
from pendulum import datetime

import cosmos.dbt.runner as dbt_runner
from cosmos import cache
from cosmos import DbtDag, DbtTaskGroup, ProjectConfig, RenderConfig, cache
from cosmos.config import ProfileConfig
from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, InvocationMode
from cosmos.dbt.parser.output import (
Expand Down Expand Up @@ -55,6 +56,7 @@
from tests.utils import test_dag as run_test_dag

DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop"
MULTI_FOLDER_DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/multi_folder"
MINI_DBT_PROJ_DIR = Path(__file__).parent.parent / "sample/mini"
MINI_DBT_PROJ_DIR_FAILING_SCHEMA = MINI_DBT_PROJ_DIR / "schema_failing_test.yml"
MINI_DBT_PROJ_PROFILE = MINI_DBT_PROJ_DIR / "profiles.yml"
Expand All @@ -76,6 +78,16 @@

mini_profile_config = ProfileConfig(profile_name="mini", target_name="dev", profiles_yml_filepath=MINI_DBT_PROJ_PROFILE)

multi_folder_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="example_conn",
profile_args={"schema": "public"},
disable_event_tracking=True,
),
)


@pytest.fixture
def failing_test_dbt_project(tmp_path):
Expand Down Expand Up @@ -573,6 +585,78 @@ def test_run_operator_dataset_inlets_and_outlets_airflow_3_onwards(caplog):
)


@pytest.mark.integration
def test_dbt_dag_with_group_nodes_by_folder():
"""
Run a DbtDag with RenderConfig(group_nodes_by_folder=True) (ExecutionMode.LOCAL).
Confirm the DAG runs successfully and tasks are grouped by folder (seeds_a, seeds_b, models_a, models_b).
"""
grouped_dag = DbtDag(
project_config=ProjectConfig(dbt_project_path=MULTI_FOLDER_DBT_PROJ_DIR),
profile_config=multi_folder_profile_config,
render_config=RenderConfig(group_nodes_by_folder=True, emit_datasets=False),
operator_args={"install_deps": True},
start_date=datetime(2024, 1, 1),
dag_id="multi_folder_grouped_dag",
default_args={"retries": 0},
)
outcome = new_test_dag(grouped_dag)
assert outcome.state == DagRunState.SUCCESS

# multi_folder has seeds_a (products), seeds_b (regions, region_managers), models_a (stg_products, dim_products), models_b (stg_regions)
assert len(grouped_dag.dbt_graph.filtered_nodes) == 6 # 3 seeds + 3 models
task_ids = set(grouped_dag.task_dict)
assert len(task_ids) == 6 # 3 seeds + 3 model runs
assert "seeds.seeds_a.products_seed" in task_ids
Comment thread
tatiana marked this conversation as resolved.
assert "seeds.seeds_b.regions_seed" in task_ids
assert "seeds.seeds_b.region_managers_seed" in task_ids
assert "models.models_a.stg_products_run" in task_ids
assert "models.models_a.dim_products_run" in task_ids
assert "models.models_b.stg_regions_run" in task_ids

Comment thread
tatiana marked this conversation as resolved.
# Check dependencies
assert grouped_dag.task_dict["seeds.seeds_a.products_seed"].downstream_task_ids == {
"models.models_a.stg_products_run"
}
assert grouped_dag.task_dict["seeds.seeds_b.regions_seed"].downstream_task_ids == {
"models.models_b.stg_regions_run"
}
assert grouped_dag.task_dict["seeds.seeds_b.region_managers_seed"].downstream_task_ids == {
"models.models_b.stg_regions_run"
}


@pytest.mark.integration
def test_dbt_task_group_with_group_nodes_by_folder():
"""
Run a DAG containing a DbtTaskGroup with RenderConfig(group_nodes_by_folder=True) (ExecutionMode.LOCAL).
Confirm the DAG runs successfully and tasks are grouped by folder.
"""
with DAG(
dag_id="multi_folder_grouped_task_group_dag",
start_date=datetime(2024, 1, 1),
default_args={"retries": 0},
) as dag:
DbtTaskGroup(
group_id="multi_folder_dbt",
project_config=ProjectConfig(dbt_project_path=MULTI_FOLDER_DBT_PROJ_DIR),
profile_config=multi_folder_profile_config,
render_config=RenderConfig(group_nodes_by_folder=True, emit_datasets=False),
operator_args={"install_deps": True},
)
outcome = new_test_dag(dag)
assert outcome.state == DagRunState.SUCCESS

task_ids = set(dag.task_dict)
assert len(task_ids) == 6 # 3 seeds + 3 model runs
assert "multi_folder_dbt.seeds.seeds_a.products_seed" in task_ids
assert "multi_folder_dbt.seeds.seeds_b.regions_seed" in task_ids
assert "multi_folder_dbt.seeds.seeds_b.region_managers_seed" in task_ids
assert "multi_folder_dbt.models.models_a.stg_products_run" in task_ids
assert "multi_folder_dbt.models.models_a.dim_products_run" in task_ids
assert "multi_folder_dbt.models.models_b.stg_regions_run" in task_ids


@pytest.mark.skipif(
version.parse(airflow_version).major < 3,
reason="Airflow 3.0 only supports assets when setting enable_dataset_alias=True (default)",
Expand Down
67 changes: 67 additions & 0 deletions tests/operators/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

DBT_PROJECT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop"
DBT_PROFILES_YAML_FILEPATH = DBT_PROJECT_PATH / "profiles.yml"
MULTI_FOLDER_DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/multi_folder"

DBT_EXECUTABLE_PATH = Path(__file__).parent.parent.parent / "venv-subprocess/bin/dbt"
DBT_PROJECT_WITH_EMPTY_MODEL_PATH = Path(__file__).parent.parent / "sample/dbt_project_with_empty_model"
Expand Down Expand Up @@ -1788,6 +1789,72 @@ def test_dbt_dag_with_watcher(capsys):
assert message_count == 1, f"Expected '{log_message}' to be logged exactly once, but found {message_count} times"


@pytest.mark.integration
def test_dbt_dag_with_watcher_and_group_nodes_by_folder(capsys):
"""
Run a DbtDag using ExecutionMode.WATCHER with RenderConfig(group_nodes_by_folder=True)
and TestBehavior.AFTER_ALL (mirrors multi_folder_grouped_watcher_dag from dev/dags).
Comment thread
tatiana marked this conversation as resolved.
"""
watcher_dag = DbtDag(
project_config=ProjectConfig(dbt_project_path=MULTI_FOLDER_DBT_PROJ_DIR),
profile_config=profile_config,
execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER),
render_config=RenderConfig(
group_nodes_by_folder=True,
test_behavior=TestBehavior.AFTER_ALL,
emit_datasets=False,
),
operator_args={
"install_deps": True,
"trigger_rule": "all_success",
"execution_timeout": timedelta(seconds=120),
},
start_date=datetime(2024, 1, 1),
dag_id="multi_folder_grouped_watcher_dag",
default_args={"retries": 0},
)
outcome = new_test_dag(watcher_dag)
assert outcome.state == DagRunState.SUCCESS

assert len(watcher_dag.dbt_graph.filtered_nodes) == 6 # 3 seeds + 3 models
task_ids = set(watcher_dag.task_dict)
# 1 producer + 3 seeds + 3 model runs + 1 after_all test = 8
assert len(task_ids) == 8
assert "dbt_producer_watcher" in task_ids
assert "seeds.seeds_a.products_seed" in task_ids
assert "seeds.seeds_b.regions_seed" in task_ids
assert "seeds.seeds_b.region_managers_seed" in task_ids
assert "models.models_a.stg_products_run" in task_ids
assert "models.models_a.dim_products_run" in task_ids
assert "models.models_b.stg_regions_run" in task_ids
assert "multi_folder_test" in task_ids

assert isinstance(watcher_dag.task_dict["dbt_producer_watcher"], DbtProducerWatcherOperator)
assert isinstance(watcher_dag.task_dict["seeds.seeds_a.products_seed"], DbtSeedWatcherOperator)
assert isinstance(watcher_dag.task_dict["models.models_a.stg_products_run"], DbtRunWatcherOperator)

# AFTER_ALL test task is rendered as DbtTestLocalOperator, not DbtTestWatcherOperator
from cosmos.operators.local import DbtTestLocalOperator

assert isinstance(watcher_dag.task_dict["multi_folder_test"], DbtTestLocalOperator)

# Check dependencies
assert watcher_dag.task_dict["dbt_producer_watcher"].downstream_task_ids == {
"seeds.seeds_b.regions_seed",
"seeds.seeds_a.products_seed",
"seeds.seeds_b.region_managers_seed",
}
assert watcher_dag.task_dict["seeds.seeds_a.products_seed"].downstream_task_ids == {
"models.models_a.stg_products_run"
}
assert watcher_dag.task_dict["seeds.seeds_b.regions_seed"].downstream_task_ids == {
"models.models_b.stg_regions_run"
}
assert watcher_dag.task_dict["seeds.seeds_b.region_managers_seed"].downstream_task_ids == {
"models.models_b.stg_regions_run"
}


@pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release")
@pytest.mark.integration
def test_dbt_dag_with_watcher_and_subprocess(caplog):
Expand Down
Loading