Skip to content

Fail sensor tasks immediately if the ExecutionMode.WATCHER producer task fails#2040

Merged
tatiana merged 7 commits into
mainfrom
fix_1960
Oct 27, 2025
Merged

Fail sensor tasks immediately if the ExecutionMode.WATCHER producer task fails#2040
tatiana merged 7 commits into
mainfrom
fix_1960

Conversation

@pankajastro
Copy link
Copy Markdown
Contributor

@pankajastro pankajastro commented Oct 20, 2025

relates to: #1960

This PR adds a default on_failure_callback to the producer task. The callback records the producer task’s state and ensures that the sensor fails immediately if the producer task has failed, improving error visibility and reducing unnecessary wait time.

Comment thread cosmos/operators/watcher.py Outdated
@pankajastro pankajastro marked this pull request as ready for review October 21, 2025 08:55
Copilot AI review requested due to automatic review settings October 21, 2025 08:55
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR enhances the watcher functionality by making sensor tasks fail immediately when the producer task fails, rather than waiting for a timeout. It adds a default on_failure_callback to the producer task that records its failure state via XCom, which the sensor then checks before attempting to poll for events.

Key changes:

  • Added default failure callback to producer task that pushes "failed" state to XCom
  • Modified sensor's poke method to check producer task state and fail immediately if producer has failed
  • Implemented version-aware callback handling to support both single callbacks (Airflow < 2.6.0) and callback lists (Airflow >= 2.6.0)

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

File Description
cosmos/operators/watcher.py Added producer task failure callback, state checking in sensor, and version-aware callback merging logic
tests/operators/test_watcher.py Updated test mock to account for additional XCom pull for producer state check

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment thread cosmos/operators/watcher.py Outdated
Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/watcher.py Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.


Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment thread cosmos/operators/watcher.py
@codecov
Copy link
Copy Markdown

codecov Bot commented Oct 21, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.71%. Comparing base (fa81082) to head (4d712a2).

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2040   +/-   ##
=======================================
  Coverage   97.71%   97.71%           
=======================================
  Files          89       89           
  Lines        5681     5700   +19     
=======================================
+ Hits         5551     5570   +19     
  Misses        130      130           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@pankajastro
Copy link
Copy Markdown
Contributor Author

I'll create a follow-up PR to extend the documentation introduced in PR #2046

Copy link
Copy Markdown
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for working on this, @pankajastro !

I have a behavioural question. In the case of Jaffle Shop, let's say there was an issue applying a seed. The producer task fails. Will all watcher tasks be marked as failed, even if they had not been scheduled and did not start sensing?

Please, could you increase the coverage?

@pankajastro
Copy link
Copy Markdown
Contributor Author

I have a behavioural question. In the case of Jaffle Shop, let's say there was an issue applying a seed. The producer task fails. Will all watcher tasks be marked as failed, even if they had not been scheduled and did not start sensing?

If not scheduled, it should go to the upstream_failed state

@tatiana
Copy link
Copy Markdown
Collaborator

tatiana commented Oct 22, 2025

If not scheduled, it should go to the upstream_failed state

@pankajastro, Did you run a few DAGs to confirm this behaviour and ensure that tasks skipped because upstream tasks failed would not enter sensor mode when they attempt to run for the "first time"?

@pankajastro
Copy link
Copy Markdown
Contributor Author

@pankajastro, Did you run a few DAGs to confirm this behaviour and ensure that tasks skipped because upstream tasks failed would not enter sensor mode when they attempt to run for the "first time"?

I'm not sure if I understand it fully, but why should it be skipped?

Screenshot 2025-10-23 at 8 22 21 AM

@tatiana
Copy link
Copy Markdown
Collaborator

tatiana commented Oct 23, 2025

@pankajastro In the case of Jaffle Shop, let's say the following happens:

