diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index d440276af5..8d24756916 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -103,7 +103,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: default_args["retries"] = 0 kwargs["default_args"] = default_args kwargs["retries"] = 0 - kwargs["queue"] = kwargs.get("queue") or watcher_dbt_execution_queue or DEFAULT_QUEUE + kwargs["queue"] = watcher_dbt_execution_queue or kwargs.get("queue") or DEFAULT_QUEUE super().__init__(task_id=task_id, *args, **kwargs) if self.invocation_mode == InvocationMode.SUBPROCESS: diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index 2c24ae85d0..af7589650c 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -281,6 +281,14 @@ Or via environment variable: - For watcher consumer tasks (``DbtConsumerWatcherSensor``), from their first retry onwards, if ``watcher_dbt_execution_queue`` is configured, the task is automatically assigned to the specified queue - This behavior is enforced by Cosmos via an `Airflow cluster policy `_ (``task_instance_mutation_hook``) that mutates ``task_instance.queue`` at runtime for retry attempts +.. note:: + + For producer task execution, we encourage users to set the ``watcher_dbt_execution_queue`` configuration. If, for any reason, users prefer to use a different node pool for producer tasks without setting an Airflow Cluster Policy, they can set the ``queue`` argument via ``setup_operator_args``. This, however, would not solve the problem of assigning consumer retries to nodes that may have more memory and CPU available. + + The effective precedence is: + + ``watcher_dbt_execution_queue`` > explicit ``queue`` on the producer (from ``setup_operator_args``) > ``operator_args`` > your Airflow deployment’s default queue. + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Installation of Airflow and dbt ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 20b6c6b4a1..ce1c2bedb9 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -133,7 +133,7 @@ def test_dbt_producer_watcher_operator_queue(queue_override, expected_queue): def test_producer_queue_from_setup_operator_args_when_both_set(): """ When both setup_operator_args queue and watcher_dbt_execution_queue are set, - producer should use queue from setup_operator_args. + producer should use queue from watcher_dbt_execution_queue. """ with patch("cosmos.operators.watcher.watcher_dbt_execution_queue", "watcher_queue"): watcher_dag = DbtDag( @@ -148,26 +148,29 @@ def test_producer_queue_from_setup_operator_args_when_both_set(): render_config=RenderConfig(emit_datasets=False), ) producer = watcher_dag.task_dict["dbt_producer_watcher"] - assert producer.queue == "dbt_producer_task_queue" + assert producer.queue == "watcher_queue" @pytest.mark.integration -def test_producer_queue_from_watcher_dbt_execution_queue_when_only_watcher_set(): +def test_producer_queue_from_setup_operator_args(): """ - When only watcher_dbt_execution_queue is set (no queue in setup_operator_args), - producer should use queue from watcher_dbt_execution_queue. + When only setup_operator_args is set (no queue in watcher_dbt_execution_queue), + producer should use queue from setup_operator_args. """ - with patch("cosmos.operators.watcher.watcher_dbt_execution_queue", "watcher_only_queue"): + with patch("cosmos.operators.watcher.watcher_dbt_execution_queue", None): watcher_dag = DbtDag( project_config=project_config, profile_config=profile_config, start_date=datetime(2023, 1, 1), dag_id="watcher_dag_watcher_queue_only", - execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER), + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.WATCHER, + setup_operator_args={"queue": "dbt_producer_task_queue"}, + ), render_config=RenderConfig(emit_datasets=False), ) producer = watcher_dag.task_dict["dbt_producer_watcher"] - assert producer.queue == "watcher_only_queue" + assert producer.queue == "dbt_producer_task_queue" def test_dbt_producer_watcher_operator_priority_weight_override():