Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4568d4e
Update watcher execution mode docs
tatiana Feb 18, 2026
ceb596f
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Feb 18, 2026
bf44803
Apply suggestion from @tatiana
tatiana Feb 20, 2026
b14b121
Merge branch 'main' into tatiana-patch-1
tatiana Feb 20, 2026
bb6c17a
Update docs/getting_started/watcher-execution-mode.rst
tatiana Feb 20, 2026
d3b175b
Update docs/getting_started/watcher-execution-mode.rst
tatiana Feb 20, 2026
4876098
Update docs/getting_started/watcher-execution-mode.rst
tatiana Feb 20, 2026
d745230
Update docs/getting_started/watcher-execution-mode.rst
tatiana Feb 20, 2026
27668fc
Update docs/getting_started/watcher-execution-mode.rst
tatiana Feb 20, 2026
ea62472
Apply suggestion from @tatiana
tatiana Feb 20, 2026
d0ec73e
Apply suggestion from @tatiana
tatiana Feb 20, 2026
4a35ad9
Apply suggestion from @Copilot
tatiana Feb 20, 2026
f95d585
Apply suggestion from @Copilot
tatiana Feb 20, 2026
dceead7
Apply suggestion from @Copilot
tatiana Feb 20, 2026
5e7ed83
Apply suggestion from @Copilot
tatiana Feb 20, 2026
c3afcee
Update docs/getting_started/watcher-execution-mode.rst
tatiana Feb 20, 2026
1057baa
Update docs/getting_started/watcher-execution-mode.rst
tatiana Feb 20, 2026
a2428c4
Add docs note for watcher queue precedence
pankajastro Feb 21, 2026
847efcd
Merge branch 'main' into watcher_queue_note
pankajastro Feb 23, 2026
1d4a466
Update docs/getting_started/watcher-execution-mode.rst
pankajastro Feb 23, 2026
0f9d593
Fix precedence
pankajastro Feb 24, 2026
f37261b
Apply review suggestion
pankajastro Feb 24, 2026
69e7049
Merge branch 'main' into watcher_queue_note
pankajastro Feb 24, 2026
c21505e
Adjust tests
pankajastro Feb 24, 2026
55f2704
Fix note
pankajastro Feb 24, 2026
921d11d
Merge branch 'main' into watcher_queue_note
pankajastro Feb 24, 2026
f97813b
Fix tests
pankajastro Feb 24, 2026
b9ceb0c
Apply suggestion from @tatiana
tatiana Feb 25, 2026
495fb87
Merge branch 'main' into watcher_queue_note
tatiana Feb 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cosmos/operators/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
default_args["retries"] = 0
kwargs["default_args"] = default_args
kwargs["retries"] = 0
kwargs["queue"] = kwargs.get("queue") or watcher_dbt_execution_queue or DEFAULT_QUEUE
kwargs["queue"] = watcher_dbt_execution_queue or kwargs.get("queue") or DEFAULT_QUEUE
Comment thread
pankajastro marked this conversation as resolved.
super().__init__(task_id=task_id, *args, **kwargs)

if self.invocation_mode == InvocationMode.SUBPROCESS:
Expand Down
8 changes: 8 additions & 0 deletions docs/getting_started/watcher-execution-mode.rst
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ Or via environment variable:
- For watcher consumer tasks (``DbtConsumerWatcherSensor``), from their first retry onwards, if ``watcher_dbt_execution_queue`` is configured, the task is automatically assigned to the specified queue
- This behavior is enforced by Cosmos via an `Airflow cluster policy <https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/cluster-policies.html>`_ (``task_instance_mutation_hook``) that mutates ``task_instance.queue`` at runtime for retry attempts

.. note::

For producer task execution, we encourage users to set the ``watcher_dbt_execution_queue`` configuration. If, for any reason, users prefer to use a different node pool for producer tasks without setting an Airflow Cluster Policy, they can set the ``queue`` argument via ``setup_operator_args``. This, however, would not solve the problem of assigning consumer retries to nodes that may have more memory and CPU available.

The effective precedence is:

``watcher_dbt_execution_queue`` > explicit ``queue`` on the producer (from ``setup_operator_args``) > ``operator_args`` > your Airflow deployment’s default queue.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Installation of Airflow and dbt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
19 changes: 11 additions & 8 deletions tests/operators/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def test_dbt_producer_watcher_operator_queue(queue_override, expected_queue):
def test_producer_queue_from_setup_operator_args_when_both_set():

Copilot AI Feb 25, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test name test_producer_queue_from_setup_operator_args_when_both_set no longer matches its behavior/assertion: it now verifies that watcher_dbt_execution_queue takes precedence when both are set. Renaming the test (and optionally aligning the docstring’s first line) would prevent confusion and make failures easier to interpret.

Suggested change
def test_producer_queue_from_setup_operator_args_when_both_set():
def test_producer_queue_prefers_watcher_queue_when_both_set():

Copilot uses AI. Check for mistakes.
"""
When both setup_operator_args queue and watcher_dbt_execution_queue are set,
producer should use queue from setup_operator_args.
producer should use queue from watcher_dbt_execution_queue.
"""
with patch("cosmos.operators.watcher.watcher_dbt_execution_queue", "watcher_queue"):
watcher_dag = DbtDag(
Expand All @@ -148,26 +148,29 @@ def test_producer_queue_from_setup_operator_args_when_both_set():
render_config=RenderConfig(emit_datasets=False),
)
producer = watcher_dag.task_dict["dbt_producer_watcher"]
assert producer.queue == "dbt_producer_task_queue"
assert producer.queue == "watcher_queue"


@pytest.mark.integration
def test_producer_queue_from_watcher_dbt_execution_queue_when_only_watcher_set():
def test_producer_queue_from_setup_operator_args():
"""
When only watcher_dbt_execution_queue is set (no queue in setup_operator_args),
producer should use queue from watcher_dbt_execution_queue.
When only setup_operator_args is set (no queue in watcher_dbt_execution_queue),
producer should use queue from setup_operator_args.
"""
with patch("cosmos.operators.watcher.watcher_dbt_execution_queue", "watcher_only_queue"):
with patch("cosmos.operators.watcher.watcher_dbt_execution_queue", None):
watcher_dag = DbtDag(
project_config=project_config,
profile_config=profile_config,
start_date=datetime(2023, 1, 1),
dag_id="watcher_dag_watcher_queue_only",
execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER),
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.WATCHER,
setup_operator_args={"queue": "dbt_producer_task_queue"},
),
render_config=RenderConfig(emit_datasets=False),
)
producer = watcher_dag.task_dict["dbt_producer_watcher"]
assert producer.queue == "watcher_only_queue"
assert producer.queue == "dbt_producer_task_queue"
Comment thread
pankajastro marked this conversation as resolved.
Comment thread
pankajastro marked this conversation as resolved.


def test_dbt_producer_watcher_operator_priority_weight_override():
Expand Down