66 changes: 61 additions & 5 deletions docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst
@@ -417,11 +417,10 @@ Source freshness nodes

Since version 1.6, Cosmos `supports the rendering of source nodes <https://www.astronomer.io/blog/native-support-for-source-node-rendering-in-cosmos/>`_.

We noticed some Cosmos users use this feature alongside `overriding Cosmos source nodes <https://astronomer.github.io/astronomer-cosmos/guides/render-config.html#customizing-how-nodes-are-rendered-experimental>`_ as sensors or another operator that allows them to skip the following branch of the DAG if the source is not fresh.

This use case is not currently supported by the ``ExecutionMode.WATCHER``, since the ``dbt build`` command does not run `source freshness checks <https://docs.getdbt.com/reference/commands/build#source-freshness-checks>`_.

We have a follow-up ticket to `further investigate this use case <https://github.com/astronomer/astronomer-cosmos/issues/2053>`_.
Starting with Cosmos 1.14.0, ``ExecutionMode.WATCHER`` supports source freshness aware execution. When
``source_rendering_behavior`` is not ``NONE``, the producer task automatically runs ``dbt source freshness``
before ``dbt build``, and the freshness callback determines which dependent nodes are skipped based on
stale sources. See :ref:`watcher-source-freshness` for details.

Concurrent DAG runs with ``depends_on_past``
''''''''''''''''''''''''''''''''''''''''''''
@@ -443,6 +442,63 @@ The ``DbtProducerWatcherOperator`` and ``DbtConsumerWatcherSensor`` will use the

You can define different ``callback`` behaviors for producer and consumer nodes by using ``operator_args`` to configure the consumer callback and ``setup_operator_args`` to override the callback for the producer, as described below.

.. _watcher-source-freshness:

Source freshness aware execution (Experimental)
''''''''''''''''''''''''''''''''''''''''''''''''

.. versionadded:: 1.14.0

.. warning::

    This feature is **experimental** and may change without a deprecation period.

When ``source_rendering_behavior`` is set to ``ALL`` or ``WITH_TESTS_OR_FRESHNESS`` in ``RenderConfig``,
the producer automatically runs ``dbt source freshness`` before ``dbt build`` and always invokes the
freshness callback afterward. The callback inspects the freshness results (``sources_json``) and returns
a list of ``(unique_id, state)`` tuples for any nodes that should be pre-marked; it may return an empty
list when no nodes need special handling. Each returned node receives a pre-populated XCom entry; nodes
returned with a non-success state are also added to ``--exclude`` so dbt skips them entirely.
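
As a rough illustration of the split described above, the sketch below separates callback results into the entries to pre-mark and the unique IDs to exclude. The helper name ``partition_callback_results`` and the ``SUCCESS_STATES`` set are assumptions made for this example, not part of the Cosmos API. How Cosmos actually writes the XCom entries and builds the ``--exclude`` argument is not shown.

.. code-block:: python

    # Assumption: these states form the "success" family; any other state is non-success.
    SUCCESS_STATES = {"success", "pass", "warn"}


    def partition_callback_results(results):
        """Split ``(unique_id, state)`` pairs into pre-marked states and excluded IDs."""
        # Every returned node gets a pre-populated state.
        pre_marked = dict(results)
        # Only non-success nodes are candidates for ``--exclude``.
        excluded = [uid for uid, state in results if state not in SUCCESS_STATES]
        return pre_marked, excluded


    pre_marked, excluded = partition_callback_results(
        [("model.proj.orders", "skipped"), ("model.proj.users", "success")]
    )

In this hypothetical run both nodes are pre-marked, but only ``model.proj.orders`` ends up in the exclude list.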

The consumer sensor recognises three state families: ``"skipped"`` raises ``AirflowSkipException``,
``"success"`` / ``"pass"`` / ``"warn"`` marks the task as succeeded, and anything else (e.g.
``"failed"``, ``"error"``) raises ``AirflowException``. The default callback always returns
``"skipped"`` for stale dependents — a node is skipped only when **all** of its upstream dependencies
are stale or already skipped.
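
The three state families can be pictured as a small lookup. The sketch below is a simplified, hypothetical model of that mapping: ``classify_state`` is not a Cosmos function, and it returns a label instead of raising ``AirflowSkipException`` or ``AirflowException`` so that it runs without Airflow installed.

.. code-block:: python

    def classify_state(state: str) -> str:
        """Map a pre-populated node state to the outcome the consumer would take."""
        if state == "skipped":
            return "skip"  # the sensor would raise AirflowSkipException
        if state in ("success", "pass", "warn"):
            return "succeed"  # the task is marked as succeeded
        return "fail"  # anything else, e.g. "failed" or "error", raises AirflowException


    outcomes = {s: classify_state(s) for s in ("skipped", "warn", "error")}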

.. literalinclude:: ../../../../dev/dags/watcher_with_freshness_check.py
    :language: python
    :start-after: [START example_watcher_with_freshness]
    :end-before: [END example_watcher_with_freshness]

To override the default logic, pass a ``freshness_callback`` via ``setup_operator_args``
(custom callback support added in Cosmos 1.15.0):

.. code-block:: python

    def my_freshness_callback(
        context: Context,
        dag: Any,
        task_group: TaskGroup | None,
        nodes: dict[str, DbtNode] | None,  # full DbtGraph.nodes for dependency traversal
        sources_json: dict[str, Any] | None,  # parsed target/sources.json
    ) -> list[tuple[str, str]]:  # (unique_id, state) pairs
        ...


    execution_config = ExecutionConfig(
        execution_mode=ExecutionMode.WATCHER,
        setup_operator_args={"freshness_callback": my_freshness_callback},
    )
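
A callback body might look like the following sketch, which skips a node only when all of its upstream dependencies are stale or already skipped. It is an illustration, not the Cosmos default implementation. ``FakeNode`` is a hypothetical stand-in for ``DbtNode`` that carries only ``unique_id`` and ``depends_on``. The sketch also assumes that ``sources_json`` holds one entry per source under ``results`` with ``unique_id`` and ``status`` keys, and that only the ``pass`` and ``warn`` statuses count as fresh.

.. code-block:: python

    from dataclasses import dataclass, field

    # Assumption: only these freshness statuses count as "fresh".
    FRESH_STATUSES = {"pass", "warn"}


    @dataclass
    class FakeNode:
        """Hypothetical stand-in for ``DbtNode`` with only the fields used here."""

        unique_id: str
        depends_on: list[str] = field(default_factory=list)


    def skip_all_stale_callback(context, dag, task_group, nodes, sources_json):
        """Return ``(unique_id, "skipped")`` for nodes whose upstreams are all blocked."""
        if not nodes or not sources_json:
            return []
        # Start from the stale sources reported in the freshness results.
        blocked = {
            result["unique_id"]
            for result in sources_json.get("results", [])
            if result.get("status") not in FRESH_STATUSES
        }
        skipped: set[str] = set()
        changed = True
        while changed:  # repeat until no further node becomes skippable
            changed = False
            for unique_id, node in nodes.items():
                if unique_id in blocked or not node.depends_on:
                    continue
                if all(dep in blocked for dep in node.depends_on):
                    blocked.add(unique_id)
                    skipped.add(unique_id)
                    changed = True
        return sorted((unique_id, "skipped") for unique_id in skipped)


    example_nodes = {
        "model.m1": FakeNode("model.m1", ["source.a"]),
        "model.m2": FakeNode("model.m2", ["source.a", "source.b"]),
        "model.m3": FakeNode("model.m3", ["model.m1"]),
    }
    example_sources = {
        "results": [
            {"unique_id": "source.a", "status": "error"},
            {"unique_id": "source.b", "status": "pass"},
        ]
    }
    example_result = skip_all_stale_callback(None, None, None, example_nodes, example_sources)

With this sample data, ``model.m2`` still runs because ``source.b`` is fresh, while ``model.m1`` and its only dependent ``model.m3`` are returned as skipped. Stale sources themselves are not returned, only their dependents. A function like this would be passed as ``freshness_callback`` in ``setup_operator_args``.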

**Known limitations:**

* Incompatible with ``selector`` in ``RenderConfig`` — ``--exclude`` is ignored by dbt when a YAML selector is active.
* ``dbt source freshness`` is always re-executed at runtime; ``LoadMode.DBT_MANIFEST`` freshness data is not consulted.
* Not supported for ``ExecutionMode.WATCHER_KUBERNETES``.

-------------------------------------------------------------------------------

Overriding ``operator_args``
''''''''''''''''''''''''''''
