Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,45 @@
from cosmos.log import CosmosRichLogger


@pytest.fixture(autouse=True, scope="session")
def _cache_airflow_in_process_api():
"""Cache the InProcessExecutionAPI to avoid per-task FastAPI app creation in dag.test().

Airflow 3.x's dag.test() creates a new InProcessExecutionAPI — a full FastAPI
application with ASGI middleware, JWT auth, dependency injection, and an async
event loop — for every single task. This adds ~6-8s of overhead per task,
making a 13-task DAG take ~80s instead of ~2s.

Comment thread
pankajkoti marked this conversation as resolved.
This fixture patches in_process_api_server() to return a cached instance,
so the FastAPI app is created once and reused across all tasks and tests.
"""
if AIRFLOW_VERSION < Version("3.1"):
yield
return

Comment thread
pankajkoti marked this conversation as resolved.
try:
from airflow.sdk.execution_time import supervisor as supervisor_module

_original_fn = supervisor_module.in_process_api_server
except (ImportError, AttributeError):
yield
return

_cached_api = None

def cached_in_process_api_server():
nonlocal _cached_api
if _cached_api is None:
_cached_api = _original_fn()
return _cached_api

supervisor_module.in_process_api_server = cached_in_process_api_server
try:
yield
finally:
supervisor_module.in_process_api_server = _original_fn


@pytest.fixture(autouse=True)
def _cleanup_rich_loggers():
"""Replace any CosmosRichLogger instances with standard loggers after each test.
Expand Down
Loading