Speed up Airflow 3.1+ integration tests by caching InProcessExecutionAPI #2547
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@           Coverage Diff           @@
##             main    #2547   +/-   ##
=======================================
  Coverage   98.08%   98.08%
=======================================
  Files         103      103
  Lines        7484     7484
=======================================
  Hits         7341     7341
  Misses        143      143
Pull request overview
This PR speeds up Airflow 3.1+ integration tests by caching Airflow’s InProcessExecutionAPI (the FastAPI-backed in-process execution API) and reusing it across the test session, instead of recreating it repeatedly during dag.test() runs.
Changes:
- Add a session-scoped, autouse pytest fixture to patch Airflow’s in_process_api_server() to return a cached API instance.
- Make the patch a no-op on older Airflow versions (via version checks / safe import+attribute handling).
Airflow 3.1+ requires DAGs to be serialized to the database before dag.test() can create a DagRun. Previously, every single test created a fresh DagBundleModel, instantiated a new DagBag, and called sync_bag_to_db individually — adding significant per-test overhead that caused integration tests on 3.1/3.2 to take 5-6x longer than on 2.9. Cache DagBundle creation and track synced DAG IDs at module level so each DAG is synced at most once per session. Add a batch pre-sync fixture in test_example_dags.py that syncs all ~31 example DAGs in a single call at module start, letting individual parametrized tests skip the sync entirely. Also add invalidate_dag_sync_cache() for tests that explicitly delete DAG metadata records. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Airflow 3.x's dag.test() creates a new InProcessExecutionAPI for every task — a full FastAPI app with ASGI middleware, JWT auth, and async event loop. For a 13-task DAG this adds ~80s of overhead (~6-8s per task), explaining why test_example_dag[basic_cosmos_dag] takes 81s on Airflow 3.2 vs 2.5s on 2.10. Add a session-scoped conftest fixture that patches in_process_api_server() to return a cached instance, so the FastAPI app is created once and reused across all tasks and tests. This is the primary bottleneck; the sync caching from the previous commit provides a secondary improvement. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Airflow 3.0 has the supervisor module but not the in_process_api_server function (added in 3.1). Catch AttributeError alongside ImportError to make the caching fixture a no-op on 3.0. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This reverts commit a3f1a04.
…n guard Tighten version guard from 3.0 to 3.1 since in_process_api_server only exists in Airflow 3.1+, avoiding unnecessary import/exception work on 3.0. Restore the original function in a finally block after yield to prevent leaking the monkeypatch. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
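A version guard of the kind described in the commit messages above could be sketched as below. The helper names are hypothetical, and the string parsing is a simplified stdlib-only stand-in; the actual fixture may well compare versions differently (for example with `packaging.version`).

```python
from typing import Tuple


def parse_major_minor(version: str) -> Tuple[int, int]:
    """Return (major, minor) from a version string such as '3.1.0'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def should_cache_execution_api(airflow_version: str) -> bool:
    """Only patch on Airflow 3.1+, where in_process_api_server exists."""
    return parse_major_minor(airflow_version) >= (3, 1)
```

Comparing integer tuples rather than raw strings matters here: as strings, "2.10" would sort before "2.9".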
476364c to 322d21d
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 1 comment.
tatiana left a comment
Great find, @pankajkoti , thanks a lot for this improvement!
## Summary

Use pytest-split to distribute integration tests into 3 groups that run as separate GitHub Actions matrix jobs. Each group gets its own Postgres container, so there are no shared-state conflicts.

**Changes:**

- Add `split-group: [1, 2, 3]` dimension to `Run-Integration-Tests` matrix
- Pass `PYTEST_SPLITS`/`PYTEST_SPLIT_GROUP` env vars through to pytest
- Update coverage artifact names to include split group
- Add `.test_durations` file with real timings from CI (184 tests, balanced ~390s per group)
- `integration.sh` conditionally adds `--splits`/`--group` flags (no-op when env vars are unset, preserving local dev behavior)

**Results (bottleneck job wall-clock):**

| Before splitting | 2-way split | 3-way split (this PR) |
|-----------------|-------------|----------------------|
| ~30 min (Airflow 3.1) | ~22 min | ~16 min |

**How it works:**

- pytest-split reads `.test_durations` and uses the `least_duration` algorithm to bin-pack tests into balanced groups
- Each matrix job gets its own GitHub Actions runner and Postgres service container (no shared state)
- New tests not in `.test_durations` get assigned to the lightest group automatically
- The file can be refreshed with real timings via `pytest --store-durations` periodically or when we see the splits are not balanced and some of them are taking longer

closes: #2302
related: #2547

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
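The bin-packing idea behind the split can be illustrated with a small greedy sketch: take tests from slowest to fastest and always hand the next one to the group with the smallest running total. This is written in the spirit of the `least_duration` strategy and is not pytest-split's actual implementation; the function name, test names, and timings below are made up for illustration.

```python
from typing import Dict, List


def split_by_least_duration(durations: Dict[str, float], groups: int) -> List[List[str]]:
    """Greedily assign each test to the currently lightest group."""
    buckets: List[List[str]] = [[] for _ in range(groups)]
    totals = [0.0] * groups
    # Slowest tests first, so large items are spread out before small ones fill gaps.
    for name, seconds in sorted(durations.items(), key=lambda kv: kv[1], reverse=True):
        lightest = totals.index(min(totals))
        buckets[lightest].append(name)
        totals[lightest] += seconds
    return buckets


# Hypothetical timings in seconds.
example = {
    "test_a": 90.0,
    "test_b": 60.0,
    "test_c": 50.0,
    "test_d": 40.0,
    "test_e": 30.0,
    "test_f": 30.0,
}
result = split_by_least_duration(example, groups=3)
# result == [["test_a", "test_f"], ["test_b", "test_e"], ["test_c", "test_d"]]
```

With these numbers the groups total 120s, 90s, and 90s. A test with no recorded duration would need some default estimate before it could be placed this way.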
…API (#2547)

- Cache Airflow's `InProcessExecutionAPI` across task executions in `dag.test()` to eliminate per-task FastAPI app creation overhead
- Reduces Airflow **3.1** integration tests from **~47 min** to **~30 min** and Airflow **3.2** from **~56 min** to **~30 min**

e.g. run: https://github.com/astronomer/astronomer-cosmos/actions/runs/24193932640/job/70618711111?pr=2547

## Problem

Integration tests on Airflow 3.1+ are 5-6x slower than on 2.x. Profiling CI runs showed (e.g. https://github.com/astronomer/astronomer-cosmos/actions/runs/24133899134/job/70416991954):

| Airflow | Avg Duration |
|---------|-------------|
| 2.9 | 9 min |
| 2.10 | 10 min |
| 2.11 | 20 min |
| 3.0 | 24 min |
| 3.1 | 47 min |
| 3.2 | 56 min |

## Root cause

Airflow 3.1+'s `dag.test()` creates a new `InProcessExecutionAPI` for every task via `InProcessTestSupervisor._api_client()`. Each instantiation spins up a full FastAPI application with ASGI middleware, JWT auth, dependency injection, and an async event loop, adding ~6-8s of overhead per task. For a 13-task DAG like `basic_cosmos_dag`, this accumulates to ~80s (vs ~2.5s on Airflow 2.10).

## Fix

Add a session-scoped pytest fixture that patches `in_process_api_server()` to return a cached `InProcessExecutionAPI` instance, so the FastAPI app is created once and reused across all tasks and tests. The fixture is a no-op on Airflow versions before 3.1.

## Test plan

- [x] Verify Airflow 3.1 and 3.2 integration tests pass and run faster (~30 min vs ~50 min)
- [x] Verify Airflow 3.0 integration tests pass (fixture is a no-op)
- [x] Verify Airflow 2.x integration tests are unaffected

Testing job run: https://github.com/astronomer/astronomer-cosmos/actions/runs/24193932640/job/70618711111?pr=2547

related: #2302

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
(cherry picked from commit 4ca2d20)
Bug Fixes

* Fix ``ExecutionMode.WATCHER`` producer retry behaviour by @tatiana in #2559
* Prevent watcher producer skip propagating to downstream tasks via gateway task by @johnhoran and @tatiana in #2597
* Keep watcher sensor polling when producer is still running by @pankajkoti in #2592
* Fix circular import error in Cosmos plugin discovery under Astro Runtime by @tatiana in #2538
* Fix ``CosmosRichLogger`` crash on ``None`` log message by @tatiana in #2540
* Enable inlets and outlets using dbt Fusion on Airflow 3 by @ichirotakami in #2561
* Fix incorrectly skipped source downstream tasks in ``ExecutionMode.WATCHER`` by @pankajastro in #2563
* Fix duplicate logs in ``dbt build`` when source freshness is enabled by @pankajastro in #2564
* Warn and normalize when ``source_rendering_behavior=None`` is passed by @pankajastro in #2570
* Gracefully handle ``Variable.set()`` failures on Astro Remote Execution by @hkc-8010 in #2573
* Skip malformed YAML selectors instead of failing entirely by @YourRoyalLinus in #2577

Docs

* Update watcher test behavior docs for Cosmos 1.14.0 by @tatiana in #2549
* Add redirect for moved partial-parsing docs page by @tatiana in #2550
* Document ``ExecutionMode.WATCHER`` and ``depends_on_past`` limitation by @tatiana in #2602
* Restore memory-optimised imports docs for Cosmos < 1.14.0 by @pankajkoti in #2604

Others

* Speed up Airflow 3.1+ integration tests by caching InProcessExecutionAPI by @pankajkoti in #2547
* Improve stability of cache hash unit tests by @tatiana in #2539
* Fix mypy 1.20.0 type check failures by @pankajkoti in #2546
* Fix CI failures caused by docs build memory exhaustion by @pankajkoti in #2580
* Fix dbt Fusion broken integration tests by @tatiana in #2581
* Fix flaky ``cosmos_manifest_selectors_example`` DAG in CI by @pankajkoti in #2593
* Reduce pre-commit autoupdate frequency PRs by @tatiana in #2544
* Bump ``reviewdog/action-actionlint`` from 1.71.0 to 1.72.0 by @dependabot in #2542
* Skip watcher gateway test on Airflow 3.0 by @tatiana in #2607

closes: astronomer/oss-integrations-private#381
Summary
- Cache Airflow's InProcessExecutionAPI across task executions in dag.test() to eliminate per-task FastAPI app creation overhead
- Reduces Airflow 3.1 integration tests from ~47 min to ~30 min and Airflow 3.2 from ~56 min to ~30 min

e.g. run: https://github.com/astronomer/astronomer-cosmos/actions/runs/24193932640/job/70618711111?pr=2547
Problem
Integration tests on Airflow 3.1+ are 5-6x slower than on 2.x. Profiling CI runs showed (e.g. https://github.com/astronomer/astronomer-cosmos/actions/runs/24133899134/job/70416991954):

| Airflow | Avg Duration |
|---------|-------------|
| 2.9 | 9 min |
| 2.10 | 10 min |
| 2.11 | 20 min |
| 3.0 | 24 min |
| 3.1 | 47 min |
| 3.2 | 56 min |

Root cause
Airflow 3.1+'s dag.test() creates a new InProcessExecutionAPI for every task via InProcessTestSupervisor._api_client(). Each instantiation spins up a full FastAPI application with ASGI middleware, JWT auth, dependency injection, and an async event loop, adding ~6-8s of overhead per task. For a 13-task DAG like basic_cosmos_dag, this accumulates to ~80s (vs ~2.5s on Airflow 2.10).
Fix
Add a session-scoped pytest fixture that patches in_process_api_server() to return a cached InProcessExecutionAPI instance, so the FastAPI app is created once and reused across all tasks and tests. The fixture is a no-op on Airflow versions before 3.1.
Test plan
- [x] Verify Airflow 3.1 and 3.2 integration tests pass and run faster (~30 min vs ~50 min)
- [x] Verify Airflow 3.0 integration tests pass (fixture is a no-op)
- [x] Verify Airflow 2.x integration tests are unaffected

Testing job run: https://github.com/astronomer/astronomer-cosmos/actions/runs/24193932640/job/70618711111?pr=2547
related: #2302