Avoid the Watcher consumers retrying to run a command that is being run by the producer#2557
Avoid the Watcher consumers retrying to run a command that is being run by the producer#2557pankajkoti wants to merge 3 commits into
Conversation
When a sensor times out while the producer is still running a long dbt build, the retry previously launched a standalone dbt run for the same model — creating a duplicate concurrent execution. Now the sensor checks the producer task state before deciding: - Producer terminated → fall back to local dbt run (handles manual UI task clear and retries after producer finished) - Producer still active → continue polling XCom instead of launching a duplicate run Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
f013acf to
05f448f
Compare
The Kubernetes watcher sensor retry test assumed fallback runs unconditionally on retry. Update it to set the producer to a terminal state, and add a test for the case where the producer is still running. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR updates Cosmos watcher consumer sensors to avoid launching duplicate dbt runs on task retries by checking the producer task’s Airflow state and continuing to poll XCom when the producer is still active.
Changes:
- Add
is_producer_task_terminated()helper + terminal state set for producer task state evaluation. - Refactor watcher consumer
poke()retry handling to fall back only when the producer is terminated; otherwise keep polling. - Extend unit tests to cover both retry paths (producer terminated vs still running) for local, Kubernetes, and test watcher sensors.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
cosmos/operators/_watcher/base.py |
Adds retry-handling helper and routes retry behavior based on producer state to prevent duplicate runs. |
cosmos/operators/_watcher/state.py |
Introduces producer terminal state set + helper for determining whether producer is finished. |
tests/operators/test_watcher.py |
Updates/extends tests for retry fallback vs continued polling (including test watcher behavior). |
tests/operators/test_watcher_kubernetes_unit.py |
Adds Kubernetes watcher unit test ensuring retry keeps polling when producer is still running. |
tests/operators/_watcher/test_state.py |
Adds unit tests for the new is_producer_task_terminated() helper. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return self._evaluate_node_status(status, producer_task_state, try_number, context) | ||
|
|
||
| def _evaluate_node_status( |
There was a problem hiding this comment.
Moved this to a new method, as this was getting complained about code complexity
There was a problem hiding this comment.
@pankajkoti Would there be an alternative way to refactor?
The function name (_evaluate_node_status) does not seem accurate with what it is doing, which is actually to:
- check the status of the node
- check the status of the producer airflow task
- maybe execute dbt itself to run, as a fallback
These steps feel quite critical and relevant in the poke method, and it is probably hard to find a suitable name in a single method. There may be other parts of the function that could be refactored and isolated that are not so core to the logic (maybe the retrieval of status, or the logging handling?)
There was a problem hiding this comment.
I think the name broadly reflects the method’s primary responsibility: it evaluates the node status and determines the appropriate poke outcome. By "evaluate/evaluation", I’m including checking dependencies like the producer task state and deciding on actions such as fallback, retry, raise, or succeed. The producer state check and fallback are part of handling cases where no direct status is available.
That said, I agree it’s doing a few important things, so I’m open to renaming if you have a more precise suggestion.
On extracting retrieval/logging: those are already separated into helper methods (_log_startup_events, _get_node_status, _cache_compiled_sql). Wrapping them further would likely add indirection without much benefit. The evaluation logic, on the other hand, forms a cohesive decision tree, which is why it felt reasonable to group it into a single method.
Happy to refactor further if you feel a different split would improve readability.
There was a problem hiding this comment.
This function combines decision-making, state retrieval, and side effects (running dbt). While it is small, it has a lot of responsibility. I'm worried about future readability and testability.
Maybe we could break it down into two smaller methods with clear responsibilities, something along the lines:
_evaluate_node_status: implements the decision tree logic_handle_node_decision(decision, ...): executes side effects - run dbt or others
There was a problem hiding this comment.
Looking at the method again, it doesn’t actually do state retrieval. Both status and producer_task_state are passed in; the retrieval happens in poke. So this method is really decision-making and taking actions based on the decisions.
I feel splitting it would introduce indirection without reducing coupling; the action method would still need the same parameters (try_number, context) for the fallback path.
Right now, the full flow is visible in one place:
- status is None -> check producer -> fallback or keep polling
- skipped -> skip
- success -> done
If we split this into decision + handler, a reader would need to jump between methods and map decisions to actions, which I think hurts readability more than it helps at this size.
I’m happy to rename the method if the current name doesn’t reflect its behaviour well, open to suggestions there. But I’d prefer not to split it unless the logic grows or we see reuse emerging.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2557 +/- ##
=======================================
Coverage 98.04% 98.05%
=======================================
Files 103 103
Lines 7586 7598 +12
=======================================
+ Hits 7438 7450 +12
Misses 148 148 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| if hasattr(self, "_override_rtif"): | ||
| self._override_rtif(context) | ||
|
|
||
| def _handle_retry(self, try_number: int, producer_task_state: str | None, context: Context) -> bool | None: |
There was a problem hiding this comment.
I believe it would be a cleaner interface if we renamed things to is_producer_still_running.
Retry is a consequence depending on other variables - including if the producer is actively running the task
There was a problem hiding this comment.
I believe _handle_retry still makes sense as the name here. It's called from the poke method when try_number > 1, i.e., when we're in an Airflow retry scenario, and this method, as part of handling the Airflow retry, decides whether to actually fall back (producer terminated) or keep polling (producer still active).
There was a problem hiding this comment.
Functions should be named based on what they do, not how they are used. The current name (_handle_retry) is related to where it is used, rather than to its name.
Naming functions after their action creates reusable, predictable code. If a function is named after its specific use case, it becomes difficult to reuse that same logic elsewhere.
There was a problem hiding this comment.
I am a bit torn here. _handle_retry actually does describe what the method does; it handles the retry scenario. It checks the producer state, decides whether to fall back or keep polling, and acts accordingly. That is the method's responsibility. It's not named after its call site; it's named after the action it performs.
When the time comes for reusing (although this function is marked local to the module/class), I believe these are what the actions will be for handling retries for the case, no? Or do you see a different path there?
We already have is_producer_task_terminated for the state check. The suggested is_producer_still_running shouldn't call an action based on its name, so the fallback logic would have to move elsewhere, either into another method or back into poke. If it goes into poke, it's not reusable either.
tatiana
left a comment
There was a problem hiding this comment.
I'm really happy we're improving this part, @pankajkoti ! I left some questions and advice on the implementation. While they will not change the overall logic - your fix seems accurate - I believe they will help with the code maintainability.
Thanks for the review @tatiana. I left my responses on the feedback and addressed some other minor comments that I have marked as resolved. |

When a sensor times out while the producer is still running a long dbt build, the retry previously launched a standalone dbt run for the same model — creating a duplicate concurrent execution.
Now the sensor checks the producer task state before deciding:
closes: https://github.com/astronomer/oss-integrations-private/issues/359