Skip to content

Fix incorrectly skipped source downstream tasks in the watcher#2563

Merged
pankajastro merged 4 commits into
mainfrom
FIX_2536
Apr 17, 2026
Merged

Fix incorrectly skipped source downstream tasks in the watcher#2563
pankajastro merged 4 commits into
mainfrom
FIX_2536

Conversation

@pankajastro
Copy link
Copy Markdown
Contributor

@pankajastro pankajastro commented Apr 16, 2026

The DFS previously marked every transitive dependent of a stale source as skip, regardless of whether the node had additional clean upstream paths that would allow it to succeed.

For example, a model that depends on both a stale source and a clean upstream model was pre-emptively excluded even though dbt would have run it successfully (especially for warn-status sources where dbt continues execution normally).

A node is now added to the skip set only when all of its depends_on entries are either known-stale sources or already-skipped nodes. Nodes with at least one clean upstream path are left out of the skip set and allowed to run. The DFS still re-evaluates each candidate when a new node joins the skip set, so purely stale chains are handled correctly.

closes: #2536

@pankajastro pankajastro changed the title Fix over-aggressive skip in _default_freshness_callback Fix over-aggressive skip in _default_freshness_callback Apr 16, 2026
@pankajastro pankajastro marked this pull request as ready for review April 16, 2026 07:05
Copilot AI review requested due to automatic review settings April 16, 2026 07:05
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Updates WATCHER source-freshness skipping so downstream nodes are only excluded when all upstream dependencies are stale sources or already-skipped nodes, preventing over-aggressive skipping when a clean upstream path exists.

Changes:

  • Refines _default_freshness_callback DFS logic to skip nodes only when all upstream dependencies are stale/skip-eligible.
  • Expands unit test coverage to validate mixed-upstream graphs and “all-upstreams-stale” behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
cosmos/operators/watcher.py Adjusts freshness callback traversal/skip criteria to avoid skipping nodes that still have a clean upstream path.
tests/operators/test_watcher.py Adds regression tests for mixed stale+clean upstreams and for nodes that should be skipped only when all upstreams are stale/skipped.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/watcher.py
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 16, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.05%. Comparing base (aa4c770) to head (9c8eeff).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2563   +/-   ##
=======================================
  Coverage   98.04%   98.05%           
=======================================
  Files         103      103           
  Lines        7589     7594    +5     
=======================================
+ Hits         7441     7446    +5     
  Misses        148      148           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Comment thread cosmos/operators/watcher.py
pankajastro and others added 3 commits April 16, 2026 18:20
The DFS previously marked every transitive dependent of a stale source
as skip, regardless of whether the node had additional clean upstream
paths that would allow it to succeed.

For example, a model that depends on both a stale source and a clean
upstream model was pre-emptively excluded even though dbt would have
run it successfully (especially for warn-status sources where dbt
continues execution normally).

A node is now added to the skip set only when all of its depends_on
entries are either known-stale sources or already-skipped nodes. Nodes
with at least one clean upstream path are left out of the skip set and
allowed to run. The DFS still re-evaluates each candidate when a new
node joins the skip set, so purely stale chains are handled correctly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Renamed inner variable `node` to `dependent_node` to avoid shadowing
the outer loop variable `node: DbtNode` with the wider type
`DbtNode | None` returned by dict.get().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated no new comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Cover the two early-continue branches in _default_freshness_callback:
- dependent_id already in visited (diamond-graph scenario)
- dependent_node is None from nodes.get (NullOnGet dict subclass)
Copy link
Copy Markdown
Contributor

@pankajkoti pankajkoti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Off-topic, but do we know why we use source staleness to decide whether to run a model or not? I guess dbt does not do that, no?

@pankajastro
Copy link
Copy Markdown
Contributor Author

pankajastro commented Apr 16, 2026

Off-topic, but do we know why we use source staleness to decide whether to run a model or not? I guess dbt does not do that, no?

Good question. The main idea is that if upstream sources are stale, any downstream models built on top of them will also be stale, so running them doesn’t add much value. Instead, it just consumes compute and time.

Using source freshness (or staleness) as a signal lets us skip unnecessary runs and save resources.

