Refactor ExecutionMode.WATCHER so the logic is centralised#2236
Conversation
In providers, it is common to split Airflow code into operators, triggers, and other folders. In the case of Cosmos, execution modes were originally added to the operators folder, since all the code for each execution mode was operator-only. With the introduction of the async and watcher execution modes, the code became more complex, with the need for a custom triggerer for watcher. With this, watcher execution mode code started spreading throughout the Cosmos code base, making the code distribution more complex than needed. This PR brings the watcher triggerer logic close to the operator, so that most of the watcher specific logic lives close to the execution mode implementation. In Cosmos 2.0, we should review how we name Cosmos packages, potentially calling this folder execution_modes.
✅ Deploy Preview for astronomer-cosmos canceled.
|
There was a problem hiding this comment.
Pull request overview
This PR refactors the ExecutionMode.WATCHER implementation by consolidating its logic within the cosmos/operators/_watcher module, moving the WatcherTrigger class from cosmos/_triggers/watcher.py to cosmos/operators/_watcher/triggerer.py. This change improves code organization by keeping execution mode-related code centralized rather than scattered across the codebase.
Key changes:
- Moved
WatcherTriggerand_parse_compressed_xcomfromcosmos._triggers.watchertocosmos.operators._watcher.triggerer - Updated all imports throughout the codebase to reference the new location
- Updated the triggerer's serialization path to reflect its new module location
Reviewed changes
Copilot reviewed 6 out of 8 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| tests/operators/test_watcher.py | Updated import statement to use new WatcherTrigger location |
| tests/operators/_watcher/test_triggerer.py | Updated all patch paths to reflect new module structure |
| cosmos/operators/watcher.py | Simplified imports to use new centralized location |
| cosmos/operators/_watcher/triggerer.py | Updated serialization path for WatcherTrigger |
| cosmos/operators/_watcher/state.py | Updated TODO comment to reference new module location |
| cosmos/operators/_watcher/init.py | Added WatcherTrigger and _parse_compressed_xcom to module exports |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
pankajkoti
left a comment
There was a problem hiding this comment.
If not already done, would be nice to sanity test this once with running an Airflow Dag and confirm by seeing in the UI that the sensors are getting deferred and sensing XCOMs correctly and that the trigger complets it's lifecycle
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2236 +/- ##
=======================================
Coverage 97.98% 97.98%
=======================================
Files 95 95
Lines 6190 6190
=======================================
Hits 6065 6065
Misses 125 125 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
Thanks a lot, @pankajkoti !
Things are working as expected. During the execution of the |


Since #2084, the
ExecutionMode.WATCHERlogic started spreading throughout the Cosmos code base.When implementing Airflow providers, it is common practice to split Airflow code into operators and triggerers folders. However, in the specific case of Cosmos, I believe this approach makes it harder to read and maintain the project and that it is better to centralise the execution mode code.
Historically, Cosmos execution modes were defined in the operators folder because the code for each execution mode was operator-only. With the introduction of the async and watcher execution modes, the execution mode implementation became more complex, requiring a custom triggerer for the watcher. As a result, the watcher execution mode code began spreading throughout the Cosmos codebase, complicating code distribution more than needed.
This PR brings the watcher triggerer logic closer to the operator, so that most of the watcher-specific logic lives near the execution mode implementation.
In Cosmos 2.0, we can review how we name the Cosmos execution mode folder, potentially renaming this folder from operators to execution_modes.