diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml
index a8798bf5d2..7df2dc142e 100644
--- a/dev/docker-compose.yaml
+++ b/dev/docker-compose.yaml
@@ -27,7 +27,7 @@ x-airflow-common:
     POSTGRES_PASSWORD: pg_password
     POSTGRES_DB: airflow
     POSTGRES_SCHEMA: public
-    AIRFLOW__COSMOS__WATCHER_RETRY_QUEUE: watcher_retry_queue
+    AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE: watcher_retry_queue
     AIRFLOW__LOGGING__LOGGING_LEVEL: DEBUG
 
   volumes:
diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst
index 1ed4c7bb56..be133264a0 100644
--- a/docs/configuration/cosmos-conf.rst
+++ b/docs/configuration/cosmos-conf.rst
@@ -280,6 +280,20 @@ This page lists all available Airflow configurations that affect ``astronomer-co
     - Default: ``0.5``
     - Environment Variable: ``AIRFLOW__COSMOS__DEBUG_MEMORY_POLL_INTERVAL_SECONDS``
 
+.. _watcher_dbt_execution_queue:
+
+`watcher_dbt_execution_queue`_:
+    (Introduced in Cosmos 1.14.0) When using watcher execution mode, tasks may need to run dbt or not, depending on their type (producer vs. consumer) and the retry number. When running the dbt command, tasks use more resources (CPU and memory) than when behaving as sensors. The computational cost of running these tasks can vary widely. For example, a Cosmos watcher sensor consumes approximately 200MB, compared to 700MB consumed by a dbt build task running a project with almost 200 dbt models. This configuration allows users to define which queue to use when dbt commands are run, optimising their Airflow deployment. Internally, Cosmos leverages the `Airflow cluster policy feature <https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/cluster-policies.html>`_. As of now, this configuration will be used:
+    - for watcher producer tasks, during their first execution
+    - for watcher consumer tasks, from their first retry onwards
+    - it will automatically be assigned to the specified queue.
+
+    This behavior is enforced by Cosmos via an Airflow policy (``task_instance_mutation_hook``) that mutates ``task_instance.queue`` at runtime for retry attempts.
+    As a result, the configured ``watcher_dbt_execution_queue`` can overwrite any queue set directly on the operator, but only for retries; the initial run continues to use the operator's original queue.
+
+    - Default: ``None``
+    - Environment Variable: ``AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE``
+
 [openlineage]
 ~~~~~~~~~~~~~
 
diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst
index 5c5ca11f9b..5e1ad6d4e8 100644
--- a/docs/getting_started/watcher-execution-mode.rst
+++ b/docs/getting_started/watcher-execution-mode.rst
@@ -246,6 +246,39 @@ This behavior is designed to support TaskGroup-level retries, as reported in `#2
 
 The overall retry behavior will be further improved once `#1978 `_ is implemented.
 
+**Watcher retry queue**
+
+.. versionadded:: 1.14.0
+
+In watcher execution mode, by default, consumer sensor tasks are lightweight sensors that wait for the producer task to complete. On their first attempt, they require minimal CPU and memory resources. However, when these tasks retry, they execute the dbt command for the node, which may require significantly more resources.
+
+The ``watcher_dbt_execution_queue`` configuration allows you to specify a different worker queue for retry attempts. This enables you to:
+
+- **Optimize resource allocation**: Use lightweight workers for initial sensor execution and high-resource workers for retries
+- **Improve scheduling efficiency**: Prevent resource contention between initial sensor tasks and retry executions
+- **Scale independently**: Scale retry queues separately based on retry workload patterns
+
+**Configuration:**
+
+Set the ``watcher_dbt_execution_queue`` in your Airflow configuration:
+
+.. code-block:: ini
+
+    [cosmos]
+    watcher_dbt_execution_queue = high_memory_queue
+
+Or via environment variable:
+
+.. code-block:: bash
+
+    export AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE=high_memory_queue
+
+**How it works:**
+
+- For watcher producer tasks (``DbtProducerWatcherOperator``), the configured queue is used during their first execution
+- For watcher consumer tasks (``DbtConsumerWatcherSensor``), from their first retry onwards, if ``watcher_dbt_execution_queue`` is configured, the task is automatically assigned to the specified queue
+- This behavior is enforced by Cosmos via an `Airflow cluster policy `_ (``task_instance_mutation_hook``) that mutates ``task_instance.queue`` at runtime for retry attempts
+
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 Installation of Airflow and dbt
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
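
The queue selection rules listed under "How it works" can be sketched in plain Python. This is only an illustration of the documented rules, not the Cosmos implementation: ``FakeTaskInstance``, its ``is_producer`` flag, and the module-level ``WATCHER_DBT_EXECUTION_QUEUE`` constant are hypothetical stand-ins for the real Airflow ``TaskInstance`` and the ``[cosmos]`` setting, and the sketch assumes ``try_number`` is ``1`` on the first attempt, which may differ between Airflow versions. Only the hook name ``task_instance_mutation_hook`` and the mutated attribute ``queue`` come from the documentation above.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical stand-in for the value of [cosmos] watcher_dbt_execution_queue.
# None mirrors the documented default (feature disabled).
WATCHER_DBT_EXECUTION_QUEUE: Optional[str] = "high_memory_queue"


@dataclass
class FakeTaskInstance:
    """Illustrative substitute for an Airflow TaskInstance (not the real class)."""

    task_id: str
    queue: str
    try_number: int  # assumption: 1 on the first attempt, 2 on the first retry
    is_producer: bool = False  # hypothetical flag: producer vs. consumer task


def runs_dbt(ti: FakeTaskInstance) -> bool:
    """Return True when, per the documented rules, the attempt runs dbt."""
    if ti.is_producer:
        # Producer tasks use the configured queue during their first execution.
        return ti.try_number == 1
    # Consumer tasks use it from their first retry onwards.
    return ti.try_number > 1


def task_instance_mutation_hook(task_instance: FakeTaskInstance) -> None:
    """Sketch of a cluster policy that reroutes dbt-running attempts."""
    if WATCHER_DBT_EXECUTION_QUEUE is None:
        return  # not configured: leave the operator's queue untouched
    if runs_dbt(task_instance):
        task_instance.queue = WATCHER_DBT_EXECUTION_QUEUE


if __name__ == "__main__":
    first_try = FakeTaskInstance("model_a", queue="default", try_number=1)
    retry = FakeTaskInstance("model_a", queue="default", try_number=2)
    for ti in (first_try, retry):
        task_instance_mutation_hook(ti)
        print(ti.task_id, ti.try_number, ti.queue)
```

Under these assumptions, a consumer keeps its original queue on the first attempt and moves to the configured queue on retry, while a producer is moved on its first execution.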