From ea7ae1adef11e1ac67572724a3b0d193d3911799 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 27 Mar 2026 01:47:07 +0530 Subject: [PATCH 1/3] Fix TestBehavior.NONE and AFTER_ALL ignored with selectors in Watcher mode When a dbt selector is used, dbt ignores the --exclude flag, causing tests to run despite TestBehavior.NONE or AFTER_ALL. Use --resource-type as a final filter instead, which dbt respects even with selectors. Closes #2415 Co-Authored-By: anissarashid <48539288+anissarashid@users.noreply.github.com> Co-Authored-By: johnhoran Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/airflow/graph.py | 19 +++++++--- cosmos/constants.py | 5 +++ tests/airflow/test_graph.py | 72 ++++++++++++++++++++++++++++++++++++- 3 files changed, 90 insertions(+), 6 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 54721b04d9..50453df248 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -28,6 +28,7 @@ PRODUCER_WATCHER_TASK_ID, SUPPORTED_BUILD_RESOURCES, TESTABLE_DBT_RESOURCES, + WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST, DbtResourceType, ExecutionMode, SourceRenderingBehavior, @@ -694,12 +695,20 @@ def _add_watcher_producer_task( producer_task_args["exclude"] = _convert_list_to_str(render_config.exclude) if render_config.test_behavior in [TestBehavior.NONE, TestBehavior.AFTER_ALL]: - additional_excludes = "resource_type:test resource_type:unit_test" - current_exclude = producer_task_args.get("exclude") - if current_exclude: - producer_task_args["exclude"] = f"{current_exclude} {additional_excludes}" + if render_config.selector: + # When a dbt selector is used, --exclude is ignored by dbt. + # Use --resource-type as a final filter to exclude tests instead. + resource_types = WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST + dbt_cmd_flags = producer_task_args.get("dbt_cmd_flags") or [] + dbt_cmd_flags.extend(["--resource-type", resource_types]) + producer_task_args["dbt_cmd_flags"] = dbt_cmd_flags else: - producer_task_args["exclude"] = additional_excludes + additional_excludes = "resource_type:test resource_type:unit_test" + current_exclude = producer_task_args.get("exclude") + if current_exclude: + producer_task_args["exclude"] = f"{current_exclude} {additional_excludes}" + else: + producer_task_args["exclude"] = additional_excludes class_name = calculate_operator_class(execution_mode, "DbtProducer") diff --git a/cosmos/constants.py b/cosmos/constants.py index 1f6b3439dd..b2ba49ea8e 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -178,6 +178,11 @@ def _missing_value_(cls, value): # type: ignore # https://docs.getdbt.com/reference/commands/test TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED} +# Resource types that the watcher producer task should build (everything except tests). +# Used with --resource-type flag when a dbt selector is set, since --exclude is ignored by dbt in that case. +# Update this list if Cosmos adds watcher operators for new dbt resource types (e.g., saved_query, semantic_model). +WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST = "model seed snapshot source" + DBT_SETUP_ASYNC_TASK_ID = "dbt_setup_async" DBT_TEARDOWN_ASYNC_TASK_ID = "dbt_teardown_async" diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 9e6863164f..00eb684d7e 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -6,7 +6,7 @@ import pytest from airflow.models import DAG -from cosmos.operators.watcher import DbtTestWatcherOperator +from cosmos.operators.watcher import DbtProducerWatcherOperator, DbtTestWatcherOperator try: # Airflow 3.1 onwards @@ -33,6 +33,7 @@ ) from cosmos.config import ProfileConfig, RenderConfig from cosmos.constants import ( + WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST, DbtResourceType, ExecutionMode, SourceRenderingBehavior, @@ -1187,6 +1188,75 @@ def test_test_behavior_for_watcher_mode(test_behavior): assert len(tasks) == 6 +@pytest.mark.parametrize("test_behavior", [TestBehavior.NONE, TestBehavior.AFTER_ALL]) +def test_watcher_producer_uses_resource_type_flag_when_selector_is_set(test_behavior): + """When a dbt selector is used, --exclude is ignored by dbt. The producer should use --resource-type instead.""" + with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: + task_args = { + "project_dir": SAMPLE_PROJ_PATH, + "conn_id": "fake_conn", + "profile_config": ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="fake_conn", + profile_args={"schema": "public"}, + ), + ), + } + + build_airflow_graph( + nodes=sample_nodes, + dag=dag, + execution_mode=ExecutionMode.WATCHER, + test_indirect_selection=TestIndirectSelection.EAGER, + task_args=task_args, + render_config=RenderConfig( + test_behavior=test_behavior, + selector="my_selector", + ), + dbt_project_name="astro_shop", + ) + producer_task = next(task for task in dag.tasks if isinstance(task, DbtProducerWatcherOperator)) + assert "--resource-type" in producer_task.dbt_cmd_flags + resource_type_idx = producer_task.dbt_cmd_flags.index("--resource-type") + assert producer_task.dbt_cmd_flags[resource_type_idx + 1] == WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST + assert producer_task.exclude is None or "resource_type:test" not in (producer_task.exclude or "") + + +@pytest.mark.parametrize("test_behavior", [TestBehavior.NONE, TestBehavior.AFTER_ALL]) +def test_watcher_producer_uses_exclude_when_no_selector(test_behavior): + """Without a selector, the producer should use --exclude to filter tests (existing behavior).""" + with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: + task_args = { + "project_dir": SAMPLE_PROJ_PATH, + "conn_id": "fake_conn", + "profile_config": ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="fake_conn", + profile_args={"schema": "public"}, + ), + ), + } + + build_airflow_graph( + nodes=sample_nodes, + dag=dag, + execution_mode=ExecutionMode.WATCHER, + test_indirect_selection=TestIndirectSelection.EAGER, + task_args=task_args, + render_config=RenderConfig( + test_behavior=test_behavior, + ), + dbt_project_name="astro_shop", + ) + producer_task = next(task for task in dag.tasks if isinstance(task, DbtProducerWatcherOperator)) + assert "resource_type:test" in (producer_task.exclude or "") + assert not producer_task.dbt_cmd_flags + + def test_custom_meta(): with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: task_args = { From c65eb1a16d486e88894ff66a8bfe6577e3cfe3c5 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 31 Mar 2026 13:08:44 +0530 Subject: [PATCH 2/3] Use --resource-type uniformly to exclude tests in Watcher producer Address PR review feedback: - Unify selector and non-selector paths into a single --resource-type approach, since --exclude is ignored by dbt when selectors are used - Fix argv bug: pass each resource type as a separate --resource-type flag instead of a single space-delimited string - Use a tuple of DbtResourceType enums instead of a raw string constant - Copy dbt_cmd_flags list before extending to avoid mutating the caller's original list - Update tests to cover both selector/non-selector cases and verify the original flags list is not mutated Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/airflow/graph.py | 21 ++++++------------ cosmos/constants.py | 11 +++++++--- tests/airflow/test_graph.py | 43 ++++++++++++++++++++++--------------- 3 files changed, 41 insertions(+), 34 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 50453df248..86f16c8c1d 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -695,20 +695,13 @@ def _add_watcher_producer_task( producer_task_args["exclude"] = _convert_list_to_str(render_config.exclude) if render_config.test_behavior in [TestBehavior.NONE, TestBehavior.AFTER_ALL]: - if render_config.selector: - # When a dbt selector is used, --exclude is ignored by dbt. - # Use --resource-type as a final filter to exclude tests instead. - resource_types = WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST - dbt_cmd_flags = producer_task_args.get("dbt_cmd_flags") or [] - dbt_cmd_flags.extend(["--resource-type", resource_types]) - producer_task_args["dbt_cmd_flags"] = dbt_cmd_flags - else: - additional_excludes = "resource_type:test resource_type:unit_test" - current_exclude = producer_task_args.get("exclude") - if current_exclude: - producer_task_args["exclude"] = f"{current_exclude} {additional_excludes}" - else: - producer_task_args["exclude"] = additional_excludes + # Use --resource-type to exclude tests from the producer dbt build command. + # This works both with and without selectors (--exclude is ignored by dbt when a selector is used). + existing_flags = producer_task_args.get("dbt_cmd_flags") or [] + dbt_cmd_flags = list(existing_flags) + for resource_type in WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST: + dbt_cmd_flags.extend(["--resource-type", resource_type.value]) # type: ignore[attr-defined] + producer_task_args["dbt_cmd_flags"] = dbt_cmd_flags class_name = calculate_operator_class(execution_mode, "DbtProducer") diff --git a/cosmos/constants.py b/cosmos/constants.py index b2ba49ea8e..3742aecdc2 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -179,9 +179,14 @@ def _missing_value_(cls, value): # type: ignore TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED} # Resource types that the watcher producer task should build (everything except tests). -# Used with --resource-type flag when a dbt selector is set, since --exclude is ignored by dbt in that case. -# Update this list if Cosmos adds watcher operators for new dbt resource types (e.g., saved_query, semantic_model). -WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST = "model seed snapshot source" +# Used with --resource-type flag to exclude tests from the producer dbt build command. +# Update this tuple if Cosmos adds watcher operators for new dbt resource types (e.g., saved_query, semantic_model). +WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST = ( + DbtResourceType.MODEL, + DbtResourceType.SEED, + DbtResourceType.SNAPSHOT, + DbtResourceType.SOURCE, +) DBT_SETUP_ASYNC_TASK_ID = "dbt_setup_async" DBT_TEARDOWN_ASYNC_TASK_ID = "dbt_teardown_async" diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 00eb684d7e..ab490d4608 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -1189,8 +1189,9 @@ def test_test_behavior_for_watcher_mode(test_behavior): @pytest.mark.parametrize("test_behavior", [TestBehavior.NONE, TestBehavior.AFTER_ALL]) -def test_watcher_producer_uses_resource_type_flag_when_selector_is_set(test_behavior): - """When a dbt selector is used, --exclude is ignored by dbt. The producer should use --resource-type instead.""" +@pytest.mark.parametrize("selector", [None, "my_selector"]) +def test_watcher_producer_uses_resource_type_flag_to_exclude_tests(test_behavior, selector): + """The producer should use --resource-type to exclude tests, regardless of whether a selector is used.""" with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: task_args = { "project_dir": SAMPLE_PROJ_PATH, @@ -1205,32 +1206,38 @@ def test_watcher_producer_uses_resource_type_flag_when_selector_is_set(test_beha ), } + render_config = RenderConfig(test_behavior=test_behavior) + if selector: + render_config = RenderConfig(test_behavior=test_behavior, selector=selector) + build_airflow_graph( nodes=sample_nodes, dag=dag, execution_mode=ExecutionMode.WATCHER, test_indirect_selection=TestIndirectSelection.EAGER, task_args=task_args, - render_config=RenderConfig( - test_behavior=test_behavior, - selector="my_selector", - ), + render_config=render_config, dbt_project_name="astro_shop", ) producer_task = next(task for task in dag.tasks if isinstance(task, DbtProducerWatcherOperator)) - assert "--resource-type" in producer_task.dbt_cmd_flags - resource_type_idx = producer_task.dbt_cmd_flags.index("--resource-type") - assert producer_task.dbt_cmd_flags[resource_type_idx + 1] == WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST - assert producer_task.exclude is None or "resource_type:test" not in (producer_task.exclude or "") + + # Should use --resource-type flags, not --exclude + expected_resource_types = [rt.value for rt in WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST] + flags = producer_task.dbt_cmd_flags + actual_resource_types = [flags[i + 1] for i in range(len(flags)) if flags[i] == "--resource-type"] + assert actual_resource_types == expected_resource_types + assert "resource_type:test" not in (producer_task.exclude or "") @pytest.mark.parametrize("test_behavior", [TestBehavior.NONE, TestBehavior.AFTER_ALL]) -def test_watcher_producer_uses_exclude_when_no_selector(test_behavior): - """Without a selector, the producer should use --exclude to filter tests (existing behavior).""" +def test_watcher_producer_preserves_existing_dbt_cmd_flags(test_behavior): + """The producer should not mutate the original dbt_cmd_flags list and should preserve existing flags.""" with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: + original_flags = ["--full-refresh"] task_args = { "project_dir": SAMPLE_PROJ_PATH, "conn_id": "fake_conn", + "dbt_cmd_flags": original_flags, "profile_config": ProfileConfig( profile_name="default", target_name="default", @@ -1247,14 +1254,16 @@ def test_watcher_producer_uses_exclude_when_no_selector(test_behavior): execution_mode=ExecutionMode.WATCHER, test_indirect_selection=TestIndirectSelection.EAGER, task_args=task_args, - render_config=RenderConfig( - test_behavior=test_behavior, - ), + render_config=RenderConfig(test_behavior=test_behavior), dbt_project_name="astro_shop", ) producer_task = next(task for task in dag.tasks if isinstance(task, DbtProducerWatcherOperator)) - assert "resource_type:test" in (producer_task.exclude or "") - assert not producer_task.dbt_cmd_flags + + # Original flags should not be mutated + assert original_flags == ["--full-refresh"] + # Producer should have the original flag plus the resource-type flags + assert "--full-refresh" in producer_task.dbt_cmd_flags + assert "--resource-type" in producer_task.dbt_cmd_flags def test_custom_meta(): From 515118be20852773f8ca98282c650e59c6f52f0a Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 31 Mar 2026 13:32:10 +0530 Subject: [PATCH 3/3] Drop WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST, reuse SUPPORTED_BUILD_RESOURCES Sources are not buildable by dbt, so align the --resource-type filter with the existing SUPPORTED_BUILD_RESOURCES constant instead of maintaining a separate one. Co-Authored-By: Claude Opus 4.6 (1M context) --- cosmos/airflow/graph.py | 3 +-- cosmos/constants.py | 10 ---------- tests/airflow/test_graph.py | 4 ++-- 3 files changed, 3 insertions(+), 14 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 86f16c8c1d..e648999399 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -28,7 +28,6 @@ PRODUCER_WATCHER_TASK_ID, SUPPORTED_BUILD_RESOURCES, TESTABLE_DBT_RESOURCES, - WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST, DbtResourceType, ExecutionMode, SourceRenderingBehavior, @@ -699,7 +698,7 @@ def _add_watcher_producer_task( # This works both with and without selectors (--exclude is ignored by dbt when a selector is used). existing_flags = producer_task_args.get("dbt_cmd_flags") or [] dbt_cmd_flags = list(existing_flags) - for resource_type in WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST: + for resource_type in SUPPORTED_BUILD_RESOURCES: dbt_cmd_flags.extend(["--resource-type", resource_type.value]) # type: ignore[attr-defined] producer_task_args["dbt_cmd_flags"] = dbt_cmd_flags diff --git a/cosmos/constants.py b/cosmos/constants.py index 3742aecdc2..1f6b3439dd 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -178,16 +178,6 @@ def _missing_value_(cls, value): # type: ignore # https://docs.getdbt.com/reference/commands/test TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED} -# Resource types that the watcher producer task should build (everything except tests). -# Used with --resource-type flag to exclude tests from the producer dbt build command. -# Update this tuple if Cosmos adds watcher operators for new dbt resource types (e.g., saved_query, semantic_model). -WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST = ( - DbtResourceType.MODEL, - DbtResourceType.SEED, - DbtResourceType.SNAPSHOT, - DbtResourceType.SOURCE, -) - DBT_SETUP_ASYNC_TASK_ID = "dbt_setup_async" DBT_TEARDOWN_ASYNC_TASK_ID = "dbt_teardown_async" diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index ab490d4608..db22d8c127 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -33,7 +33,7 @@ ) from cosmos.config import ProfileConfig, RenderConfig from cosmos.constants import ( - WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST, + SUPPORTED_BUILD_RESOURCES, DbtResourceType, ExecutionMode, SourceRenderingBehavior, @@ -1222,7 +1222,7 @@ def test_watcher_producer_uses_resource_type_flag_to_exclude_tests(test_behavior producer_task = next(task for task in dag.tasks if isinstance(task, DbtProducerWatcherOperator)) # Should use --resource-type flags, not --exclude - expected_resource_types = [rt.value for rt in WATCHER_BUILD_RESOURCE_TYPES_EXCEPT_TEST] + expected_resource_types = [rt.value for rt in SUPPORTED_BUILD_RESOURCES] flags = producer_task.dbt_cmd_flags actual_resource_types = [flags[i + 1] for i in range(len(flags)) if flags[i] == "--resource-type"] assert actual_resource_types == expected_resource_types