Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,12 +694,17 @@ def _add_watcher_producer_task(
producer_task_args["exclude"] = _convert_list_to_str(render_config.exclude)

if render_config.test_behavior in [TestBehavior.NONE, TestBehavior.AFTER_ALL]:
additional_excludes = "resource_type:test resource_type:unit_test"
current_exclude = producer_task_args.get("exclude")
if current_exclude:
producer_task_args["exclude"] = f"{current_exclude} {additional_excludes}"
# Use `dbt run` instead of `dbt build` so tests are not executed by the
# producer (dbt ignores --exclude with --selector).
if render_config.selector:
producer_task_args["use_run_command"] = True

Copilot AI Mar 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching the watcher producer from `dbt build` to `dbt run` when a selector is set will skip non-model resources (e.g., seeds/snapshots), because `dbt run` only executes models. In WATCHER modes, consumer sensors for seeds/snapshots assume the producer executed those nodes and pushed their statuses to XCom, so this can cause sensors to wait or time out. Consider guarding `use_run_command` behind a check that the selected nodes are models-only (or raise a clear config error), or use a build-based approach that can still exclude tests with selectors.

Suggested change
producer_task_args["use_run_command"] = True
# Switching to `dbt run` means only models will be executed. In WATCHER
# mode, non-model resources (e.g. seeds/snapshots) must still be
# executed by the producer so their statuses are pushed for sensors.
# Only enable `use_run_command` when we can verify that the selected
# resources are models-only; otherwise raise a clear config error or
# fall back to the default `dbt build` behavior.
resource_types = getattr(render_config, "resource_types", None)
if resource_types is None:
# We cannot verify resource types on this RenderConfig. To avoid
# silently skipping non-model resources, keep using `dbt build`
# instead of switching to `dbt run`.
logger.warning(
"Watcher producer with selector cannot verify resource types; "
"falling back to `dbt build` to ensure non-model resources "
"are executed."
)
else:
non_model_resources = [
rt for rt in resource_types if rt is not DbtResourceType.MODEL
]
if non_model_resources:
raise CosmosValueError(
"Invalid configuration for watcher producer: using a "
"dbt selector with non-model resources while tests are "
"disabled/deferred would require switching to `dbt run`, "
"which would skip non-model resources and cause watcher "
"sensors to wait or time out. Restrict the selector to "
"models-only or adjust test behavior."
)
producer_task_args["use_run_command"] = True

Copilot uses AI. Check for mistakes.
else:
producer_task_args["exclude"] = additional_excludes
additional_excludes = "resource_type:test resource_type:unit_test"
current_exclude = producer_task_args.get("exclude")
if current_exclude:
producer_task_args["exclude"] = f"{current_exclude} {additional_excludes}"
else:
producer_task_args["exclude"] = additional_excludes

class_name = calculate_operator_class(execution_mode, "DbtProducer")

Expand Down
5 changes: 5 additions & 0 deletions cosmos/operators/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
task_id = kwargs.pop("task_id", PRODUCER_WATCHER_TASK_ID)
self.tests_per_model: dict[str, list[str]] = kwargs.pop("tests_per_model", {})
self.test_results_per_model: dict[str, list[str]] = {}
use_run_command = kwargs.pop("use_run_command", False)
kwargs.setdefault("priority_weight", PRODUCER_WATCHER_DEFAULT_PRIORITY_WEIGHT)
kwargs.setdefault("weight_rule", WATCHER_TASK_WEIGHT_RULE)
# Consumer watcher retry logic handles model-level reruns using the LOCAL execution mode; rerunning the producer
Expand All @@ -110,6 +111,10 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
kwargs["queue"] = watcher_dbt_execution_queue or kwargs.get("queue") or DEFAULT_QUEUE
super().__init__(task_id=task_id, *args, **kwargs)

if use_run_command:
# Override the base command to use the run command
self.base_cmd = ["run"]

Comment on lines +114 to +117

Copilot AI Mar 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When `use_run_command` is enabled, the operator no longer runs `dbt build`, but several user-facing messages/docs in this class still refer to "dbt build" (e.g., the class docstring and the retry-skip log messages). Update the wording to reflect the actual subcommand being executed (or derive it from `self.base_cmd`) so logs/docs stay accurate when this flag is used.

Copilot uses AI. Check for mistakes.
if self.invocation_mode == InvocationMode.SUBPROCESS:
self.log_format = "json"

Expand Down
4 changes: 4 additions & 0 deletions cosmos/operators/watcher_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class DbtProducerWatcherKubernetesOperator(DbtBuildKubernetesOperator):

def __init__(self, *args: Any, **kwargs: Any) -> None:
task_id = kwargs.pop("task_id", "dbt_producer_watcher_operator")
use_run_command = kwargs.pop("use_run_command", False)

# Disable retries on producer task
default_args = dict(kwargs.get("default_args", {}) or {})
Expand All @@ -99,6 +100,9 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(task_id=task_id, *args, **kwargs)
self.dbt_cmd_flags += ["--log-format", "json"]

if use_run_command:
self.base_cmd = ["run"]

Comment on lines +103 to +105

Copilot AI Mar 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the local watcher producer: if `use_run_command` switches the producer to `dbt run`, some log messages in `execute()` still say "dbt build". Consider updating the wording to reflect the actual subcommand (possibly by using `self.base_cmd`) so operational logs remain accurate when this flag is enabled.

Copilot uses AI. Check for mistakes.
@cached_property
def pod_manager(self) -> CosmosKubernetesPodManager:
return CosmosKubernetesPodManager(kube_client=self.client, callbacks=self.callbacks)
Expand Down
8 changes: 4 additions & 4 deletions tests/operators/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1868,15 +1868,15 @@ def test_dbt_task_group_with_watcher_has_correct_dbt_cmd():
)

