Skip watcher gateway test on Airflow 3.0#2607
Merged
Merged
Conversation
The integration test test_dbt_task_group_watcher_gateway_prevents_downstream_skip started failing on main after #2592 and #2597 landed together, even though both PRs passed their own CI independently. The PR head of #2597 was based on main before #2592 merged, so the two changes were never exercised together until the merge commit. On Airflow 3.0, dag.test() runs tasks inline via _run_raw_task without starting the task SDK supervisor. SUPERVISOR_COMMS is therefore never initialised, and RuntimeTaskInstance.get_task_states raises NameError inside fetch_state_airflow3. The state fetcher returns None, so the consumer sensor sees the producer as "still unknown". Combined with the retry behaviour introduced in #2592 — which keeps polling instead of falling back when the producer state is unknown — the sensor repeatedly re-reads the stale error status from the producer's first attempt and exhausts retries, marking the DAG as failed. Airflow 3.2 rebuilt dag.test() on top of run_task_in_process, which initialises SUPERVISOR_COMMS in-process, so the same code path works there. Airflow 3.1.x is already skipped due to an unrelated SetRenderedFields crash. Extend the existing skipif to also cover Airflow 3.0 until we can fetch the producer state via a direct DB query fallback when the supervisor is absent. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Contributor
There was a problem hiding this comment.
Pull request overview
This PR restores CI stability by skipping a flaky/failing watcher gateway integration test on Airflow 3.0, where dag.test() runs tasks inline without the task SDK supervisor and breaks the producer state detection logic used by the watcher sensor.
Changes:
- Extend the existing
pytest.mark.skipiffortest_dbt_task_group_watcher_gateway_prevents_downstream_skipto also skip Airflow 3.0 (now skipping Airflow 3.0–3.1.x). - Expand the skip reason message to document the Airflow 3.0 failure mode.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
pankajastro
approved these changes
Apr 23, 2026
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2607 +/- ##
=======================================
Coverage 98.05% 98.05%
=======================================
Files 104 104
Lines 7747 7762 +15
=======================================
+ Hits 7596 7611 +15
Misses 151 151 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
pankajkoti
approved these changes
Apr 23, 2026
pankajkoti
pushed a commit
that referenced
this pull request
Apr 23, 2026
## Summary - Extend the `skipif` on `test_dbt_task_group_watcher_gateway_prevents_downstream_skip` to also cover Airflow 3.0. - Unblocks main, which started failing in the `Run-Integration-Tests (3.10/3.11/3.12, 3.0, 1.11, 2)` jobs right after #2597 merged. This PR is scoped purely to a test skip to restore CI. It does not change production code. ## Why the test broke on main The test was added in #2597. Its PR CI was green because #2597's head was branched off main *before* #2592 merged — the two changes were merged together without a re-run, so main is the first place they run as a pair. On Airflow 3.0, `dag.test()` runs tasks inline via `ti._run_raw_task` without starting the task SDK supervisor, so `SUPERVISOR_COMMS` is never initialised. `RuntimeTaskInstance.get_task_states` raises `NameError`, and `fetch_state_airflow3` in `cosmos/operators/_watcher/state.py` returns `None`. Under the retry behaviour introduced in #2592, an unknown producer state makes the consumer sensor keep polling instead of falling back to a local dbt run. The sensor re-reads the stale `"error"` status from the producer's first attempt and exhausts retries, so the DAG finishes `failed`. Airflow 3.2 rebuilt `dag.test()` on top of `run_task_in_process`, which sets up an in-process supervisor, so the same code path works there. Airflow 3.1.x is already skipped because of the unrelated `SetRenderedFields` retry crash. ## Follow-up (not in this PR) A more durable fix would be to make `fetch_state_airflow3` fall back to a direct DB query when `SUPERVISOR_COMMS` is unavailable, mirroring the Airflow 2 path. That would let this test run on Airflow 3.0 again. It is out of scope here. ## Test plan - [x] `hatch run tests.py3.11-3.0-1.9:pytest -v -m integration tests/operators/test_watcher.py::test_dbt_task_group_watcher_gateway_prevents_downstream_skip` → SKIPPED locally - [ ] CI green on Airflow 2.10, 2.11, 3.0, 3.1, 3.2 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> (cherry picked from commit 238bc55)
Merged
tatiana
pushed a commit
that referenced
this pull request
Apr 23, 2026
Bug Fixes * Fix ``ExecutionMode.WATCHER`` producer retry behaviour by @tatiana in #2559 * Prevent watcher producer skip propagating to downstream tasks via gateway task by @johnhoran and @tatiana in #2597 * Keep watcher sensor polling when producer is still running by @pankajkoti in #2592 * Fix circular import error in Cosmos plugin discovery under Astro Runtime by @tatiana in #2538 * Fix ``CosmosRichLogger`` crash on ``None`` log message by @tatiana in #2540 * Enable inlets and outlets using dbt Fusion on Airflow 3 by @ichirotakami in #2561 * Fix incorrectly skipped source downstream tasks in ``ExecutionMode.WATCHER`` by @pankajastro in #2563 * Fix duplicate logs in ``dbt build`` when source freshness is enabled by @pankajastro in #2564 * Warn and normalize when ``source_rendering_behavior=None`` is passed by @pankajastro in #2570 * Gracefully handle ``Variable.set()`` failures on Astro Remote Execution by @hkc-8010 in #2573 * Skip malformed YAML selectors instead of failing entirely by @YourRoyalLinus in #2577 Docs * Update watcher test behavior docs for Cosmos 1.14.0 by @tatiana in #2549 * Add redirect for moved partial-parsing docs page by @tatiana in #2550 * Document ``ExecutionMode.WATCHER`` and ``depends_on_past`` limitation by @tatiana in #2602 * Restore memory-optimised imports docs for Cosmos < 1.14.0 by @pankajkoti in #2604 Others * Speed up Airflow 3.1+ integration tests by caching InProcessExecutionAPI by @pankajkoti in #2547 * Improve stability of cache hash unit tests by @tatiana in #2539 * Fix mypy 1.20.0 type check failures by @pankajkoti in #2546 * Fix CI failures caused by docs build memory exhaustion by @pankajkoti in #2580 * Fix dbt Fusion broken integration tests by @tatiana in #2581 * Fix flaky ``cosmos_manifest_selectors_example`` DAG in CI by @pankajkoti in #2593 * Reduce pre-commit autoupdate frequency PRs by @tatiana in #2544 * Bump ``reviewdog/action-actionlint`` from 1.71.0 to 1.72.0 by @dependabot in #2542 * Skip watcher gateway test on Airflow 3.0 by @tatiana in #2607 closes: astronomer/oss-integrations-private#381
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
skipifontest_dbt_task_group_watcher_gateway_prevents_downstream_skipto also cover Airflow 3.0.Run-Integration-Tests (3.10/3.11/3.12, 3.0, 1.11, 2)jobs right after Prevent watcher producer skip propagating to downstream tasks via gateway task #2597 merged.This PR is scoped purely to a test skip to restore CI. It does not change production code.
Why the test broke on main
The test was added in #2597. Its PR CI was green because #2597's head was branched off main before #2592 merged — the two changes were merged together without a re-run, so main is the first place they run as a pair.
On Airflow 3.0,
dag.test()runs tasks inline viati._run_raw_taskwithout starting the task SDK supervisor, soSUPERVISOR_COMMSis never initialised.RuntimeTaskInstance.get_task_statesraisesNameError, andfetch_state_airflow3incosmos/operators/_watcher/state.pyreturnsNone. Under the retry behaviour introduced in #2592, an unknown producer state makes the consumer sensor keep polling instead of falling back to a local dbt run. The sensor re-reads the stale"error"status from the producer's first attempt and exhausts retries, so the DAG finishesfailed.Airflow 3.2 rebuilt
dag.test()on top ofrun_task_in_process, which sets up an in-process supervisor, so the same code path works there. Airflow 3.1.x is already skipped because of the unrelatedSetRenderedFieldsretry crash.Follow-up (not in this PR)
A more durable fix would be to make
fetch_state_airflow3fall back to a direct DB query whenSUPERVISOR_COMMSis unavailable, mirroring the Airflow 2 path. That would let this test run on Airflow 3.0 again. It is out of scope here.Test plan
hatch run tests.py3.11-3.0-1.9:pytest -v -m integration tests/operators/test_watcher.py::test_dbt_task_group_watcher_gateway_prevents_downstream_skip→ SKIPPED locally🤖 Generated with Claude Code