override the build command and use run when watcher execution test behavior is set to NONE or AFTER_ALL #2428
Pull request overview
This PR aims to prevent the WATCHER-mode producer task from executing dbt tests by overriding the producer’s dbt subcommand from build to run in specific cases (notably when a selector is used).
Changes:
- Add a `use_run_command` kwarg to WATCHER producer operators (local + Kubernetes) to override `base_cmd` to `["run"]`.
- Update WATCHER graph construction to set `use_run_command` when `test_behavior` is `NONE`/`AFTER_ALL` and a selector is provided; otherwise keep excluding test resources.
- Update integration tests to expect `dbt run` for the WATCHER producer command.
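The operator-side change in the first bullet can be pictured with a minimal sketch. `ProducerSketch` is a hypothetical stand-in, not the Cosmos operator class; only the `use_run_command` kwarg and the `base_cmd` attribute are taken from this PR.

```python
class ProducerSketch:
    """Hypothetical stand-in for a WATCHER producer operator (not the Cosmos class)."""

    def __init__(self, use_run_command: bool = False) -> None:
        # The producer defaults to the `build` subcommand.
        self.base_cmd = ["build"]
        if use_run_command:
            # Override the subcommand so the producer does not execute tests.
            self.base_cmd = ["run"]


default_producer = ProducerSketch()
run_producer = ProducerSketch(use_run_command=True)
```

The flag is opt-in, so existing callers that never pass it keep the `build` behavior.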
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `cosmos/airflow/graph.py` | Conditionally switches WATCHER producer from build to run when a selector is set (to avoid tests). |
| `cosmos/operators/watcher.py` | Adds `use_run_command` override for the local WATCHER producer operator. |
| `cosmos/operators/watcher_kubernetes.py` | Adds `use_run_command` override for the Kubernetes WATCHER producer operator. |
| `tests/operators/test_watcher.py` | Updates assertions to expect run for WATCHER producer command construction. |
| # Use `dbt run` instead of `dbt build` so tests are not executed by the | ||
| # producer (dbt ignores --exclude with --selector). | ||
| if render_config.selector: | ||
| producer_task_args["use_run_command"] = True |
Switching the watcher producer from dbt build to dbt run when a selector is set will skip non-model resources (e.g., seeds/snapshots) because dbt run only executes models. In WATCHER modes, consumer sensors for seeds/snapshots assume the producer executed those nodes and pushed statuses to XCom, so this can cause sensors to wait/timeout. Consider guarding use_run_command behind a check that the selected nodes are models-only (or raise a clear config error), or use a build-based approach that can still exclude tests with selectors.
| producer_task_args["use_run_command"] = True | |
| # Switching to `dbt run` means only models will be executed. In WATCHER | |
| # mode, non-model resources (e.g. seeds/snapshots) must still be | |
| # executed by the producer so their statuses are pushed for sensors. | |
| # Only enable `use_run_command` when we can verify that the selected | |
| # resources are models-only; otherwise raise a clear config error or | |
| # fall back to the default `dbt build` behavior. | |
| resource_types = getattr(render_config, "resource_types", None) | |
| if resource_types is None: | |
| # We cannot verify resource types on this RenderConfig. To avoid | |
| # silently skipping non-model resources, keep using `dbt build` | |
| # instead of switching to `dbt run`. | |
| logger.warning( | |
| "Watcher producer with selector cannot verify resource types; " | |
| "falling back to `dbt build` to ensure non-model resources " | |
| "are executed." | |
| ) | |
| else: | |
| non_model_resources = [ | |
| rt for rt in resource_types if rt is not DbtResourceType.MODEL | |
| ] | |
| if non_model_resources: | |
| raise CosmosValueError( | |
| "Invalid configuration for watcher producer: using a " | |
| "dbt selector with non-model resources while tests are " | |
| "disabled/deferred would require switching to `dbt run`, " | |
| "which would skip non-model resources and cause watcher " | |
| "sensors to wait or time out. Restrict the selector to " | |
| "models-only or adjust test behavior." | |
| ) | |
| producer_task_args["use_run_command"] = True |
| producer_operator = dag_dbt_task_group_watcher_flags.task_dict["dbt_task_group.dbt_producer_watcher"] | ||
| assert producer_operator.base_cmd == ["build"] | ||
| assert producer_operator.base_cmd == ["run"] | ||
|
|
This test now expects the watcher producer to use base_cmd == ['run'], but in _add_watcher_producer_task the use_run_command flag is only set when RenderConfig.selector is truthy. This test config sets test_behavior=NONE but does not set a selector, so the producer will still default to ['build'] and these assertions should fail. Either set selector in the test to cover the selector-specific behavior, or keep asserting build for the non-selector case.
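The mismatch described in this comment can be made concrete with a small sketch of the branching as the review describes it. The enum and function names are hypothetical and stand in for the real Cosmos types; the logic only mirrors the condition stated above (tests disabled or deferred, and a selector set).

```python
from enum import Enum
from typing import Optional


class BehaviorSketch(Enum):
    """Hypothetical stand-in for the test-behavior setting."""

    NONE = "none"
    AFTER_ALL = "after_all"
    AFTER_EACH = "after_each"


def producer_subcommand(test_behavior: BehaviorSketch, selector: Optional[str]) -> str:
    """Return `run` only when tests are off/deferred AND a selector is set."""
    tests_off = test_behavior in (BehaviorSketch.NONE, BehaviorSketch.AFTER_ALL)
    if tests_off and selector:
        return "run"
    # Without a selector the producer keeps `build` (with test excludes).
    return "build"
```

Under this logic a config with `test_behavior=NONE` and no selector still yields `build`, which is why the updated assertions would need a selector in the test config to pass. The selector name used in any test here (for example `"nightly"`) is purely illustrative.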
| # Basic check for producer task | ||
| producer_operator = dag_dbt_task_group_watcher_flags.task_dict["dbt_task_group.dbt_producer_watcher"] | ||
| producer_operator.render_template_fields(context=context) # Render the templated fields | ||
| assert producer_operator.base_cmd == ["build"] | ||
| assert producer_operator.base_cmd == ["run"] | ||
|
|
||
| # Build the command without executing it and verify it was built correctly | ||
| cmd_flags = producer_operator.add_cmd_flags() | ||
| full_cmd, _ = producer_operator.build_cmd(context=context, cmd_flags=cmd_flags) | ||
| assert full_cmd[1] == "build" # dbt build command | ||
| assert full_cmd[1] == "run" # dbt run command (not build) when test_behavior is NONE |
Same issue as the previous watcher-cmd test: this test expects the producer command to be run, but use_run_command is only enabled when RenderConfig.selector is set. With RenderConfig(test_behavior=NONE) and no selector, the producer should still be using build (with excludes), so these assertions are inconsistent with the current graph logic.
| if use_run_command: | ||
| # Override the base command to use the run command | ||
| self.base_cmd = ["run"] | ||
|
|
When use_run_command is enabled, the operator no longer runs a dbt build, but several user-facing messages/docs in this class still refer to "dbt build" (e.g., class docstring and retry-skip log messages). Update the wording to reflect the actual subcommand being executed (or derive it from self.base_cmd) so logs/docs stay accurate when this flag is used.
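One way to keep messages accurate is to derive the wording from `base_cmd`, as suggested above. The sketch below is illustrative only: the helper names and the message text are hypothetical, not the operator's actual log output.

```python
from typing import Sequence


def describe_command(base_cmd: Sequence[str]) -> str:
    """Render the dbt subcommand actually configured, e.g. 'dbt run'."""
    return "dbt " + " ".join(base_cmd)


def retry_skip_message(base_cmd: Sequence[str]) -> str:
    # Hypothetical wording; the point is that the subcommand is not hard-coded.
    return f"Skipping `{describe_command(base_cmd)}` on retry."
```

With this shape, the same message template stays correct whether the producer is configured with `["build"]` or `["run"]`.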
| if use_run_command: | ||
| self.base_cmd = ["run"] | ||
|
|
Same as the local watcher producer: if use_run_command switches the producer to dbt run, some log messages in execute() still say "dbt build". Consider updating wording to reflect the actual subcommand (possibly by using self.base_cmd) so operational logs remain accurate when this flag is enabled.
tatiana left a comment
@anissarashid Thanks a lot for reporting this issue and proposing this fix!
To change to run in those cases feels like the right approach.
My only concern right now is that dbt build, in addition to dbt run and dbt tests, also ends up running:
How do you think we could tackle this?
Could I suggest looking at the resource-type flag (https://docs.getdbt.com/reference/global-configs/resource-type?version=1.11) in the build command instead of switching to dbt run?
I tried running a dbt build with select and selectors, and it ran all expected nodes excluding tests. dbt 1.8 also added an exclude-resource-type flag. @tatiana I'm not sure what the lower bound of dbt support is in Cosmos. exclude-resource-type would be easier to maintain if the lower bound is 1.8 or greater.
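A rough sketch of this suggestion, assuming the flags are spelled `--resource-type` and `--exclude-resource-type` and accept space-separated values as described on the linked dbt docs page. The function name, the version tuple argument, and the chosen resource-type values are hypothetical and would need checking against the dbt versions Cosmos supports.

```python
from typing import List, Optional, Tuple


def build_producer_args(
    dbt_version: Tuple[int, int],
    selector: Optional[str],
    exclude_tests: bool,
) -> List[str]:
    """Keep `dbt build`, but filter tests out by resource type."""
    args = ["build"]
    if selector:
        args += ["--selector", selector]
    if exclude_tests:
        if dbt_version >= (1, 8):
            # Assumption: available from dbt 1.8, per the comment above.
            args += ["--exclude-resource-type", "test"]
        else:
            # Assumption: fall back to listing the types to include.
            args += ["--resource-type", "model", "seed", "snapshot"]
    return args
```

Compared with switching to `dbt run`, this keeps seeds and snapshots in the producer run, which is the concern raised earlier in the review.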
Thank you, @anissarashid, for reporting the issue and offering to contribute this fix. Thank you, @johnhoran, for bringing the exclude-resource-type flag to our attention.
As we would like to fix issue #2415 as a priority and ship the fix in our upcoming release 1.14.0, this issue has been assigned to me. I have followed the approach to use
For your contributions here, with your permission, I would also like to add you both as co-authors in the new PR. I have tried to get your emails from the public GitHub API, but please let me know if you'd like me to correct those currently reflected in the new PR's description. Thanks a lot again for your contributions! 🚀
Description
Watcher execution mode uses `dbt build` as its base operator, and uses `--exclude` flags when the test behavior is set to `NONE` or `AFTER_ALL` to exclude tests from the producer task. When we use `selector`, the `exclude` flags are ignored by dbt, so tests are run unintentionally in the producer task.

This updates the watcher operator to conditionally use the `run` command, and override the `build_cmd` if set to true.

Related Issue(s)
Closes: #2415
Breaking Change?
Checklist