producer_operator = dag_dbt_task_group_watcher_flags.task_dict["dbt_task_group.dbt_producer_watcher"]
assert producer_operator.base_cmd == ["build"]
assert producer_operator.base_cmd == ["run"]

Comment on lines 1870 to 1872

Copilot AI Mar 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test now expects the watcher producer to use `base_cmd == ['run']`, but in `_add_watcher_producer_task` the `use_run_command` flag is only set when `RenderConfig.selector` is truthy. This test config sets `test_behavior=NONE` but does not set a selector, so the producer will still default to `['build']` and these assertions will fail. Either set `selector` in the test to cover the selector-specific behavior, or keep asserting `build` for the non-selector case.

Copilot uses AI. Check for mistakes.
cmd_flags = producer_operator.add_cmd_flags()

# Build the command without executing it
full_cmd, env = producer_operator.build_cmd(context=context, cmd_flags=cmd_flags)

# Verify the command was built correctly
assert full_cmd[1] == "build" # dbt build command
assert full_cmd[1] == "run" # dbt run command (not build) when test_behavior is NONE
assert "--full-refresh" in full_cmd


Expand Down Expand Up @@ -1923,12 +1923,12 @@ def test_dbt_task_group_with_watcher_has_correct_templated_dbt_cmd():
# Basic check for producer task
producer_operator = dag_dbt_task_group_watcher_flags.task_dict["dbt_task_group.dbt_producer_watcher"]
producer_operator.render_template_fields(context=context) # Render the templated fields
assert producer_operator.base_cmd == ["build"]
assert producer_operator.base_cmd == ["run"]

# Build the command without executing it and verify it was built correctly
cmd_flags = producer_operator.add_cmd_flags()
full_cmd, _ = producer_operator.build_cmd(context=context, cmd_flags=cmd_flags)
assert full_cmd[1] == "build" # dbt build command
assert full_cmd[1] == "run" # dbt run command (not build) when test_behavior is NONE
Comment on lines 1923 to +1931

Copilot AI Mar 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as the previous watcher-cmd test: this test expects the producer command to be `run`, but `use_run_command` is only enabled when `RenderConfig.selector` is set. With `RenderConfig(test_behavior=NONE)` and no selector, the producer should still be using `build` (with excludes), so these assertions are inconsistent with the current graph logic.

Copilot uses AI. Check for mistakes.

cmd = " ".join(full_cmd)
assert "--full-refresh" in full_cmd
Expand Down
Loading