Skip to content
15 changes: 15 additions & 0 deletions dev/dags/example_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,18 @@
pre_dbt >> first_dbt_task_group
# [END example_watcher_taskgroup]
"""

# [START example_watcher_synchronous]
example_watcher_synchronous = DbtDag(
# dbt/cosmos-specific parameters
execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER),
Comment thread
pankajastro marked this conversation as resolved.
project_config=ProjectConfig(DBT_PROJECT_PATH),
profile_config=profile_config,
operator_args={**operator_args, "deferrable": False},
# normal dag parameters
schedule="@daily",
start_date=datetime(2025, 1, 1),
catchup=False,
dag_id="example_watcher_synchronous",
)
# [END example_watcher_synchronous]
20 changes: 18 additions & 2 deletions docs/getting_started/watcher-execution-mode.rst
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,25 @@ Synchronous sensor execution

In Cosmos 1.11.0, the ``DbtConsumerWatcherSensor`` operator is implemented as a synchronous XCom sensor, which continuously occupies the worker slot - even if they're just sleeping and checking periodically.

An improvement is to change this behaviour and implement an asynchronous sensor execution, so that the worker slot is released until the condition, validated by the Airflow triggerer, is met.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Asynchronous sensor execution
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The ticket to implement this behaviour is `#2059 <https://github.com/astronomer/astronomer-cosmos/issues/2059>`_.
Starting with Cosmos 1.12.0, the ``DbtConsumerWatcherSensor`` supports
`deferrable (asynchronous) execution <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html>`_. Deferrable execution frees up the Airflow worker slot, while task status monitoring is handled by the Airflow triggerer component,
which increases overall task throughput. By default, the sensor now runs in deferrable mode.

**Limitations:**

- Deferrable execution is currently supported only for dbt models, seeds and snapshots.
- Deferrable execution applies only to the first task attempt (try number 1). For subsequent retries, the sensor falls back to synchronous execution.

To disable asynchronous execution, set the ``deferrable`` flag to ``False`` in the ``operator_args``.

.. literalinclude:: ../../dev/dags/example_watcher.py
:language: python
:start-after: [START example_watcher_synchronous]
:end-before: [END example_watcher_synchronous]

~~~~~~~~~~~~~~~~~~~~~~~~~~~
Airflow Datasets and Assets
Expand Down