2 changes: 1 addition & 1 deletion dev/docker-compose.yaml
@@ -27,7 +27,7 @@ x-airflow-common:
POSTGRES_PASSWORD: pg_password
POSTGRES_DB: airflow
POSTGRES_SCHEMA: public
-AIRFLOW__COSMOS__WATCHER_RETRY_QUEUE: watcher_retry_queue
+AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE: watcher_retry_queue
AIRFLOW__LOGGING__LOGGING_LEVEL: DEBUG

volumes:
14 changes: 14 additions & 0 deletions docs/configuration/cosmos-conf.rst
@@ -280,6 +280,20 @@ This page lists all available Airflow configurations that affect ``astronomer-co
- Default: ``0.5``
- Environment Variable: ``AIRFLOW__COSMOS__DEBUG_MEMORY_POLL_INTERVAL_SECONDS``

.. _watcher_dbt_execution_queue:

`watcher_dbt_execution_queue`_:
    (Introduced in Cosmos 1.14.0) In watcher execution mode, a task may or may not run dbt, depending on its type (producer or consumer) and its retry number. A task that runs a dbt command uses more CPU and memory than one that behaves as a sensor, and the difference can be large. For example, a Cosmos watcher sensor consumes approximately 200MB, compared to 700MB consumed by a dbt build task running a project with almost 200 dbt models. This configuration lets users choose the queue on which dbt commands run, so they can size their Airflow deployment accordingly. Internally, Cosmos leverages the `Airflow cluster policy feature <https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/cluster-policies.html>`_. Currently, the configured queue is used:

    - for watcher producer tasks, during their first execution
    - for watcher consumer tasks, from their first retry onwards

    In both cases, the task is automatically assigned to the specified queue.

    For consumer tasks, this behavior is enforced by Cosmos via an Airflow policy (``task_instance_mutation_hook``) that mutates ``task_instance.queue`` at runtime for retry attempts.
    As a result, the configured ``watcher_dbt_execution_queue`` can override any queue set directly on the operator, but only for retries; the initial run continues to use the operator's original queue.

    - Default: ``None``
    - Environment Variable: ``AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE``

[openlineage]
~~~~~~~~~~~~~

33 changes: 33 additions & 0 deletions docs/getting_started/watcher-execution-mode.rst
@@ -246,6 +246,39 @@ This behavior is designed to support TaskGroup-level retries, as reported in `#2

The overall retry behavior will be further improved once `#1978 <https://github.com/astronomer/astronomer-cosmos/issues/1978>`_ is implemented.

**Watcher dbt execution queue**

.. versionadded:: 1.14.0

In watcher execution mode, consumer tasks are by default lightweight sensors that wait for the producer task to complete. On their first attempt, they require minimal CPU and memory. However, when these tasks retry, they execute the dbt command for their node, which may require significantly more resources.

The ``watcher_dbt_execution_queue`` configuration allows you to specify a different worker queue for the attempts that run dbt (the producer's first execution and consumer retries). This enables you to:

- **Optimize resource allocation**: use lightweight workers for initial sensor execution and high-resource workers for attempts that run dbt
- **Improve scheduling efficiency**: prevent resource contention between lightweight sensor tasks and dbt executions
- **Scale independently**: scale the dbt execution queue separately, based on its own workload patterns

**Configuration:**

Set the ``watcher_dbt_execution_queue`` in your Airflow configuration:

.. code-block:: ini

    [cosmos]
    watcher_dbt_execution_queue = high_memory_queue

Or via environment variable:

.. code-block:: bash

    export AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE=high_memory_queue
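
As a rough illustration of how the two options above relate, the sketch below resolves the setting using only the Python standard library. It is not how Airflow or Cosmos reads configuration internally. The function name ``resolve_dbt_execution_queue`` is hypothetical, and the sketch only mirrors Airflow's general rule that an ``AIRFLOW__{SECTION}__{KEY}`` environment variable takes precedence over the configuration file.

.. code-block:: python

    import configparser
    import os
    from typing import Mapping, Optional

    ENV_VAR = "AIRFLOW__COSMOS__WATCHER_DBT_EXECUTION_QUEUE"


    def resolve_dbt_execution_queue(
        cfg_text: str, environ: Mapping[str, str] = os.environ
    ) -> Optional[str]:
        """Hypothetical helper: return the configured queue, or None if unset."""
        # The environment variable, when present, overrides the config file.
        if ENV_VAR in environ:
            return environ[ENV_VAR]
        parser = configparser.ConfigParser()
        parser.read_string(cfg_text)
        # A missing section or key falls back to None, the documented default.
        return parser.get("cosmos", "watcher_dbt_execution_queue", fallback=None)

When neither source defines the value, the helper returns ``None``, which matches the documented default and means no queue override is applied.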

**How it works:**

- For watcher producer tasks (``DbtProducerWatcherOperator``), the configured queue is used during their first execution
- For watcher consumer tasks (``DbtConsumerWatcherSensor``), from their first retry onwards, if ``watcher_dbt_execution_queue`` is configured, the task is automatically assigned to the specified queue
- This behavior is enforced by Cosmos via an `Airflow cluster policy <https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/cluster-policies.html>`_ (``task_instance_mutation_hook``) that mutates ``task_instance.queue`` at runtime for retry attempts
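
The rules above can be sketched in plain Python. This is a simplified model, not the Cosmos implementation: ``FakeTaskInstance``, ``runs_dbt`` and ``mutate_queue`` are hypothetical names, the real ``task_instance_mutation_hook`` receives only an Airflow ``TaskInstance``, and the sketch assumes ``try_number`` is ``1`` on the first attempt. It also applies the same mutation to producers for simplicity. The documentation above only states that the hook is used for retry attempts.

.. code-block:: python

    from dataclasses import dataclass
    from typing import Optional


    @dataclass
    class FakeTaskInstance:
        """Hypothetical stand-in for an Airflow TaskInstance (illustration only)."""

        kind: str  # "producer" or "consumer" (illustrative label)
        try_number: int  # assumed to be 1 on the first attempt
        queue: str = "default"


    def runs_dbt(ti: FakeTaskInstance) -> bool:
        """Return True when this attempt is expected to run a dbt command."""
        if ti.kind == "producer":
            # Producers run dbt during their first execution.
            return ti.try_number == 1
        if ti.kind == "consumer":
            # Consumers act as sensors first and run dbt from the first retry on.
            return ti.try_number >= 2
        return False


    def mutate_queue(ti: FakeTaskInstance, dbt_queue: Optional[str]) -> None:
        """Reassign the queue only when one is configured and dbt will run."""
        if dbt_queue and runs_dbt(ti):
            ti.queue = dbt_queue

In this model, a consumer on its first attempt keeps the queue set on its operator, while the same consumer on its second attempt is moved to the configured queue. With the default value ``None``, no task is reassigned.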

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Installation of Airflow and dbt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~