
Fix ExecutionMode.WATCHER behaviour with DbtTaskGroup #2044

Merged: tatiana merged 18 commits into main from fix-watcher-taskgroup on Oct 24, 2025

@tatiana tatiana commented Oct 21, 2025

While validating DbtTaskGroup in a session with an end-user on Cosmos v1.11.0a8, we identified that ExecutionMode.WATCHER was not working as expected.

There were two main issues:

  1. We were not setting the producer_task_id argument correctly in the sensor tasks
  2. The producer task was always being picked up after the sensor tasks, even though it had a higher priority weight and was an "upstream" task of the dbt root nodes (with a trigger rule always, so they could start sensing).

The first issue was a bug, which is now fixed.

The second issue occurs because the sensor tasks have a trigger_rule of "always". This setting works well in the case of DbtDag, as the only upstream task is the producer task, and the producer task and root dbt project nodes can be scheduled at the same time, with the priority weight of the producer giving it precedence. However, in the case of a DbtTaskGroup with one or multiple upstream tasks, the producer task must wait for the upstream tasks, whereas the dbt project root nodes ("top sensors") do not have to wait. This leads to sensors occupying the available slots before the producer is scheduled.
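To make the scheduling difference concrete, here is a toy model in plain Python. It is not Airflow's scheduler: the `Task` class, `is_ready`, `first_pass`, and the task ids are all hypothetical names for illustration. It only assumes that a task with trigger rule "always" ignores its upstream state, and that ready tasks are picked in descending priority weight until the slots are full.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    """Hypothetical stand-in for an Airflow task (illustration only)."""
    task_id: str
    priority_weight: int = 1
    upstream: tuple = ()
    trigger_rule: str = "all_success"


def is_ready(task, finished):
    # A task with trigger rule "always" does not wait for its upstream tasks.
    if task.trigger_rule == "always":
        return True
    return all(up in finished for up in task.upstream)


def first_pass(tasks, slots):
    """Return the ids picked in the first scheduling pass, highest weight first."""
    ready = [t for t in tasks if is_ready(t, finished=set())]
    ready.sort(key=lambda t: t.priority_weight, reverse=True)
    return [t.task_id for t in ready[:slots]]


sensors = [
    Task(f"sensor_{name}", upstream=("producer",), trigger_rule="always")
    for name in "ab"
]

# DbtDag: the producer has no upstream task, so its weight puts it first.
dbt_dag = [Task("producer", priority_weight=10)] + sensors

# DbtTaskGroup: the producer waits for pre_dbt, while the sensors do not wait.
task_group = [
    Task("pre_dbt"),
    Task("producer", priority_weight=10, upstream=("pre_dbt",)),
] + sensors

dag_first = first_pass(dbt_dag, slots=2)
group_first = first_pass(task_group, slots=2)
print(dag_first)
print(group_first)
```

In this model the producer's higher weight only helps once the producer is eligible to run. In the task group case it is not eligible in the first pass, so a sensor takes a slot before it.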

At some point, we suspected this was an issue with Airflow itself, but after a troubleshooting session with Ash, we realised it was not a bug in Airflow: apache/airflow#56723.

The side-effect of the implementation in this PR is that DbtTaskGroup and DbtDag are behaving differently:

  • In DbtDag, the producer is upstream of the dbt root nodes
  • In DbtTaskGroup, the producer is not an upstream root node
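The difference between the two wirings can be sketched with plain dictionaries. This is an illustrative sketch only, not the code in cosmos/airflow/graph.py: `find_roots`, `add_producer`, the `link_to_roots` flag, and the node names are hypothetical. It also shows why the producer has to be added after the dbt dependencies are known, since the root nodes are the ones without any upstream node.

```python
def find_roots(upstream_of):
    """Return the nodes that have no upstream node, sorted for stable output."""
    return sorted(node for node, ups in upstream_of.items() if not ups)


def add_producer(upstream_of, producer_id="producer", link_to_roots=True):
    """Return a copy of the graph with the producer added.

    link_to_roots=True mimics the DbtDag wiring (producer upstream of the
    dbt root nodes); False mimics the DbtTaskGroup wiring (no such link).
    """
    wired = {node: set(ups) for node, ups in upstream_of.items()}
    if link_to_roots:
        for root in find_roots(upstream_of):
            wired[root].add(producer_id)
    wired[producer_id] = set()
    return wired


# Hypothetical dbt project: two staging models feeding one downstream model.
dbt_graph = {
    "stg_orders": set(),
    "stg_customers": set(),
    "orders": {"stg_orders", "stg_customers"},
}

dag_style = add_producer(dbt_graph, link_to_roots=True)
group_style = add_producer(dbt_graph, link_to_roots=False)
print(dag_style["stg_orders"])
print(group_style["stg_orders"])
```

Only the root nodes differ between the two results. The downstream dependencies between dbt nodes are the same in both.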

The reason we are keeping this behaviour in DbtDag is that it is not leading to any issues. It is topologically "correct", making it easier for users to see the dependency between the tasks & potentially view the task list topologically sorted.

At this moment, we were unable to find a way to offer this same behaviour with DbtTaskGroup. @ashb, with whom I troubleshot the original "problem", pointed out that Airflow could potentially offer a different "state" that, instead of always, would mean something like "start when my upstream task started". However, from examining Airflow's current implementation, any changes to this area could potentially incur an additional cost to the scheduler and compromise Airflow's performance.

If we were able to "force" the Airflow UI to place the producer task on the "top" both in the graph view and in the list of functions view, we could make DbtDag and DbtTaskGroup more consistent (by not having the producer task as upstream and dbt root nodes with trigger_rule always). Currently, in Airflow 3.0.x and Airflow 3.1.0, the producer task unfortunately shows up in a random position, but we will work with the Airflow team to improve this and add follow-up tickets as needed.

Closes: #1961
Closes: https://github.com/astronomer/oss-integrations-private/issues/240

@tatiana tatiana marked this pull request as ready for review October 22, 2025 08:44
Copilot AI review requested due to automatic review settings October 22, 2025 08:44

Copilot AI left a comment

Pull Request Overview

This PR fixes the behavior of ExecutionMode.WATCHER when used with DbtTaskGroup by refactoring how the producer watcher task establishes dependencies with root dbt nodes.

  • Renamed _add_producer_watcher() to _add_producer_watcher_and_dependencies() to better reflect its expanded responsibilities
  • Moved producer watcher creation to occur after task dependencies are established, enabling proper detection of root nodes
  • Added workaround for Airflow bug that prevents setting producer task as upstream dependency in TaskGroups

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| cosmos/airflow/graph.py | Refactored producer watcher creation logic to properly handle dependencies for both DbtDag and DbtTaskGroup |
| tests/operators/test_watcher.py | Added new integration test for DbtTaskGroup with watcher mode and updated existing test documentation |
| dags/example_watcher.py | Created new example DAG demonstrating watcher mode usage |
| dev/dags/example_watcher.py | Removed documentation comment markers |

codecov Bot commented Oct 22, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.71%. Comparing base (a9fca6a) to head (499ab48).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2044   +/-   ##
=======================================
  Coverage   97.70%   97.71%           
=======================================
  Files          89       89           
  Lines        5673     5681    +8     
=======================================
+ Hits         5543     5551    +8     
  Misses        130      130           

@pankajkoti pankajkoti left a comment

Happy to merge if we've validated the behaviour. Don't have time myself today to validate the issue and the fix.

…T_RUNNER

With 1.11.0a8, Cosmos would fail with:

```
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/dags/example_watcher.py", line 46, in <module>
    execution_config=ExecutionConfig(
                     ^^^^^^^^^^^^^^^^
  File "<string>", line 9, in __init__
  File "/Users/tatiana.alchueyr/Code/astronomer-cosmos/cosmos/config.py", line 438, in __post_init__
    raise CosmosValueError(
cosmos.exceptions.CosmosValueError: ExecutionConfig.invocation_mode is only configurable for ExecutionMode.LOCAL and ExecutionMode.VIRTUALENV.
```

If users attempted to run the DbtDag or TaskGroup setting the invocation_mode=InvocationMode.DBT_RUNNER:

```
from datetime import datetime

from cosmos import DbtDag, ExecutionConfig, ProjectConfig
from cosmos.constants import ExecutionMode, InvocationMode

# DBT_PROJECT_PATH, profile_config and operator_args are defined elsewhere in the example DAG file
example_watcher = DbtDag(
    # dbt/cosmos-specific parameters
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.WATCHER,
        invocation_mode=InvocationMode.DBT_RUNNER,
    ),
    project_config=ProjectConfig(DBT_PROJECT_PATH),
    profile_config=profile_config,
    operator_args=operator_args,
    # normal dag parameters
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="example_watcher",
    default_args={"retries": 0},
)
```

This PR fixes this.
@tatiana tatiana merged commit fa81082 into main Oct 24, 2025
81 checks passed
@tatiana tatiana deleted the fix-watcher-taskgroup branch October 24, 2025 08:40
@tatiana tatiana added this to the Cosmos 1.11.0 milestone Oct 28, 2025
@tatiana tatiana mentioned this pull request Oct 29, 2025
tatiana added a commit that referenced this pull request Oct 29, 2025
**Features**

* Introduce ``ExecutionMode.WATCHER`` to reduce DAG run time by 1/5 in
several PRs. Learn more about it
[here](https://astronomer.github.io/astronomer-cosmos/getting_started/watcher-execution-mode.html#watcher-execution-mode).
This feature was implemented via multiple PRs, including:
* Expose new execution mode by @tatiana @pankajastro @pankajkoti in
#1999
* Add ``DbtProducerWatcherOperator`` for the proposed
``ExecutionMode.WATCHER`` by @pankajkoti in #1982
* Add ``DbtConsumerWatcherSensor`` for the proposed
``ExecutionMode.WATCHER`` by @pankajastro in #1998
* Push producer's task completion status to XCOM by @pankajkoti in #2000
* Add default priority_weight for ``DbtProducerWatcherOperator`` by
@pankajkoti in #1995
* Add sample dbt events for the dbt watcher execution mode by
@pankajkoti in #1952
* Add ``compiled_sql`` as a template field on
``ExecutionMode.WATCHER`` when using ``run_results.json`` by
@pankajastro in #2070
* Set ``push_run_results_to_xcom`` kwargs correctly for invocation mode
subprocess and Watcher mode by @pankajastro in #2067
* Store compiled SQL as template field for dbt callback events in
``ExecutionMode.WATCHER`` by @pankajkoti in #2068
* Add initial documentation for ``ExecutionMode.WATCHER`` by @tatiana in
#2046
* Support running ``State.UPSTREAM_FAILED`` tasks when WATCHER consumer
upstream tasks fail by @tatiana in #2062
* Fail sensor tasks immediately if the ``ExecutionMode.WATCHER``
producer task fails by @pankajastro in #2040
* Add ``WATCHER`` to GitHub issue template by @tatiana in #2056
* Add support for ``TestBehavior.AFTER_ALL`` with
``ExecutionMode.WATCHER`` by @pankajastro in #2049
* Add support for ``TestBehavior.NONE`` with ``ExecutionMode.WATCHER``
by @pankajastro in #2047
* Fix ``ExecutionMode.WATCHER`` behaviour with ``DbtTaskGroup`` by
@tatiana in #2044
* Fix Cosmos behaviour when using watcher with
``InvocationMode.DBT_RUNNER`` by @tatiana in #2048

* Add Airflow 3 plugin for dbt docs with multiple dbt projects support
by @pankajkoti in #2009, check the
[documentation](https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html).
* Initial support to ``dbt Fusion`` by @tatiana in #1803. More details
[here](https://astronomer.github.io/astronomer-cosmos/configuration/dbt-fusion).
* Support to prune sources without downstream references in dbt projects
by @corsettigyg in #1988
* Allow to set task display name as a user-defined function by
@corsettigyg in #1761
* Add dbt project's hash to dag docs to support dag versioning in
Airflow 3 by @pankajkoti in #1907
* feat: Add Jinja templating support for ``dbt_cmd_flags`` by
@skillicinski in #1899
* Add Scarf metric to collect the execution mode uses by @pankajastro in
#1981
* Support Airflow 3.1 by @tatiana in #1980
* Add MySQL profile mapping by @Lee2532 in #1977
* Add sqlserver profile mapping by @pankajastro in #1737

**Enhancement**

* Use XCom to store sql when using ``ExecutionMode.AIRFLOW_ASYNC`` by
@pankajastro in #1934
* Refactor ``AIRFLOW_ASYNC`` teardown so it doesn't install the
virtualenv by @pankajastro in #1938
* Reuse the virtual env for ``AIRFLOW_ASYNC`` setup task by @pankajastro
in #1939
* Improve dataset/asset experience in Cosmos by @tatiana in #2030
* Add ``downstreams`` to ``DbtNode`` by @wornjs in #2028

**Bug fixes**

* Fix tags extraction by @ms32035 in #1915
* Fix task flow operator args by @anyapriya in #2024

**Documentation**

* Add documentation for Airflow 3 Plugin supporting dbt docs for
multiple dbt projects by @pankajkoti in #2063
* Add Cosmos Deferrable Operator Guide by @pankajastro in #1922
* Add dbt Fusion documentation by @tatiana in #1824 #1830
* Update dbt-fusion.rst to explicitly highlight it is in alpha by
@tatiana in #1838
* Fix a bunch of docs build errors and warnings by @pankajkoti in
#1886
* Add docs note for param virtualenv_dir for async execution mode by
@pankajastro in #1969
* Use pepy.tech downloads badge in README by @pankajkoti in #1920
* Correct the default value of ``cache_dir`` by @seokyun.ha in #2027

**Others**

* Promote @corsettigyg to committer by @tatiana in #1985
* Add @pankajkoti and @pankajastro to ``contributors.rst`` by @tatiana
in #1983
* Update setup script for airflow3 script by @dwreeves in #2023
* Prevent pytest from trying to test classes that aren't actually tests
by @anyapriya in #2032
* Fix ``dag.test()`` for Airflow 3.1+ by syncing DAG to database by
@kaxil in #2037
* Disable Scarf in CI by @pankajastro in #2016
* Fix failing dbt Fusion tests when run in parallel in CI by @pankajkoti
in #1896
* Fix MyPy issues related to ``ObjectStoragePath`` in main branch by
@tatiana in #2012
* Cleanup example dbt event JSON dictionaries kept for XCOM reference by
@pankajkoti in #1997
* Bump min hatch version that includes fixes for click>=8.3.0 by
@pankajkoti in #1996
* Use official postgres image from Docker hub for kubernetes setup by
@pankajkoti in #1986
* Use click<8.3.0 for hatch as click 8.3 breaks hatch by @pankajkoti in
#1987
* Pin Airflow version in type check CI job by @pankajastro in #2003
* Improve comments after feedback on #1948 by @tatiana in #1963
* Fix running tests with dbt Fusion 2.0.0 preview versions by @tatiana
in #1948
* Test hardening of dbt node having tags as unset or missing by
@pankajkoti in #1918
* Fix Sphinx issue in the main branch by @tatiana in #2064
* pre-commit autoupdate in #2065, #2043, #2033, #2019, #1990, #2019,
#2008, #1941, #1935, #1924
* GitHub dependabot update in #2051, #2050, #2038, #2022, #1947, #1955,
#1946, #1944, #1945, #1928, #1921, #1917


Co-authored-by: Pankaj Koti <pankaj.koti@astronomer.io>
Co-authored-by: Pankaj Singh <pankaj.singh@astronomer.io>
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Development

Successfully merging this pull request may close these issues.

Validate Watcher execution mode works with TaskGroups [ExecutionMode.WATCHER]
