diff --git a/dev/dags/dbt/multi_folder/.gitignore b/dev/dags/dbt/multi_folder/.gitignore new file mode 100644 index 0000000000..bc0684eb2c --- /dev/null +++ b/dev/dags/dbt/multi_folder/.gitignore @@ -0,0 +1,4 @@ +target/ +dbt_packages/ +logs/ +dbt_internal_packages/ diff --git a/dev/dags/dbt/multi_folder/dbt_project.yml b/dev/dags/dbt/multi_folder/dbt_project.yml new file mode 100644 index 0000000000..d4d6a0903a --- /dev/null +++ b/dev/dags/dbt/multi_folder/dbt_project.yml @@ -0,0 +1,23 @@ +name: 'multi_folder' +version: '0.1.0' +config-version: 2 + +profile: 'default' + +seed-paths: ["seeds"] +model-paths: ["models"] + +test-paths: ["tests"] +target-path: "target" +clean-targets: + - "target" + - "logs" + +require-dbt-version: [">=1.0.0", "<2.0.0"] + +models: + multi_folder: + models_a: + +materialized: view + models_b: + +materialized: view diff --git a/dev/dags/dbt/multi_folder/models/models_a/dim_products.sql b/dev/dags/dbt/multi_folder/models/models_a/dim_products.sql new file mode 100644 index 0000000000..00f3cb9d6e --- /dev/null +++ b/dev/dags/dbt/multi_folder/models/models_a/dim_products.sql @@ -0,0 +1,6 @@ +-- Depends on model: models_a/stg_products.sql +select + product_id, + product_name, + upper(product_name) as product_name_upper +from {{ ref('stg_products') }} diff --git a/dev/dags/dbt/multi_folder/models/models_a/stg_products.sql b/dev/dags/dbt/multi_folder/models/models_a/stg_products.sql new file mode 100644 index 0000000000..6186531e5a --- /dev/null +++ b/dev/dags/dbt/multi_folder/models/models_a/stg_products.sql @@ -0,0 +1,5 @@ +-- Depends on seed: seeds_a/products.csv +select + id as product_id, + name as product_name +from {{ ref('products') }} diff --git a/dev/dags/dbt/multi_folder/models/models_b/stg_regions.sql b/dev/dags/dbt/multi_folder/models/models_b/stg_regions.sql new file mode 100644 index 0000000000..076595d262 --- /dev/null +++ b/dev/dags/dbt/multi_folder/models/models_b/stg_regions.sql @@ -0,0 +1,7 @@ +-- Depends on seeds: seeds_b/regions.csv and seeds_b/region_managers.csv +select + r.id as region_id, + r.name as region_name, + m.manager_name +from {{ ref('regions') }} r +left join {{ ref('region_managers') }} m on r.id = m.region_id diff --git a/dev/dags/dbt/multi_folder/profiles.yml b/dev/dags/dbt/multi_folder/profiles.yml new file mode 100644 index 0000000000..224f565f4a --- /dev/null +++ b/dev/dags/dbt/multi_folder/profiles.yml @@ -0,0 +1,12 @@ +default: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT') | int }}" + dbname: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + threads: 4 diff --git a/dev/dags/dbt/multi_folder/seeds/seeds_a/products.csv b/dev/dags/dbt/multi_folder/seeds/seeds_a/products.csv new file mode 100644 index 0000000000..76ca75993a --- /dev/null +++ b/dev/dags/dbt/multi_folder/seeds/seeds_a/products.csv @@ -0,0 +1,4 @@ +id,name +1,Widget +2,Gadget +3,Gizmo diff --git a/dev/dags/dbt/multi_folder/seeds/seeds_b/region_managers.csv b/dev/dags/dbt/multi_folder/seeds/seeds_b/region_managers.csv new file mode 100644 index 0000000000..b114874fad --- /dev/null +++ b/dev/dags/dbt/multi_folder/seeds/seeds_b/region_managers.csv @@ -0,0 +1,11 @@ +region_id,manager_name +1,Alice +2,Bob +3,Carol +4,Dave +5,Eve +6,Frank +7,Grace +8,Henry +9,Ivy +10,Jack diff --git a/dev/dags/dbt/multi_folder/seeds/seeds_b/regions.csv b/dev/dags/dbt/multi_folder/seeds/seeds_b/regions.csv new file mode 100644 index 0000000000..da206a7611 --- /dev/null +++ b/dev/dags/dbt/multi_folder/seeds/seeds_b/regions.csv @@ -0,0 +1,11 @@ +id,name +1,North +2,South +3,East +4,West +5,Central +6,Northeast +7,Northwest +8,Southeast +9,Southwest +10,Midwest diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 0dc5a962cd..14ac2f76c8 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -20,11 +20,12 @@ from airflow.hooks.subprocess import SubprocessResult from airflow.models.taskinstance import TaskInstance from airflow.utils.context import Context +from airflow.utils.state import DagRunState from packaging import version from pendulum import datetime import cosmos.dbt.runner as dbt_runner -from cosmos import cache +from cosmos import DbtDag, DbtTaskGroup, ProjectConfig, RenderConfig, cache from cosmos.config import ProfileConfig from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, InvocationMode from cosmos.dbt.parser.output import ( @@ -55,6 +56,7 @@ from tests.utils import test_dag as run_test_dag DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop" +MULTI_FOLDER_DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/multi_folder" MINI_DBT_PROJ_DIR = Path(__file__).parent.parent / "sample/mini" MINI_DBT_PROJ_DIR_FAILING_SCHEMA = MINI_DBT_PROJ_DIR / "schema_failing_test.yml" MINI_DBT_PROJ_PROFILE = MINI_DBT_PROJ_DIR / "profiles.yml" @@ -76,6 +78,16 @@ mini_profile_config = ProfileConfig(profile_name="mini", target_name="dev", profiles_yml_filepath=MINI_DBT_PROJ_PROFILE) +multi_folder_profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + @pytest.fixture def failing_test_dbt_project(tmp_path): @@ -573,6 +585,78 @@ def test_run_operator_dataset_inlets_and_outlets_airflow_3_onwards(caplog): ) +@pytest.mark.integration +def test_dbt_dag_with_group_nodes_by_folder(): + """ + Run a DbtDag with RenderConfig(group_nodes_by_folder=True) (ExecutionMode.LOCAL). + Confirm the DAG runs successfully and tasks are grouped by folder (seeds_a, seeds_b, models_a, models_b). + """ + grouped_dag = DbtDag( + project_config=ProjectConfig(dbt_project_path=MULTI_FOLDER_DBT_PROJ_DIR), + profile_config=multi_folder_profile_config, + render_config=RenderConfig(group_nodes_by_folder=True, emit_datasets=False), + operator_args={"install_deps": True}, + start_date=datetime(2024, 1, 1), + dag_id="multi_folder_grouped_dag", + default_args={"retries": 0}, + ) + outcome = new_test_dag(grouped_dag) + assert outcome.state == DagRunState.SUCCESS + + # multi_folder has seeds_a (products), seeds_b (regions, region_managers), models_a (stg_products, dim_products), models_b (stg_regions) + assert len(grouped_dag.dbt_graph.filtered_nodes) == 6 # 3 seeds + 3 models + task_ids = set(grouped_dag.task_dict) + assert len(task_ids) == 6 # 3 seeds + 3 model runs + assert "seeds.seeds_a.products_seed" in task_ids + assert "seeds.seeds_b.regions_seed" in task_ids + assert "seeds.seeds_b.region_managers_seed" in task_ids + assert "models.models_a.stg_products_run" in task_ids + assert "models.models_a.dim_products_run" in task_ids + assert "models.models_b.stg_regions_run" in task_ids + + # Check dependencies + assert grouped_dag.task_dict["seeds.seeds_a.products_seed"].downstream_task_ids == { + "models.models_a.stg_products_run" + } + assert grouped_dag.task_dict["seeds.seeds_b.regions_seed"].downstream_task_ids == { + "models.models_b.stg_regions_run" + } + assert grouped_dag.task_dict["seeds.seeds_b.region_managers_seed"].downstream_task_ids == { + "models.models_b.stg_regions_run" + } + + +@pytest.mark.integration +def test_dbt_task_group_with_group_nodes_by_folder(): + """ + Run a DAG containing a DbtTaskGroup with RenderConfig(group_nodes_by_folder=True) (ExecutionMode.LOCAL). + Confirm the DAG runs successfully and tasks are grouped by folder. + """ + with DAG( + dag_id="multi_folder_grouped_task_group_dag", + start_date=datetime(2024, 1, 1), + default_args={"retries": 0}, + ) as dag: + DbtTaskGroup( + group_id="multi_folder_dbt", + project_config=ProjectConfig(dbt_project_path=MULTI_FOLDER_DBT_PROJ_DIR), + profile_config=multi_folder_profile_config, + render_config=RenderConfig(group_nodes_by_folder=True, emit_datasets=False), + operator_args={"install_deps": True}, + ) + outcome = new_test_dag(dag) + assert outcome.state == DagRunState.SUCCESS + + task_ids = set(dag.task_dict) + assert len(task_ids) == 6 # 3 seeds + 3 model runs + assert "multi_folder_dbt.seeds.seeds_a.products_seed" in task_ids + assert "multi_folder_dbt.seeds.seeds_b.regions_seed" in task_ids + assert "multi_folder_dbt.seeds.seeds_b.region_managers_seed" in task_ids + assert "multi_folder_dbt.models.models_a.stg_products_run" in task_ids + assert "multi_folder_dbt.models.models_a.dim_products_run" in task_ids + assert "multi_folder_dbt.models.models_b.stg_regions_run" in task_ids + + @pytest.mark.skipif( version.parse(airflow_version).major < 3, reason="Airflow 3.0 only supports assets when setting enable_dataset_alias=True (default)", diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 71ef484181..6b89f0ff9b 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -41,6 +41,7 @@ DBT_PROJECT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop" DBT_PROFILES_YAML_FILEPATH = DBT_PROJECT_PATH / "profiles.yml" +MULTI_FOLDER_DBT_PROJ_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt/multi_folder" DBT_EXECUTABLE_PATH = Path(__file__).parent.parent.parent / "venv-subprocess/bin/dbt" DBT_PROJECT_WITH_EMPTY_MODEL_PATH = Path(__file__).parent.parent / "sample/dbt_project_with_empty_model" @@ -1788,6 +1789,72 @@ def test_dbt_dag_with_watcher(capsys): assert message_count == 1, f"Expected '{log_message}' to be logged exactly once, but found {message_count} times" +@pytest.mark.integration +def test_dbt_dag_with_watcher_and_group_nodes_by_folder(capsys): + """ + Run a DbtDag using ExecutionMode.WATCHER with RenderConfig(group_nodes_by_folder=True) + and TestBehavior.AFTER_ALL (mirrors multi_folder_grouped_watcher_dag from dev/dags). + """ + watcher_dag = DbtDag( + project_config=ProjectConfig(dbt_project_path=MULTI_FOLDER_DBT_PROJ_DIR), + profile_config=profile_config, + execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER), + render_config=RenderConfig( + group_nodes_by_folder=True, + test_behavior=TestBehavior.AFTER_ALL, + emit_datasets=False, + ), + operator_args={ + "install_deps": True, + "trigger_rule": "all_success", + "execution_timeout": timedelta(seconds=120), + }, + start_date=datetime(2024, 1, 1), + dag_id="multi_folder_grouped_watcher_dag", + default_args={"retries": 0}, + ) + outcome = new_test_dag(watcher_dag) + assert outcome.state == DagRunState.SUCCESS + + assert len(watcher_dag.dbt_graph.filtered_nodes) == 6 # 3 seeds + 3 models + task_ids = set(watcher_dag.task_dict) + # 1 producer + 3 seeds + 3 model runs + 1 after_all test = 8 + assert len(task_ids) == 8 + assert "dbt_producer_watcher" in task_ids + assert "seeds.seeds_a.products_seed" in task_ids + assert "seeds.seeds_b.regions_seed" in task_ids + assert "seeds.seeds_b.region_managers_seed" in task_ids + assert "models.models_a.stg_products_run" in task_ids + assert "models.models_a.dim_products_run" in task_ids + assert "models.models_b.stg_regions_run" in task_ids + assert "multi_folder_test" in task_ids + + assert isinstance(watcher_dag.task_dict["dbt_producer_watcher"], DbtProducerWatcherOperator) + assert isinstance(watcher_dag.task_dict["seeds.seeds_a.products_seed"], DbtSeedWatcherOperator) + assert isinstance(watcher_dag.task_dict["models.models_a.stg_products_run"], DbtRunWatcherOperator) + + # AFTER_ALL test task is rendered as DbtTestLocalOperator, not DbtTestWatcherOperator + from cosmos.operators.local import DbtTestLocalOperator + + assert isinstance(watcher_dag.task_dict["multi_folder_test"], DbtTestLocalOperator) + + # Check dependencies + assert watcher_dag.task_dict["dbt_producer_watcher"].downstream_task_ids == { + "seeds.seeds_b.regions_seed", + "seeds.seeds_a.products_seed", + "seeds.seeds_b.region_managers_seed", + } + assert watcher_dag.task_dict["seeds.seeds_a.products_seed"].downstream_task_ids == { + "models.models_a.stg_products_run" + } + assert watcher_dag.task_dict["seeds.seeds_b.regions_seed"].downstream_task_ids == { + "models.models_b.stg_regions_run" + } + assert watcher_dag.task_dict["seeds.seeds_b.region_managers_seed"].downstream_task_ids == { + "models.models_b.stg_regions_run" + } + + @pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release") @pytest.mark.integration def test_dbt_dag_with_watcher_and_subprocess(caplog):