Release 1.14.1#2601
Conversation
…ion (#2540) Although the unittests are passing in the CI, there are a few failing locally. An example, when running: ``` AIRFLOW_HOME=`pwd` hatch run tests.py3.10-3.1-1.11:test-cov ``` I was seeing: ``` FAILED tests/airflow/test_graph.py::test_create_task_metadata_unsupported - AssertionError: assert equals failed '\x1b[35m(astronomer-cosmos)\x1b[0m Unavailable conversion function for <u 'Unavailable conversion function for <unsupported> (node <unsupported.dbt- nsupported> (node <unsupported.dbt-proj.unsupported>). Define a converter proj.unsupported>). Define a converter function using render_config.node_c function using render_config.node_converters.' and also FAILED tests/dbt/test_selector.py::test_select_nodes_by_invalid_config - AssertionError: assert 'Unsupported config key selector: invalid_config' in ['\x1b[35m(astronomer-cosmos)\x1b[0m Unsupported config key selector: invalid_config'] + where ['\x1b[35m(astronomer-cosmos)\x1b[0m Unsupported config key selector: invalid_config'] = <_pytest.logging.LogCaptureFixture object at 0x16f34f730>.messages FAILED tests/operators/test_kubernetes.py::test_dbt_kubernetes_operator_handle_warnings[\n 19:48:25 Concurrency: 4 threads (target='target')\n 19:48:25\n 19:48:25 1 of 2 START test dbt_utils_accepted_range_table_col__12__0 ................... [RUN]\n 19:48:25 2 of 2 START test unique_table__uuid .......................................... [RUN]\n 19:48:27 1 of 2 WARN dbt_utils_accepted_range_table_col__12__0 ..................... [WARN in 1.83s]\n 19:48:27 2 of 2 PASS unique_table__uuid ................................................ [PASS in 1.85s]\n 19:48:27\n 19:48:27 Finished running 2 tests, 1 hook in 0 hours 0 minutes and 12.86 seconds (12.86s).\n 19:48:27\n 19:48:27 Completed with 1 warning:\n 19:48:27\n 19:48:27 Warning in test dbt_utils_accepted_range_table_col__12__0 (models/ads/ads.yaml)\n 19:48:27 Got 252 results, configured to warn if >0\n 19:48:27\n 19:48:27 compiled Code at target/compiled/model/models/table/table.yaml/dbt_utils_accepted_range_table_col__12__0.sql\n 19:48:27\n 19:48:27 Done. PASS=1 WARN=1 ERROR=0 SKIP=0 TOTAL=2\n 19:48:27 Command `dbt test` succeeded at 07:50:02.340364 after 43.98 seconds\n 19:48:27 Flushing usage events\n -True] - TypeError: can only concatenate str (not "NoneType") to str FAILED tests/operators/test_kubernetes.py::test_dbt_kubernetes_operator_handle_warnings[\n 19:48:25 Concurrency: 4 threads (target='target')\n 19:48:25\n 19:48:25 1 of 2 START test dbt_utils_accepted_range_table_col__12__0 ................... [RUN]\n 19:48:25 2 of 2 START test unique_table__uuid .......................................... [RUN]\n 19:48:27 1 of 2 PASS 252 dbt_utils_accepted_range_table_col__12__0 ..................... [PASS in 1.83s]\n 19:48:27 2 of 2 PASS unique_table__uuid ................................................ [PASS in 1.85s]\n 19:48:27\n 19:48:27 Finished running 2 tests, 1 hook in 0 hours 0 minutes and 12.86 seconds (12.86s).\n 19:48:27\n 19:48:27 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2\n -False] - TypeError: can only concatenate str (not "NoneType") to str FAILED tests/operators/test_kubernetes.py::test_dbt_kubernetes_operator_handle_warnings[\n gibberish\n -False] - TypeError: can only concatenate str (not "NoneType") to str ``` The problems seemed to be: - `CosmosRichLogger.handle()` crashed with `TypeError` when `record.msg` was `None` (e.g. logging a `None` return value). Guard against this by checking for None and converting to str. - Add an autouse fixture in `conftest.py` that demotes any `CosmosRichLogger` instances back to `logging.Logger` after each test. This prevents `test_log.py` (which monkeypatches `rich_logging=True`) from permanently polluting the logging cache with `CosmosRichLogger` instances that add the (astronomer-cosmos) prefix to subsequent tests. --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> (cherry picked from commit 92810c4)
The following unit tests have been unstable for a while:
```
FAILED tests/dbt/test_graph.py::test_save_dbt_ls_cache - AssertionError: assert 'd0bfe03ea1d157413f77544931b0d8f5' in
('9d95cbf6529e2ab51fadd6a3f0a3971f', '633a523f295ef0cd496525428815537b')
FAILED tests/dbt/test_graph.py::test_save_yaml_selectors_cache - AssertionError: assert 'd0bfe03ea1d157413f77544931b0d8f5' in ('9d95cbf6529e2ab51fadd6a3f0a3971f',
'633a523f295ef0cd496525428815537b')
```
The reason is that the `tmp_dbt_project_dir` fixture copies the dbt
project to a temp directory for deterministic hashing, but it only
ignores a few artefacts. Local dbt runs create the `dbt_packages`,
`dbt_internal_packages`, `venv`, and `__pycache__` directories, which
change the hash and cause flaky test failures. Add all common dbt
artefacts and local files to the ignore list.
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
(cherry picked from commit 82ec2e5)
**Problem**
When Cosmos is loaded inside Astro Runtime, plugin discovery triggers a
circular import that prevents Cosmos from loading:
```
❯ 2026-04-07T19:17:55.923162Z [error ] Failed to import plugin cosmos [airflow.plugins_manager] loc=plugins_manager.py:261
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/airflow/plugins_manager.py", line 253, in load_entrypoint_plugins
plugin_class = entry_point.load()
^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/importlib/metadata/__init__.py", line 205, in load
module = import_module(match.group('module'))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 999, in exec_module
File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
File "/usr/local/lib/python3.12/site-packages/cosmos/plugin/__init__.py", line 21, in <module>
from .airflow3 import CosmosAF3Plugin as CosmosPlugin # type: ignore[assignment] # noqa: F401
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/cosmos/plugin/airflow3.py", line 22, in <module>
from cosmos import telemetry
File "/usr/local/lib/python3.12/site-packages/cosmos/telemetry.py", line 16, in <module>
from cosmos.log import get_logger
File "/usr/local/lib/python3.12/site-packages/cosmos/log.py", line 5, in <module>
from cosmos.settings import rich_logging
ImportError: cannot import name 'rich_logging' from partially initialized module 'cosmos.settings' (most likely due to a circular import)
(/usr/local/lib/python3.12/site-packages/cosmos/settings.py)
```
**How to reproduce**
```
docker exec -it $(docker ps -q -f name=scheduler) python -c \
"from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping; \
m = GoogleCloudServiceAccountDictProfileMapping(conn_id='gcp_gs_conn', \
profile_args={'dataset': 'astro_prod_cosmos_test'}); print(m.env_vars)"
```
**Root cause**
The import chain during plugin discovery is:
```
cosmos.plugin.__init__
→ cosmos.plugin.airflow3 (line 26: from cosmos import telemetry)
→ cosmos.telemetry (line 15: from cosmos import constants, settings)
→ cosmos.settings (starts executing, defines module-level variables...)
→ cosmos.constants (imported at line 14 of settings.py — fine)
→ cosmos.log (line 16: from cosmos.log import get_logger)
→ cosmos.settings (line 5: from cosmos.settings import rich_logging)
✗ cosmos.settings is still being initialized — rich_logging not yet defined
```
`cosmos.telemetry` imports both `cosmos.settings` (line 15) and
`cosmos.log` (line 16) at the module level. When `cosmos.settings` is
imported, it starts executing but hasn't reached the `rich_logging = ...
`assignment (line 35) before `cosmos.log` tries to `import rich_logging`
from it.
In standard Airflow, this doesn't manifest because the import order
completes `cosmos.settings` before `cosmos.log` is needed. In Astro
Runtime, there is probably some specific code that triggers plugin
discovery in a different order than the one that exposes the cycle.
**Fix**
Move the from `cosmos.settings import rich_logging` import from module
level in `cosmos/log.py` into the `get_logger()` function body.
**Validation**
I validated by cloning and changing:
https://github.com/astronomer/cosmos-demo
So that the `requirements.txt` installs Cosmos using:
```
astronomer-cosmos @ https://github.com/astronomer/astronomer-cosmos/archive/refs/heads/fix-astro-runtime-circular-import.zip
```
---------
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
(cherry picked from commit bfe69d4)
Update mirrors-mypy from v1.19.1 to v1.20.0 in pre-commit config. Change type: ignore error codes from [attr-defined] to [union-attr] in cosmos/airflow/graph.py. mypy 1.20.0 now correctly infers the union type DAGNode | Any and reports union-attr instead of attr-defined for attribute access on union members. related: #2545 Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> (cherry picked from commit e3251af)
TestBehavior.AFTER_EACH is now fully supported in ExecutionMode.WATCHER since 1.14.0 via DbtTestWatcherOperator. Update the docs to reflect this, preserve historical context about the EmptyOperator placeholders in prior versions, and clarify that WATCHER_KUBERNETES still uses placeholders. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> (cherry picked from commit 67ec5f6)
In DBT_RUNNER mode, run_dbt_runner appended a new XCom-pushing callback to _dbt_runner_callbacks on every invocation. When source rendering is enabled the producer runs dbt source freshness first (via _run_source_freshness), registering a freshness callback, then runs dbt build which appended a second callback. Both callbacks fired for every event during the build, printing each log line twice. Fix: skip callback registration when context["_check_source_freshness"] is True. The freshness run's results are captured from target/sources.json afterwards and do not need per-event XCom pushes. Raw JSON stdout is still suppressed via _NullWriter for both paths. Logs Before changes <img width="1713" height="968" alt="Screenshot 2026-04-16 at 4 18 00 PM" src="https://github.com/user-attachments/assets/ac24003d-97b6-4164-b659-177c01b779b1" /> Logs After changes <img width="1680" height="1009" alt="Screenshot 2026-04-16 at 4 18 55 PM" src="https://github.com/user-attachments/assets/3b5fcb31-ff34-4059-93a8-674c5520c1bb" /> closes: #2558 --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> (cherry picked from commit d4f9385)
The DFS previously marked every transitive dependent of a stale source as skip, regardless of whether the node had additional clean upstream paths that would allow it to succeed. For example, a model that depends on both a stale source and a clean upstream model was pre-emptively excluded even though dbt would have run it successfully (especially for warn-status sources where dbt continues execution normally). A node is now added to the skip set only when all of its depends_on entries are either known-stale sources or already-skipped nodes. Nodes with at least one clean upstream path are left out of the skip set and allowed to run. The DFS still re-evaluates each candidate when a new node joins the skip set, so purely stale chains are handled correctly. closes: #2536 --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> (cherry picked from commit f2de9af)
This PR enables inlets and outlets on Airflow 3 using DBT Fusion when using `DbtRunOperationLocalOperator`, by letting `DbtLocalArtifactProcessor()` find `--profiles-dir` by passing `dbt_command_line` to`calculate_openlineage_events_completes` as dbt-fusion does not write `profiles_dir` into run_results.json. Please see #2560 for more details about the issue. (cherry picked from commit 6ba5403)
Python does not enforce type annotations at runtime, so passing None for
source_rendering_behavior bypasses the SourceRenderingBehavior.NONE
default, causing sources to be rendered incorrectly and an
AttributeError in converter.py when .value is called on None.
Add a guard in RenderConfig.__post_init__ that issues a UserWarning and
normalizes None to SourceRenderingBehavior.NONE, consistent with the
existing deprecation warning pattern in __post_init__.
DAG
```python
example_watcher = DbtDag(
# dbt/cosmos-specific parameters
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.WATCHER,
),
project_config=ProjectConfig(jaffle_shop_path),
render_config=RenderConfig(source_rendering_behavior=None),
profile_config=default_profile,
# normal dag parameters
schedule="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="example_watcher",
default_args={"retries": 0},
operator_args={
"dbt_cmd_flags": ["--log-level", "debug"],
},
)
```
Before change DAG topology
<img width="1689" height="992" alt="Screenshot 2026-04-17 at 1 42 49 AM"
src="https://github.com/user-attachments/assets/0e828278-6aa6-4acd-8711-b926b72e13d7"
/>
After Change DAG topology
<img width="1680" height="1001" alt="Screenshot 2026-04-17 at 1 41
10 AM"
src="https://github.com/user-attachments/assets/7bff4615-5a90-48bc-94ed-a95129a1f322"
/>
closes: #2568
(cherry picked from commit 37fd06c)
The Sphinx docs build had outgrown the memory available on pre-commit.ci runners and was failing consistently on every pull request, including ones that touched no documentation at all. The result was a red pre-commit.ci check on unrelated PRs, blocked merges, time lost to investigating failures that had nothing to do with the change under review, and an erosion of trust in required checks. Running the docs build on GitHub Actions restores that trust. PRs that do not touch docs or library sources now pass cleanly. PRs that do touch them get a reliable pass/fail signal with enough memory to actually complete the build, so warnings surface where they belong rather than being masked by OOM failures. pre-commit.ci is left to focus on what it is good at, fast lint and format checks, and reviewers and contributors get back the signal they rely on to ship changes confidently. related: #2578 related: #2579 --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> (cherry picked from commit 49b23f9)
Use a version range `[">=1.1.1", "<1.4.0"]` for `dbt_utils` in the jaffle_shop test project instead of pinning to `1.1.1` because `dbt_utils@1.1.1` is incompatible with dbt 2.0, causing dbt Fusion integration tests to fail with `dbt1080` warnings and exit code 1. The range allows dbt to resolve to `1.3.3` (which supports `>=1.3.0, <3.0.0`) for dbt 2.0 while still working with older dbt versions. (cherry picked from commit c0f8b80)
…ing entirely (#2577) Skip malformed selectors with a warning instead of raising an error, so that valid selectors in the same manifest still work. I reproduced the bug outlined in the related issue and verified these changes will resolve the problem. Closes #2565 (cherry picked from commit d13ed08)
…#2573) - Wraps `Variable.set()` in `save_dbt_ls_cache()` and `save_yaml_selectors_cache()` with a `try/except` that catches `AirflowRuntimeError` (Airflow 3 Task SDK only) - On failure, logs a `WARNING` with the variable key and guidance to set `AIRFLOW__COSMOS__REMOTE_CACHE_DIR` instead of propagating the error - On Airflow 2 the existing direct-DB path is unchanged (no exception expected) Closes #2551 Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> (cherry picked from commit fa735d2)
The producer retry behaviour change in Cosmos 1.14.0 had issues, as detailed in #2430: - Consumer tasks were incorrectly marked as successful even when one of their models failed in the producer, because they only captured the producer status after it retried and "succeeded" (behaving as an empty operator on the second try) - The producer was marked as successful on retry, while the correct behaviour is to skip it - Producer XCom values were deleted during retries (standard Airflow behaviour), leading to incorrect status on consumer tasks Affted both: - `ExecutionMode.WATCHER` - `ExecutionMode.WATCHER_KUBERNETES` **Behaviour before and after** Cosmos 1.14.0 - DAG `dev/failed_dags/example_watcher_failing_tests.py` https://github.com/user-attachments/assets/9d02fbf4-d4d7-47c8-860e-6d172acd2ded Cosmos 1.14.1a2 (cut from this branch) - DAG `dev/failed_dags/example_watcher_failing_tests.py` https://github.com/user-attachments/assets/0eb4e055-7242-459b-a3a2-3f84864d41c6 **Main changes** (1) Producer retry behaviour Fix overall behaviour introduced in Cosmos 1.14.0: - Producer raises `AirflowSkipException` on retry instead of re-running the dbt build - Consumer sensors detect the skipped producer and fall back to running dbt individually via `_fallback_to_non_watcher_run ` (3) XCom backup/restore for retry resilience Overcome a limitation of Airflow in refreshing the `XCom` between DAG retrues. This is a workaround. On the mid-long term, this may be addressed as part of [AIP-103](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management) after Airflow 3.2 (the current latest version). - Added incremental XCom backup: `each safe_xcom_push` accumulates key/value pairs in a buffer on the task instance, persisted to an Airflow Variable on each push - On failure, the backup Variable is preserved so the retry can restore it - On success, the backup Variable is deleted to prevent stale data from accumulating - On retry (try_number > 1), XCom entries are restored from the Variable before the producer raises `AirflowSkipException` - Uses the Airflow Variable API (not direct DB queries) for Airflow 2 and 3 compatibility An alternative to updating the backup variable per model update would be to do so by the end of the execution. The one problem with this approach is that it would not work for tasks killed by OOM or -9 from the OSS, but it would result in fewer Airflow metadata database calls. **How to reproduce** Run the following DAGs referencing the dbt project `watcher_failing_tests` that was added to Cosmos in this PR: - `example_watcher_failing_tests.py` - watcher + dbt_runner (default) - `example_watcher_failing_tests_subprocess.py` - watcher + subprocess - `example_watcher_failing_multi_folder.py` - watcher + group_nodes_by_folder - `example_watcher_failing_tests_kubernetes.py` - watcher_kubernetes - `example_watcher_failing_tests_non_deferrable.py` - watcher + dbt_runner + non-deferrable All reference dbt projects in dev/dags/dbt/. I updated `scripts/test/kubernetes-setup.sh` to create the Docker image with the failing dbt project and to upload it to kind. To use with `airflow standalone`, set `AIRFLOW__CORE__DAGS_FOLDER` to `dev/failed_dags/`. **Pending work** The following will be accomplished outside of the scope of this PR: - Implement an approach to delete stale variables that backed up XCom values across DAG run retries - Collect user feedback Closes #2430 (cherry picked from commit 6ff38a0)
The `pre_condition` task group in `cosmos_manifest_selectors_example`
used `select=["+customers"]`, which left the DAG dependent on state
leaked from other tests. This made the integration test
`test_example_dag[cosmos_manifest_selectors_example]` flaky; passing
only when `pytest-split` happened to order another jaffle_shop DAG
before it in the same split (which pre-populated the required tables in
Postgres).
**Root cause**
Two gaps in the +customers selection:
1. Orphan seeds. In `altered_jaffle_shop`, `stg_orders` and
`stg_payments` read their data via `source('postgres_db', 'raw_orders' |
'raw_payments')`. The corresponding seeds (`raw_orders`,
`raw_payments`) are orphans in the manifest, nothing references them, so
`+` traversal skips them, and they never get loaded. `raw_customers` is
pulled in because each staging model has a `force_seed_dep CTE that does
select * from {{ ref('raw_customers') }}`.
2. Missing `orders` model. The
`relationships_orders_customer_id__customer_id__ref_customers_ test` is
attached to both `customers` and `orders` and queries `public.orders`.
`+customers` pulls the test in (it's a child of customers) but doesn't
build the orders model, so `customers.test` fails with `relation
public.orders" does not exist`. This also matters because the downstream
`local_example` / `aws_s3_example` / `gcp_gs_example` /
`azure_abfs_example` task groups all run the critical_path selector,
which is the union of `customers` and `orders`, so pre_condition needs
to leave both models present.
**Fix**
Change the pre_condition selector to:
`select=["+customers", "+orders", "raw_orders", "raw_payments"]`
- `+customers` / `+orders` build both final models and their upstream
`stg_*` models (and pull in `raw_customers` via `ref`)
- `raw_orders`, `raw_payments` explicitly seed the two orphan seeds so
the `source()` reads in `stg_orders` / `stg_payments` resolve
related: #2562
related: #2592
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit e9a43fc)
Currently, `pre-commit-ci[bot]` opens PR suggestions on GitHub, and we would like to disable them because it does not respect Astronomer's 7-day cooldown policy. We recently updated Dependabot to manage pre-commit upgrades, so we can safely disable this other configuration. Related: #2530 Related: #2530 Unfortunately, it seems that, to date, pre-commit[ci] does not offer [granularity to disable only the autoupdate PRs](pre-commit/pre-commit#2670). It is also [not possible to skip it](ddf8375), since it is not a pre-commit job. The alternative to the change in this PR would be to remove `pre-commit[ci]` as a GitHub app altogether. This would, however, have the downside of not running pre-commit checks in the CI as we do now, which are quite useful. Therefore, we decided to reduce the frequency of how often this happens - for now - until we have a better solution. Also fixes failing `pre-commit` check: ``` ❯ Just pulled the latest changes from main, getting: ruff check....................................................................Failed - hook id: ruff-check - exit code: 1 F821 Undefined name `cosmos` --> tests/test_log.py:45:25 | 43 | def test_rich_logging_none_message(monkeypatch, caplog): 44 | """CosmosRichLogger should not crash when record.msg is None.""" 45 | monkeypatch.setattr(cosmos.log, "rich_logging", True) | ^^^^^^ 46 | logger = get_logger("test-none-msg") 47 | with caplog.at_level("INFO"): | ``` https://results.pre-commit.ci/run/github/577757880/1775638428.KyFU_uV_Q-id7x3hn2tGVg (cherry picked from commit 513f0ed)
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
|
@michal-mrazek @johnhoran we know it is short-noticed, but if you'd like to try this out before we cut the stable version, @pankajkoti managed to cut an alpha release with all these changes: Edit: Please don't test this alpha. It had missing cherry-picks for a couple of PRs. Please test the new alpha 1.14.1a4 (https://pypi.org/project/astronomer-cosmos/1.14.1a4/) |
|
Apologies, team @tatiana @pankajastro @michal-mrazek @johnhoran, if you already began testing, but I had to cut a new alpha https://pypi.org/project/astronomer-cosmos/1.14.1a4/ to include a couple of missing cherry-picks because my local main did not have them earlier. Requesting to test the new alpha, please once and if you get a chance: https://pypi.org/project/astronomer-cosmos/1.14.1a4/ |
The cherry-pick for #2592 was reverted, so drop its entry from the Bug Fixes section to keep the changelog consistent with the branch state. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…API (#2547) - Cache Airflow's `InProcessExecutionAPI` across task executions in `dag.test()` to eliminate per-task FastAPI app creation overhead - Reduces Airflow **3.1** integration tests from **~47 min** to **~30 min** and Airflow **3.2** from **~56 min** to **~30 min** e.g. run: https://github.com/astronomer/astronomer-cosmos/actions/runs/24193932640/job/70618711111?pr=2547 ## Problem Integration tests on Airflow 3.1+ are 5-6x slower than on 2.x. Profiling CI runs showed (e.g. https://github.com/astronomer/astronomer-cosmos/actions/runs/24133899134/job/70416991954): | Airflow | Avg Duration | |---------|-------------| | 2.9 | 9 min | | 2.10 | 10 min | | 2.11 | 20 min | | 3.0 | 24 min | | 3.1 | 47 min | | 3.2 | 56 min | ## Root cause Airflow 3.1+'s `dag.test()` creates a new `InProcessExecutionAPI` for every task via `InProcessTestSupervisor._api_client()`. Each instantiation spins up a full FastAPI application with ASGI middleware, JWT auth, dependency injection, and an async event loop — adding ~6-8s of overhead per task. For a 13-task DAG like `basic_cosmos_dag`, this accumulates to ~80s (vs ~2.5s on Airflow 2.10). ## Fix Add a session-scoped pytest fixture that patches `in_process_api_server()` to return a cached `InProcessExecutionAPI` instance, so the FastAPI app is created once and reused across all tasks and tests. The fixture is a no-op on Airflow versions before 3.1. ## Test plan - [x] Verify Airflow 3.1 and 3.2 integration tests pass and run faster (~30 min vs ~50 min) - [x] Verify Airflow 3.0 integration tests pass (fixture is a no-op) - [x] Verify Airflow 2.x integration tests are unaffected Testing job run: https://github.com/astronomer/astronomer-cosmos/actions/runs/24193932640/job/70618711111?pr=2547 related: #2302 --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> (cherry picked from commit 4ca2d20)
Previously, any retry of a watcher consumer sensor (try_number > 1) unconditionally fell back to a non-watcher dbt run. This meant that if the sensor timed out while the producer was still working, the retry would launch a duplicate dbt invocation instead of continuing to wait for the producer's XCom updates. On retry, first check the producer task state. If the producer has reached a terminal state (success, failed, skipped, upstream_failed, removed), fall back to a non-watcher run as before — this covers both producer completion and manual task clears from the UI. Otherwise, keep polling so the sensor can pick up where it left off without spawning a redundant run. closes: astronomer/oss-integrations-private#359 --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> (cherry picked from commit 8a9e1c9)
## Summary - Extend the `skipif` on `test_dbt_task_group_watcher_gateway_prevents_downstream_skip` to also cover Airflow 3.0. - Unblocks main, which started failing in the `Run-Integration-Tests (3.10/3.11/3.12, 3.0, 1.11, 2)` jobs right after #2597 merged. This PR is scoped purely to a test skip to restore CI. It does not change production code. ## Why the test broke on main The test was added in #2597. Its PR CI was green because #2597's head was branched off main *before* #2592 merged — the two changes were merged together without a re-run, so main is the first place they run as a pair. On Airflow 3.0, `dag.test()` runs tasks inline via `ti._run_raw_task` without starting the task SDK supervisor, so `SUPERVISOR_COMMS` is never initialised. `RuntimeTaskInstance.get_task_states` raises `NameError`, and `fetch_state_airflow3` in `cosmos/operators/_watcher/state.py` returns `None`. Under the retry behaviour introduced in #2592, an unknown producer state makes the consumer sensor keep polling instead of falling back to a local dbt run. The sensor re-reads the stale `"error"` status from the producer's first attempt and exhausts retries, so the DAG finishes `failed`. Airflow 3.2 rebuilt `dag.test()` on top of `run_task_in_process`, which sets up an in-process supervisor, so the same code path works there. Airflow 3.1.x is already skipped because of the unrelated `SetRenderedFields` retry crash. ## Follow-up (not in this PR) A more durable fix would be to make `fetch_state_airflow3` fall back to a direct DB query when `SUPERVISOR_COMMS` is unavailable, mirroring the Airflow 2 path. That would let this test run on Airflow 3.0 again. It is out of scope here. ## Test plan - [x] `hatch run tests.py3.11-3.0-1.9:pytest -v -m integration tests/operators/test_watcher.py::test_dbt_task_group_watcher_gateway_prevents_downstream_skip` → SKIPPED locally - [ ] CI green on Airflow 2.10, 2.11, 3.0, 3.1, 3.2 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> (cherry picked from commit 238bc55)
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## release-1.14 #2601 +/- ##
===============================================
Coverage ? 98.09%
===============================================
Files ? 104
Lines ? 7647
Branches ? 0
===============================================
Hits ? 7501
Misses ? 146
Partials ? 0 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
tatiana
left a comment
There was a problem hiding this comment.
Excellent work on getting this release out, @pankajkoti , thanks a lot!
Adds Changelogs and version bump for Cosmos 1.14.1 to the main branch. related: astronomer/oss-integrations-private#381 related: #2601 --------- Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Bug Fixes
ExecutionMode.WATCHERproducer retry behaviour by @tatiana in Fix watcher producer retries behaviour #2559CosmosRichLoggercrash onNonelog message by @tatiana in Fix unit testsCosmosRichLoggercrash onNonemsg and test pollution #2540ExecutionMode.WATCHERby @pankajastro in Fix incorrectly skipped source downstream tasks in the watcher #2563dbt buildwhen source freshness is enabled by @pankajastro in Fix duplicate logs in dbt build when source freshness is enabled #2564source_rendering_behavior=Noneis passed by @pankajastro in Warn and normalize when source_rendering_behavior=None is passed #2570Variable.set()failures on Astro Remote Execution by @hkc-8010 in Gracefully handleVariable.set()failures on Astro Remote Execution #2573Docs
ExecutionMode.WATCHERanddepends_on_pastlimitation by @tatiana in Add docs related toExecutionMode.WATCHERanddepends_on_pastlimitation #2602Others
cosmos_manifest_selectors_exampleDAG in CI by @pankajkoti in Fix flaky cosmos_manifest_selectors_example DAG in CI #2593reviewdog/action-actionlintfrom 1.71.0 to 1.72.0 by @dependabot in Bump reviewdog/action-actionlint from 1.71.0 to 1.72.0 #2542closes: https://github.com/astronomer/oss-integrations-private/issues/381