
Cache InProcessExecutionAPI in dag.test() to avoid per-task FastAPI app creation#65235

Merged
kaxil merged 1 commit into apache:main from astronomer:cache-in-process-api-dagtest on Apr 14, 2026

kaxil (Member) commented Apr 14, 2026

Summary

dag.test() creates a new InProcessExecutionAPI for every task execution. Each instance spins up a full FastAPI app with Cadwyn versioning, plus an a2wsgi ASGI-to-WSGI bridge that starts its own event loop thread. The dominant cost is the cold event loop thread startup and DB connection establishment per HTTP request.
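The cost described above comes from a common bridge pattern: synchronous code hands coroutines to an event loop that runs on its own thread. The sketch below is a minimal illustration of that pattern and makes no claims about a2wsgi's actual internals (LoopThreadBridge and handle_request are hypothetical names). It shows why creating one bridge per task pays the thread and loop startup every time, while a reused bridge pays it once and serves later requests from an already-running loop:

```python
import asyncio
import threading


class LoopThreadBridge:
    """Hypothetical sync-to-async bridge: one event loop running on a background thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        # Paid once per bridge instance: this is the "cold start" that caching avoids.
        self._thread.start()

    def call(self, coro):
        # Submit a coroutine from synchronous code and block until it completes.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()

    def close(self) -> None:
        # Stop the loop from outside its thread, wait for the thread, then release the loop.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def handle_request(path: str) -> str:
    await asyncio.sleep(0)  # stand-in for asynchronous request handling
    return f"200 OK {path}"


# One warm bridge serves several "task" requests.
bridge = LoopThreadBridge()
responses = [bridge.call(handle_request(f"/task/{i}")) for i in range(3)]
bridge.close()
print(responses)  # ['200 OK /task/0', '200 OK /task/1', '200 OK /task/2']
```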

Add @functools.lru_cache(maxsize=1) to in_process_api_server() so the FastAPI app and warm event loop are reused across all tasks within a dag.test() call. DagFileProcessorManager already uses this caching pattern (manager.py:269).
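The caching mechanism itself is plain functools.lru_cache on a zero-argument factory. In the minimal sketch below, FakeExecutionAPI and fake_in_process_api_server are hypothetical stand-ins, not Airflow names. Repeated calls return the same object until cache_clear() is called:

```python
import functools


class FakeExecutionAPI:
    """Hypothetical stand-in for an expensive in-process API server."""

    instances_created = 0

    def __init__(self) -> None:
        FakeExecutionAPI.instances_created += 1
        self.dependency_overrides: dict = {}


@functools.lru_cache(maxsize=1)
def fake_in_process_api_server() -> FakeExecutionAPI:
    # A zero-argument function has a single cache key, so maxsize=1 holds exactly one instance.
    return FakeExecutionAPI()


def run_tasks(n: int) -> list:
    # Every "task" asks for the server; only the first call constructs it.
    return [fake_in_process_api_server() for _ in range(n)]


servers = run_tasks(10)
print(FakeExecutionAPI.instances_created)  # 1
print(all(s is servers[0] for s in servers))  # True

fake_in_process_api_server.cache_clear()  # drop the cached instance
print(fake_in_process_api_server() is servers[0])  # False
print(FakeExecutionAPI.instances_created)  # 2
```

Because the factory takes no arguments, maxsize=1 costs nothing and makes the intent explicit: there is only ever one server per process until the cache is cleared.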

Also clean up stale dependency_overrides when _api_client(dag=None) is called (e.g. from run_trigger_in_process), and add an autouse fixture that calls cache_clear() plus a test for the caching contract.
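The dependency_overrides cleanup matters because the cached app is now shared between calls. Without it, an override installed for one DAG would survive into a later _api_client(dag=None) call. The sketch below illustrates that set-or-pop logic with a plain dict shaped like FastAPI's app.dependency_overrides, which maps an original dependency callable to its override. FakeApp, cached_app, api_client, and this dag_bag_from_app are hypothetical simplifications, not the code in supervisor.py:

```python
import functools


def dag_bag_from_app():
    """Hypothetical dependency; FastAPI keys dependency_overrides by the original callable."""
    return "dag bag loaded from the app"


class FakeApp:
    def __init__(self) -> None:
        # Same shape as FastAPI's app.dependency_overrides: {original_dependency: override}.
        self.dependency_overrides: dict = {}


@functools.lru_cache(maxsize=1)
def cached_app() -> FakeApp:
    return FakeApp()


def api_client(dag=None) -> FakeApp:
    app = cached_app()
    if dag is not None:
        # Point the dependency at the DAG under test.
        app.dependency_overrides[dag_bag_from_app] = lambda: dag
    else:
        # The app is shared, so remove any override left behind by an earlier call.
        app.dependency_overrides.pop(dag_bag_from_app, None)
    return app


app = api_client(dag="my_dag")
print(app.dependency_overrides[dag_bag_from_app]())  # my_dag
app = api_client(dag=None)
print(dag_bag_from_app in app.dependency_overrides)  # False
```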

Benchmarks (breeze, SQLite)

Per-task overhead (profiled, 10-task noop DAG):

  • Uncached: 1.25 s/task; cached: 0.20 s/task, about 6x faster within a single dag.test() call
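The 6x figure is the ratio of the two per-task times. Multiplying each time by the task count gives the corresponding totals for the 10-task profiled DAG, assuming the per-task averages hold for every task:

```python
uncached_s_per_task = 1.25
cached_s_per_task = 0.20
tasks = 10

# Speedup is the ratio of per-task overheads.
speedup = uncached_s_per_task / cached_s_per_task
print(f"speedup: {speedup:.2f}x")  # speedup: 6.25x

# Total overhead across the DAG, assuming every task pays the average cost.
print(f"{tasks}-task DAG: {uncached_s_per_task * tasks:.1f}s -> {cached_s_per_task * tasks:.1f}s")
```

This matches the "6.2x" quoted in the commit message below.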

Test suite improvement (avg of 3 runs, cache cleared between tests):

| Test suite | Without cache | With cache | Improvement |
|---|---|---|---|
| test_dag.py (4 dag_test tests) | 27.90s | 21.58s | 23% faster |
| test_mappedoperator.py (3 dag tests) | 26.64s | 17.04s | 36% faster |
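The Improvement column is the relative reduction in wall time, (without - with) / without. Recomputing it from the two timing columns reproduces the rounded percentages:

```python
timings_s = {
    # suite: (without cache, with cache), wall time in seconds from the table above
    "test_dag.py": (27.90, 21.58),
    "test_mappedoperator.py": (26.64, 17.04),
}

improvements = {}
for suite, (without_cache, with_cache) in timings_s.items():
    # Improvement = share of the uncached wall time that caching removed.
    improvements[suite] = (without_cache - with_cache) / without_cache
    print(f"{suite}: {improvements[suite]:.0%} faster")
```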

The test suite improvement is smaller because fixture setup/teardown and the cache_clear() between tests dominate. For downstream projects such as Cosmos, which run multi-task DAGs with a session-scoped cache, the per-task speedup applies directly, consistent with their reported improvement from 47 min to 30 min.

Bottleneck breakdown (5-task profiled run, uncached):

  • _thread.lock.acquire (a2wsgi sync bridge): 91%
  • _api_client() (FastAPI+Cadwyn creation): 7%
  • Other (task execution, DB, logging): 2%

Why this is safe

  • dependency_overrides cleanup: _api_client(dag=None) now pops stale overrides from the cached instance
  • Single-threaded context: dag.test() runs tasks sequentially
  • No mocks to bypass: No test mocks in_process_api_server()
  • cache_clear fixture: Autouse in airflow-core/tests/conftest.py prevents cross-test state leakage
  • 303 tests pass: test_supervisor (123), test_dagrun (154), test_dag_command (12), test_dag (4), test_mappedoperator (3), test_xcom_arg (10)

Downstream impact

Cosmos CI tests (astronomer/astronomer-cosmos#2547) worked around this with a session-scoped pytest fixture that monkeypatches in_process_api_server(). This fix makes that workaround unnecessary.

kaxil requested review from amoghrajesh and ashb as code owners April 14, 2026 16:56
kaxil added the full tests needed label Apr 14, 2026
kaxil requested a review from Copilot April 14, 2026 16:56
Copilot AI (Contributor) left a comment

Pull request overview

This PR improves dag.test() performance by reusing a single InProcessExecutionAPI instance instead of creating a new FastAPI/a2wsgi stack per task execution.

Changes:

  • Cache in_process_api_server() using functools.lru_cache(maxsize=1) to reuse the in-process Execution API server across task executions.
  • Ensure stale dependency_overrides for dag_bag_from_app are removed when _api_client(dag=None) is used.
  • Add tests/fixtures to validate and manage the caching contract (cache_clear() usage).

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

| File | Description |
|---|---|
| task-sdk/src/airflow/sdk/execution_time/supervisor.py | Caches in_process_api_server() and clears dag_bag_from_app dependency override when no DAG is provided. |
| task-sdk/tests/task_sdk/execution_time/test_supervisor.py | Adds a unit test asserting the caching/clearing behavior of in_process_api_server(). |
| airflow-core/tests/conftest.py | Adds an autouse fixture to clear the cached in-process API server after each test to prevent state leakage. |

Comment threads: airflow-core/tests/conftest.py (outdated); task-sdk/src/airflow/sdk/execution_time/supervisor.py
kaxil changed the title from "Cache InProcessExecutionAPI in dag.test()" to "Cache InProcessExecutionAPI in dag.test() to avoid per-task FastAPI app creation" Apr 14, 2026
dag.test() creates a new InProcessExecutionAPI for every task
execution. Each instance spins up a new FastAPI app, a2wsgi ASGI-to-WSGI
bridge with its own event loop thread, and DB connections. Profiling
shows the cold event loop thread dominates: 1.25s/task uncached vs
0.20s/task cached (6.2x speedup for a 10-task DAG).

Add @functools.lru_cache(maxsize=1) to in_process_api_server() so
the FastAPI app and warm event loop are reused across all tasks.
DagFileProcessorManager already uses this pattern (manager.py:269).

Also clean up stale dependency_overrides when _api_client(dag=None)
is called (e.g. from run_trigger_in_process), add a cache_clear
fixture in conftest.py, and add a test for the caching contract.
kaxil force-pushed the cache-in-process-api-dagtest branch from 30fa0cc to 2445be7 April 14, 2026 19:13
kaxil changed the title again (to "Cache InProcessExecutionAPI in dag.test() to avoid per-task FastAPI app creation") Apr 14, 2026
pankajkoti (Member) left a comment

Thanks for optimising this piece!

kaxil (Member, Author) commented Apr 14, 2026

Static check failure is unrelated

kaxil merged commit c7a6eb2 into apache:main Apr 14, 2026
138 of 139 checks passed
kaxil deleted the cache-in-process-api-dagtest branch April 14, 2026 20:55

Labels: area:task-sdk, full tests needed (We need to run full set of tests for this PR to merge)

4 participants