From 503e398c1523332936683cc035173ccbaf8d47eb Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 17 Feb 2026 16:42:25 +0530 Subject: [PATCH 1/2] Set correct priority for queue in WatcherMode producer task --- cosmos/operators/watcher.py | 2 +- tests/operators/test_watcher.py | 39 +++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 0fc78aa208..6f6540489c 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -103,7 +103,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: default_args["retries"] = 0 kwargs["default_args"] = default_args kwargs["retries"] = 0 - kwargs["queue"] = watcher_dbt_execution_queue or DEFAULT_QUEUE + kwargs["queue"] = kwargs.get("queue") or watcher_dbt_execution_queue or DEFAULT_QUEUE super().__init__(task_id=task_id, *args, **kwargs) if self.invocation_mode == InvocationMode.SUBPROCESS: diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 23a71e32fa..81c01bf5c4 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -129,6 +129,45 @@ def test_dbt_producer_watcher_operator_queue(queue_override, expected_queue): assert op.queue == expected_queue +def test_producer_queue_from_setup_operator_args_when_both_set(): + """ + When both setup_operator_args queue and watcher_dbt_execution_queue are set, + producer should use queue from setup_operator_args. + """ + with patch("cosmos.operators.watcher.watcher_dbt_execution_queue", "watcher_queue"): + watcher_dag = DbtDag( + project_config=project_config, + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="watcher_dag_setup_queue", + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.WATCHER, + setup_operator_args={"queue": "dbt_producer_task_queue"}, + ), + render_config=RenderConfig(emit_datasets=False), + ) + producer = watcher_dag.task_dict["dbt_producer_watcher"] + assert producer.queue == "dbt_producer_task_queue" + + +def test_producer_queue_from_watcher_dbt_execution_queue_when_only_watcher_set(): + """ + When only watcher_dbt_execution_queue is set (no queue in setup_operator_args), + producer should use queue from watcher_dbt_execution_queue. + """ + with patch("cosmos.operators.watcher.watcher_dbt_execution_queue", "watcher_only_queue"): + watcher_dag = DbtDag( + project_config=project_config, + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="watcher_dag_watcher_queue_only", + execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER), + render_config=RenderConfig(emit_datasets=False), + ) + producer = watcher_dag.task_dict["dbt_producer_watcher"] + assert producer.queue == "watcher_only_queue" + + def test_dbt_producer_watcher_operator_priority_weight_override(): """Test that DbtProducerWatcherOperator allows overriding priority_weight.""" op = DbtProducerWatcherOperator(project_dir=".", profile_config=None, priority_weight=100) From e47419c3c393f05f12633bade2cf5df15a554bd9 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 17 Feb 2026 18:51:01 +0530 Subject: [PATCH 2/2] Fix tests --- tests/operators/test_watcher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 81c01bf5c4..cbe4addfa6 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -129,6 +129,7 @@ def test_dbt_producer_watcher_operator_queue(queue_override, expected_queue): assert op.queue == expected_queue +@pytest.mark.integration def test_producer_queue_from_setup_operator_args_when_both_set(): """ When both setup_operator_args queue and watcher_dbt_execution_queue are set, @@ -150,6 +151,7 @@ def test_producer_queue_from_setup_operator_args_when_both_set(): assert producer.queue == "dbt_producer_task_queue" +@pytest.mark.integration def test_producer_queue_from_watcher_dbt_execution_queue_when_only_watcher_set(): """ When only watcher_dbt_execution_queue is set (no queue in setup_operator_args),