Refactor _default_freshness_callback to return list of (id, state) tuple#2572
Conversation
…ples Change return type from tuple[list[str], str] to list[tuple[str, str]] so each element carries its own state alongside the node unique_id. Update the call site and all related tests accordingly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2572 +/- ##
=======================================
Coverage 98.05% 98.05%
=======================================
Files 103 103
Lines 7601 7604 +3
=======================================
+ Hits 7453 7456 +3
Misses 148 148 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Refactors the WATCHER producer’s source-freshness “skip” callback to return per-node results as (unique_id, state) tuples instead of a (list[unique_id], state) pair, as groundwork for future source-node behavior improvements (Issue #2053).
Changes:
- Updated
cosmos/operators/watcher.py:_default_freshness_callbackreturn type tolist[tuple[str, str]]. - Updated the producer call site to consume the new return shape.
- Updated
_default_freshness_callbackunit tests to assert against the new structure.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
cosmos/operators/watcher.py |
Changes callback return type and adapts _apply_source_freshness to the new structure. |
tests/operators/test_watcher.py |
Updates tests to validate the new (unique_id, state) results. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Change freshness callback state value from "skip" to "skipped" - Rename _skipped_node_token to _apply_node_state_tokens; accept list[tuple[str, str]] so each node carries its own state - Rename _push_skipped_xcom_for_model to _push_node_state_xcom; take an explicit state argument instead of hard-coding "skipped" - Only add nodes to self.exclude when their state is "skipped"; other states push XCom but leave the dbt exclude list unchanged - Update all call sites and tests accordingly Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
_default_freshness_callback to return list of (id, state) tuple
tatiana
left a comment
There was a problem hiding this comment.
Thanks, @pankajastro , it looks good, let's ship this as part of Cosmos 1.15.0
…2617) Add a new experimental section describing the source freshness aware execution feature introduced in PRs #2467, #2572, and #2586: - Update the "Source freshness nodes" limitation to reflect that the feature is now supported since Cosmos 1.15.0 - Add a "Source freshness aware execution (Experimental)" section under Advanced config covering activation, default callback behaviour, consumer state handling, callback signature, and known limitations --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
sharing a single status string
argument, enabling custom callbacks to push states other than "skipped"
This set would set ground work for #2053
Related-to: #2053