You’re right that dbt doesn’t enforce this by default—its freshness checks are more for monitoring and alerting. This approach is more about optimizing pipeline efficiency rather than something dbt does out of the box. dbt has similar concept called state-aware-orchestration

Copy link
Copy Markdown
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this, @pankajastro !

Some questions: in the PR description, why did you mention this relates to #2536 and not that it fixes it? What is missing for us to close #2536? I feel like this addresses it? Do we expect a follow up PR?

The title may explain how, but not necessarily the why we're making the change. It would be great if the title reflected the problem we're facing

  • Fix incorrectly skipped source downstream tasks in the watcher
    Or something along these lines

@pankajastro pankajastro changed the title Fix over-aggressive skip in _default_freshness_callback Fix incorrectly skipped source downstream tasks in the watcher Apr 17, 2026
@pankajastro
Copy link
Copy Markdown
Contributor Author

pankajastro commented Apr 17, 2026

Thanks for fixing this, @pankajastro !

Some questions: in the PR description, why did you mention this relates to #2536 and not that it fixes it? What is missing for us to close #2536? I feel like this addresses it? Do we expect a follow up PR?

The title may explain how, but not necessarily the why we're making the change. It would be great if the title reflected the problem we're facing

  • Fix incorrectly skipped source downstream tasks in the watcher
    Or something along these lines

Wanted to discuss: #2536 (comment) and there is not any follow-up PR planned

@pankajastro pankajastro merged commit f2de9af into main Apr 17, 2026
124 checks passed
@pankajastro pankajastro deleted the FIX_2536 branch April 17, 2026 12:56
@pankajkoti pankajkoti mentioned this pull request Apr 23, 2026
@tatiana tatiana added this to the Cosmos 1.14.1 milestone Apr 23, 2026
tatiana pushed a commit that referenced this pull request Apr 23, 2026
Bug Fixes

* Fix ``ExecutionMode.WATCHER`` producer retry behaviour by @tatiana in
#2559
* Prevent watcher producer skip propagating to downstream tasks via
gateway task by @johnhoran and @tatiana in #2597
* Keep watcher sensor polling when producer is still running by
@pankajkoti in #2592
* Fix circular import error in Cosmos plugin discovery under Astro
Runtime by @tatiana in #2538
* Fix ``CosmosRichLogger`` crash on ``None`` log message by @tatiana in
#2540
* Enable inlets and outlets using dbt Fusion on Airflow 3 by
@ichirotakami in #2561
* Fix incorrectly skipped source downstream tasks in
``ExecutionMode.WATCHER`` by @pankajastro in #2563
* Fix duplicate logs in ``dbt build`` when source freshness is enabled
by @pankajastro in #2564
* Warn and normalize when ``source_rendering_behavior=None`` is passed
by @pankajastro in #2570
* Gracefully handle ``Variable.set()`` failures on Astro Remote
Execution by @hkc-8010 in #2573
* Skip malformed YAML selectors instead of failing entirely by
@YourRoyalLinus in #2577

Docs

* Update watcher test behavior docs for Cosmos 1.14.0 by @tatiana in
#2549
* Add redirect for moved partial-parsing docs page by @tatiana in #2550
* Document ``ExecutionMode.WATCHER`` and ``depends_on_past`` limitation
by @tatiana in #2602
* Restore memory-optimised imports docs for Cosmos < 1.14.0 by
@pankajkoti in #2604

Others

* Speed up Airflow 3.1+ integration tests by caching
InProcessExecutionAPI by @pankajkoti in #2547
* Improve stability of cache hash unit tests by @tatiana in #2539
* Fix mypy 1.20.0 type check failures by @pankajkoti in #2546
* Fix CI failures caused by docs build memory exhaustion by @pankajkoti
in #2580
* Fix dbt Fusion broken integration tests by @tatiana in #2581
* Fix flaky ``cosmos_manifest_selectors_example`` DAG in CI by
@pankajkoti in #2593
* Reduce pre-commit autoupdate frequency PRs by @tatiana in #2544
* Bump ``reviewdog/action-actionlint`` from 1.71.0 to 1.72.0 by
@dependabot in #2542
* Skip watcher gateway test on Airflow 3.0 by @tatiana in #2607

closes: astronomer/oss-integrations-private#381
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Validate Cosmos 1.14.0 watcher source freshness behaviour

4 participants