[Experimental] Add source freshness aware execution for ExecutionMode.WATCHER#2467
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2467 +/- ##
==========================================
- Coverage 98.03% 97.92% -0.12%
==========================================
Files 103 103
Lines 7173 7312 +139
==========================================
+ Hits 7032 7160 +128
- Misses 141 152 +11 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
@pankajastro I think this is a solid step forward in supporting source nodes with the ExecutionMode.WATCHER - thanks a lot for the work on this!
I’ve left a few inline comments regarding the implementation.
While I do think we should continue iterating on this and move it toward a release as part of 1.14, it doesn’t fully address the original issue that prompted feature request #2053. In particular, the current solution doesn’t account for overriding Cosmos source nodes (as described here: https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html#customizing-how-nodes-are-rendered-experimental).
Supporting this was a key requirement when the issue was raised by the customer - they’re using custom SQL-based table state checks, effectively making Cosmos stateful, similar to dbt’s state-aware workflows (https://docs.getdbt.com/docs/deploy/state-aware-about).
So we change the PR description to clarify that it does not close the custom source node behaviour, as declared in #2053.
One last comment: how do we want this feature to relate to the existing source behaviour (https://astronomer.github.io/astronomer-cosmos/guides/translate_dbt_to_airflow/managing-sources.html)?
Thanks, @tatiana, for the review feedback. I’ve updated the PR description to include the current limitations and have addressed your earlier comments. I also revised the description to ensure that issue #2053 does not get closed. If the source behavior is not None, we will run freshness checks and take action based on the results. This PR does not yet expose a callable to override the behavior. However, it lays the groundwork for introducing a callable in the future and allowing customization of the default behavior. |
There was a problem hiding this comment.
Pull request overview
Adds source-freshness-aware behavior to ExecutionMode.WATCHER so the producer can run dbt source freshness, detect stale sources, and proactively skip/exclude downstream resources to avoid waiting sensors and unnecessary dbt execution.
Changes:
- Implement freshness run + downstream traversal to mark dependent models as
"skipped"and append them todbt build --exclude. - Teach watcher consumers/triggerer/state helpers to treat
"skipped"as a terminal status and raiseAirflowSkipException. - Add unit tests covering freshness callback behavior, producer freshness flow, skipped handling, and graph flag wiring.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| cosmos/operators/watcher.py | Adds freshness callback + producer logic to run dbt source freshness, push synthetic skipped XComs, and update --exclude. |
| cosmos/operators/local.py | Adds target/sources.json reader and a run_command fast-path to capture sources.json for freshness runs. |
| cosmos/operators/_watcher/state.py | Introduces skipped status helpers and includes skipped in “terminal” status logic. |
| cosmos/operators/_watcher/base.py | Raises AirflowSkipException for skipped statuses in both deferrable and non-deferrable sensor paths. |
| cosmos/operators/_watcher/triggerer.py | Emits a terminal trigger event when node status is skipped. |
| cosmos/airflow/graph.py | Wires _check_source_freshness flag onto watcher producer when sources are rendered. |
| tests/operators/test_watcher.py | Adds unit tests for default freshness traversal + producer skip/exclude behavior. |
| tests/operators/test_local.py | Adds tests for sources.json parsing and the run_command early-return freshness path. |
| tests/operators/_watcher/test_watcher_base.py | Adds tests ensuring skipped status raises AirflowSkipException. |
| tests/operators/_watcher/test_triggerer.py | Extends triggerer tests to cover skipped status branching. |
| tests/operators/_watcher/test_state.py | Adds unit tests for skipped/terminal status helpers. |
| tests/airflow/test_graph.py | Adds tests asserting _check_source_freshness is set based on SourceRenderingBehavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
tatiana
left a comment
There was a problem hiding this comment.
@pankajastro Thanks a lot for addressing all the feedback! I believe we're really close to getting this merged. I left some feedback inline. Please let me know if you'd like to have a last review over a call, so we can merge this sooner rather than later
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Without this, skip/failure messages would say "Model" instead of "Source" for source freshness sensors. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The base _fallback_to_non_watcher_run runs dbt run, which is wrong for source sensors. Override to run dbt source freshness with the correct source selector syntax on retry. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
INFO on every poll iteration is too noisy with many sensors. Revert to DEBUG; terminal state transitions still log at INFO/WARNING level. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The watcher source rendering DAG runs dbt source freshness before dbt build, so the source tables must already exist. Add a dedicated test that runs source_rendering_dag (LOCAL mode) first to load seeds, then watcher_source_rendering_dag. Skip multibyte dataset tests (model removed from altered_jaffle_shop). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
tests/test_example_dags.py:28
source_rendering_dagis still executed by the parameterizedtest_example_dag, and then executed again (and order-dependent) intest_watcher_source_rendering_dag. This can make the suite slower and can introduce ordering-dependent/flaky behavior. Consider addingsource_rendering_dagtoDAGS_WITH_SEED_DEPENDENCY(or otherwise excluding it from the parameterized test) so both DAGs only run via the dedicated ordered test.
# DAGs that require seeds to be loaded first (run via dedicated ordered tests below)
DAGS_WITH_SEED_DEPENDENCY = ["watcher_source_rendering_dag"]
IGNORED_DAG_FILES = [
"performance_dag.py",
"jaffle_shop_kubernetes.py",
"jaffle_shop_watcher_kubernetes.py",
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
dbt source freshness does not support --resource-type flags that are added to dbt_cmd_flags when TestBehavior is NONE. Save and clear dbt_cmd_flags before running source freshness, restore them afterward. --select/--exclude are unaffected as they come from add_global_flags via separate attributes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Remove the default=True from the helper so tests that call _make_producer() without arguments validate the operator's real default (False). Add explicit test for the default value. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
tatiana
left a comment
There was a problem hiding this comment.
I've picked up this work from @pankajastro, rebased and made a few adjustments, which include:
- source nodes were running the source freshness, even though it had been run by the producer
- added a warning to the successful status
- fixed issues when sources had tests after rebase: #2520
I logged a ticket with some of the improvements we agreed to do after:
#2523
I also re-enabled the integration test (#1802) and temporarily removed the non-ASCII model (#2524). We'll address that in a follow up PR, since this one grew a lot.
Add source freshness awareness for
ExecutionMode.WATCHERRelated issue: #2053
What this PR does
When
SourceRenderingBehavioris notNONE, theDbtProducerWatcherOperatornow runsdbt source freshnessbefore the maindbt build. For every source that is stale (erroror
warn), all downstream models that transitively depend on that source are:"skipped"XCom so their consumer sensors raiseAirflowSkipExceptionimmediately
--excludeselector of the subsequentdbt buildso dbt does not attempt to runthem
The dependency traversal (BFS from stale source → transitive model dependents) runs at task
execution time using nodes from
dag.dbt_graph.nodes. Test nodes are intentionally excludedfrom the skip/exclude logic — dbt skips them automatically when their parent model is excluded.
dbt source freshnessis run via the existingrun_commandinfrastructure (temp directory,profile handling, subprocess vs dbt runner routing) rather than a raw subprocess call.
Known limitations
Warning
This PR does not fully close #2053. See limitations below.
"skip"is the only supported action — when a stale source is detected, downstream modelsare always skipped. Configurable behavior (e.g. warn-only or custom handling) is not supported
in this release.
_freshness_callbackis internal — not part of the public API and not exposed for usercustomisation. A future release may promote this to a stable, user-facing interface.
overriding Cosmos source nodes with custom SQL-based table state checks (similar to dbt's
state-aware runs). This will be
tracked separately.
--selector— the exclude logic appends toself.excludeusing modelnames and does not interact correctly with dbt's
--selectorargument (see [Bug]TestBehavior.NONEdoes not work with selectors in Watcher Exec Mode #2415).LoadMode.DBT_MANIFESTfreshness data is ignored —dbt source freshnessis alwaysre-run at task execution time; any freshness metadata in a pre-compiled manifest is not used.
Screen.Recording.2026-03-20.at.1.42.34.PM.mov
Co-authored-by: Claude noreply@anthropic.com