The dbt_producer_watcher fails while trying to apply the seed raw_customers.
The task raw_customers, which was actively watching, will fail (it becomes red). It will have logs for a first try that was unsuccessful. If it is retried, that would be its second try.
However, the task stg_customers will have never been started; it would never have been initialised as a sensor. It will likely be marked as upstream_failed (I'm asking you to confirm this). If this hypothesis is correct, it will likely never be attempted, and therefore, no logs will be generated for this nonexistent first attempt.

So, my concern is—and this is what I'm asking you to test and confirm—what happens when someone clears the status of the seed raw_customers and its downstream tasks? Let's say it succeeds; the task stg_customers will then proceed to its second attempt, as expected, and execute dbt run. However, I'm worried that the try number on the customers task would be 1 (afterall, it is the first time it is actually being executed), and it would enter sensor mode instead of running the dbt run, and it would wait "forever" (until timeout happens) for an update from the dbt producer that will never come.

@tatiana tatiana changed the title Watcher: Fail sensor tasks immediately if the root dbt build task fails Fail sensor tasks immediately if the ExecutionMode.WATCHER producer task fails Oct 27, 2025
@tatiana
Copy link
Copy Markdown
Collaborator

tatiana commented Oct 27, 2025

@pankajastro In the case of Jaffle Shop, let's say the following happens:

The dbt_producer_watcher fails while trying to apply the seed raw_customers. The task raw_customers, which was actively watching, will fail (it becomes red). It will have logs for a first try that was unsuccessful. If it is retried, that would be its second try. However, the task stg_customers will have never been started; it would never have been initialised as a sensor. It will likely be marked as upstream_failed (I'm asking you to confirm this). If this hypothesis is correct, it will likely never be attempted, and therefore, no logs will be generated for this nonexistent first attempt.

So, my concern is—and this is what I'm asking you to test and confirm—what happens when someone clears the status of the seed raw_customers and its downstream tasks? Let's say it succeeds; the task stg_customers will then proceed to its second attempt, as expected, and execute dbt run. However, I'm worried that the try number on the customers task would be 1 (afterall, it is the first time it is actually being executed), and it would enter sensor mode instead of running the dbt run, and it would wait "forever" (until timeout happens) for an update from the dbt producer that will never come.

I implemented the approach that was discussed with @pankajastro @pankajkoti in #2062

Copy link
Copy Markdown
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pankajastro @pankajkoti and I discussed a few scenarios, and we are extending the original implementation in this PR in a separate PR #2062. We still believe more work will be needed in this area. It seems that Airflow does not expose whether a task has a State.UPSTREAM_FAILED - which affects how we'll handle scenarios.

@tatiana tatiana merged commit 135147f into main Oct 27, 2025
78 checks passed
@tatiana tatiana deleted the fix_1960 branch October 27, 2025 17:44
tatiana added a commit that referenced this pull request Oct 28, 2025
…pstream tasks fail (#2062)

This PR extends the implementation of `ExecutionMode.WATCHER` failure
handling in #2040 to tackle running `State.UPSTREAM_FAILED` tasks.

And aims to offer a solution for the scenario described in:

#2040 (comment)

> @pankajastro In the case of Jaffle Shop, let's say the following
happens:
>
> The `dbt_producer_watcher` fails while trying to apply the seed
`raw_customers`.
The task `raw_customers`, which was actively watching, will fail (it
becomes red). It will have logs for a first try that was unsuccessful.
If it is retried, that would be its second try.
However, the task `stg_customers` will have never been started; it would
never have been initialised as a sensor. It will likely be marked as
`upstream_failed` (I'm asking you to confirm this). If this hypothesis
is correct, it will likely never be attempted, and therefore, no logs
will be generated for this nonexistent first attempt.
>
> So, my concern is—and this is what I'm asking you to test and
confirm—what happens when someone clears the status of the seed
`raw_customers` and its downstream tasks? Let's say it succeeds; the
task `stg_customers` will then proceed to its second attempt, as
expected, and execute `dbt run`. However, I'm worried that the try
number on the `customers` task would be 1 (afterall, it is the first
time it is actually being executed), and it would enter sensor mode
instead of running the `dbt run`, and it would wait "forever" (until
timeout happens) for an update from the dbt producer that will never
come.

Previously - in #2040 - if the status of the seed `raw_customers` was
cleared and that task was run sucessfully falling back to
`ExecutionMode.LOCAL`, the downstream tasks would fail. This happened
because it was their first try, and they'd see the producer task had
failed, and they would fail, giving a poor user-experience.

With the current PR, we attempt to understand if a task is a
`State.UPSTREAM_FAILED` task based on the newly introduced property
`poke_retry_number`.

Relates to: #1960
@tatiana tatiana added this to the Cosmos 1.11.0 milestone Oct 28, 2025
@tatiana tatiana mentioned this pull request Oct 29, 2025
tatiana added a commit that referenced this pull request Oct 29, 2025
**Features**

* Introduce ``ExecutionMode.WATCHER`` to reduce DAG run time by 1/5 in
several PRs. Learn more about it
[here](https://astronomer.github.io/astronomer-cosmos/getting_started/watcher-execution-mode.html#watcher-execution-mode).
This feature was implemented via multiple PRs, including:
* Expose new execution mode by @tatiana @pankajastro @pankajkoti in
#1999
* Add ``DbtProducerWatcherOperator`` for the proposed
``ExecutionMode.WATCHER`` by @pankajkoti in #1982
* Add ``DbtConsumerWatcherSensor`` for the proposed
``ExecutionMode.WATCHER`` by @pankajastro in #1998
* Push producer's task completion status to XCOM by @pankajkoti in #2000
* Add default priority_weight for ``DbtProducerWatcherOperator`` by
@pankajkoti in #1995
* Add sample dbt events for the dbt watcher execution mode by
@pankajkoti in #1952
* Add ``compiled_sql`` as a template fields on
```ExecutionMode.WATCHER``` when using ``run_results.json`` by
@pankajastro in #2070
* Set ``push_run_results_to_xcom`` kwargs correctly for invocation mode
subprocess and Watcher mode by @pankajastro in #2067
* Store compiled SQL as template field for dbt callback events in
``ExecutionMode.WATCHER`` by @pankajkoti in #2068
* Add initial documentation for ``ExecutionMode.WATCHER`` by @tatiana in
#2046
* Support running ``State.UPSTREAM_FAILED`` tasks when WATCHER consumer
upstream tasks fail by @tatiana in #2062
* Fail sensor tasks immediately if the ``ExecutionMode.WATCHER``
producer task fails by @pankajastro in #2040
  * Add ``WATCHER``` to GitHub issue template by @tatiana in #2056
* Add support for ``TestBehavior.AFTER_ALL`` with
``ExecutionMode.WATCHER`` by @pankajastro in #2049
* Add support for ``TestBehavior.NONE`` with ``ExecutionMode.WATCHER``
by @pankajastro in #2047
* Fix ``ExecutionMode.WATCHER`` behaviour with ``DbtTaskGroup`` by
@tatiana in #2044
* Fix Cosmos behaviour when using watcher with
``InvocationMode.DBT_RUNNER`` by @tatiana in #2048

* Add Airflow 3 plugin for dbt docs with multiple dbt projects support
by @pankajkoti in #2009, check the
[documentation](https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html).
* Initial support to ``dbt Fusion`` by @tatiana in #1803. More details
[here](https://astronomer.github.io/astronomer-cosmos/configuration/dbt-fusion).
* Support to prune sources without downstream references in dbt projects
by @corsettigyg in #1988
* Allow to set task display name as a user-defined function by
@corsettigyg in #1761
* Add dbt project's hash to dag docs to support dag versioning in
Airflow 3 by @pankajkoti in #1907
* feat: Add Jinja templating support for ``dbt_cmd_flags`` by
@skillicinski in #1899
* Add Scarf metric to collect the execution mode uses by @pankajastro in
#1981
* Support Airflow 3.1 by @tatiana in #1980
* Add MySQL profile mapping by @Lee2532 in #1977
* Add sqlserver profile mapping by @pankajastro in #1737

**Enhancement**

* Use XCom to store sql when using ``ExecutionMode.AIRFLOW_ASYNC`` by
@pankajastro in #1934
* Refactor ``AIRFLOW_ASYNC`` teardown so it doesn't install the
virtualenv by @pankajastro in #1938
* Reuse the virtual env for ``AIRFLOW_ASYNC`` setup task by @pankajastro
in #1939
* Improve dataset/asset experience in Cosmos by @tatiana in #2030
* Add ``downstreams`` to ``DbtNode`` by @wornjs in #2028

**Bug fixes**

* Fix tags extraction by @ms32035 in #1915
* Fix task flow operator args by @anyapriya in #2024

**Documentation**

* Add documentation for Airflow 3 Plugin supporting dbt docs for
multiple dbt projects by @pankajkoti in #2063
* Add Cosmos Deferrable Operator Guide by @pankajastro in #1922
* Add dbt Fusion documentation by @tatiana in #1824 #1830
* Update dbt-fusion.rst to explicitly highlight it is in alpha by
@tatiana in #1838
* Fix a bunch of docs build errors and warnings by @pankajkoti in
#1886
* Add docs note for param virtualenv_dir for async execution mode by
@pankajastro in #1969
* Use pepy.tech downloads badge in README by @pankajkoti in #1920
* Correct the default value of ``cache_dir`` by @seokyun.ha in #2027

**Others**

* Promote @corsettigyg to committer by @tatiana in #1985
* Add @pankajkoti and @pankajastro to ``contributors.rst`` by @tatiana
in #1983
* Update setup script for airflow3 script by @dwreeves in #2023
* Prevent pytest from trying to test classes that aren't actually tests
by @anyapriya in #2032
* Fix ``dag.test()`` for Airflow 3.1+ by syncing DAG to database bby
@kaxil in #2037
* Disable Scarf in CI by @pankajastro in #2016
* Fix failing dbt Fusion tests when run in parallel in CI by @pankajkoti
in #1896
* Fix MyPy issues related to ``ObjectStoragePath`` in main branch by
@tatiana in #2012
* Cleanup example dbt event JSON dictionaries kept for XCOM referencby
@pankajkoti in #1997
* Bump min hatch version that includes fixes for click>=8.3.0 by
@pankajkoti in #1996
* Use official postgres image from Docker hub for kubernetes setup by
@pankajkoti in #1986
* Use click<8.3.0 for hatch as click 8.3 breaks hatch by @pankajkoti in
#1987
* Pin Airflow version in type check CI job by @pankajastro in #2003
* Improve comments after feedback on #1948 by @tatiana in #1963
* Fix running tests with dbt Fusion 2.0.0 preview versions by @tatiana
in #1948
* Test hardening of dbt node having tags as unset or missing by
@pankajkoti in #1918
* Fix Sphinx issue in the main branch by @tatiana in #2064
* pre-commit autoupdate in #2065, #2043, #2033, #2019, #1990, #2019,
#2008, #1941, #1935, #1924
* GitHub dependabot update in #2051, #2050, #2038, #2022, #1947, #1955,
#1946, #1944, #1945, #1928, #1921, #1917


Co-authored-by: Pankaj Koti <pankaj.koti@astronomer.io>
Co-authored-by: Pankaj Singh <pankaj.singh@astronomer.io>
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants