From bd9f20e313e8e3786d489f8cc5af9ed601ab4e10 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 31 Oct 2025 18:46:25 +0530 Subject: [PATCH 1/2] Remove usage of contextmanager in plugins for accessing connections in Airflow >= 3.1.2 --- cosmos/plugin/airflow3.py | 48 ++++++++++---- docs/airflow3_compatibility/index.rst | 4 ++ docs/configuration/cosmos-conf.rst | 2 +- docs/configuration/hosting-docs.rst | 4 ++ tests/plugin/test_plugin_af3.py | 90 +++++++++++++++++++++++++++ 5 files changed, 134 insertions(+), 14 deletions(-) diff --git a/cosmos/plugin/airflow3.py b/cosmos/plugin/airflow3.py index ba0467dfbe..f130f075ec 100644 --- a/cosmos/plugin/airflow3.py +++ b/cosmos/plugin/airflow3.py @@ -10,23 +10,34 @@ from unittest.mock import patch from urllib.parse import urlsplit +import airflow from airflow.configuration import conf from airflow.plugins_manager import AirflowPlugin from airflow.sdk import ObjectStoragePath from fastapi import FastAPI from fastapi.responses import HTMLResponse, JSONResponse, Response +from packaging.version import Version from cosmos.constants import AIRFLOW_OBJECT_STORAGE_PATH_URL_SCHEMES from cosmos.plugin.snippets import IFRAME_SCRIPT +# Airflow version gating: External views feature for the plugins used here (CosmosAF3Plugin) exist only in >= 3.1 +AIRFLOW_VERSION = Version(airflow.__version__) + + +def ensure_airflow_version_supported() -> None: + if AIRFLOW_VERSION < Version("3.1.0"): + raise RuntimeError( + "Cosmos AF3 plugin requires Airflow >= 3.1. External views are unavailable on earlier versions." + ) + + API_BASE = conf.get("api", "base_url", fallback="") # reads AIRFLOW__API__BASE_URL API_BASE_PATH = urlsplit(API_BASE).path.rstrip("/") -# TODO: This context manager and its usage in the method "_read_content_via_object_storage" has been added due -# to the current limitation in Airflow 3.1.0 where plugins are not able to resolve connections via the API server. -# Once this is fixed, potentially in PR https://github.com/apache/airflow/pull/56602 planned to be released in -# Airflow 3.1.1, test the fix and remove this context manager and its usage. +# Note: Airflow 3.1.0 and 3.1.1 had a limitation where plugins could not resolve connections via the API server. +# The fix was shipped in Airflow 3.1.2. For 3.1.0/3.1.1 we temporarily expose the connection via env vars inside a context manager. @contextmanager def connection_env(conn_id: str | None = None) -> Generator[None, None, None]: # pragma: no cover """ @@ -48,7 +59,14 @@ def connection_env(conn_id: str | None = None) -> Generator[None, None, None]: def _read_content_via_object_storage(path: str, conn_id: str | None = None) -> Any: - with connection_env(conn_id): + # Use connection_env only for Airflow 3.1.0 and 3.1.1 + if Version("3.1.0") <= AIRFLOW_VERSION < Version("3.1.2"): + with connection_env(conn_id): + p = ObjectStoragePath(path, conn_id=conn_id) if conn_id else ObjectStoragePath(path) + with p.open("r") as f: # type: ignore[no-untyped-call] + content = f.read() # type: ignore[no-any-return] + return content + else: p = ObjectStoragePath(path, conn_id=conn_id) if conn_id else ObjectStoragePath(path) with p.open("r") as f: # type: ignore[no-untyped-call] content = f.read() # type: ignore[no-any-return] @@ -102,6 +120,7 @@ def _load_projects_from_conf() -> dict[str, dict[str, Optional[str]]]: def create_cosmos_fastapi_app() -> FastAPI: # noqa: C901 + ensure_airflow_version_supported() app = FastAPI() projects = _load_projects_from_conf() @@ -237,20 +256,23 @@ def catalog(slug_alias: str = slug) -> Response: # type: ignore[no-redef] class CosmosAF3Plugin(AirflowPlugin): name = "cosmos" - # Mount our FastAPI sub-app under /cosmos - fastapi_apps = [ - { - "name": "cosmos", - "app": create_cosmos_fastapi_app(), - "url_prefix": "/cosmos", - } - ] + # Mount our FastAPI sub-app under /cosmos (initialized in __init__ after version check) + fastapi_apps: list[dict[str, Any]] = [] # Register external views for navigation external_views: list[dict[str, Any]] = [] def __init__(self) -> None: super().__init__() + ensure_airflow_version_supported() + # Initialize FastAPI app only after version support is confirmed + self.fastapi_apps = [ + { + "name": "cosmos", + "app": create_cosmos_fastapi_app(), + "url_prefix": "/cosmos", + } + ] projects = _load_projects_from_conf() for slug, cfg in projects.items(): display_name = cfg.get("name") or f"dbt Docs ({slug})" diff --git a/docs/airflow3_compatibility/index.rst b/docs/airflow3_compatibility/index.rst index 3924b87c81..511387a88b 100644 --- a/docs/airflow3_compatibility/index.rst +++ b/docs/airflow3_compatibility/index.rst @@ -50,6 +50,10 @@ Multiple dbt docs in Airflow 3 UI --------------------------------- There have been significant changes to how plugins work in Airflow 3.x. Cosmos now supports Airflow 3 FastAPI plugins for UI integration and hosting dbt docs via external views. + +.. important:: + The Cosmos Airflow 3 plugin (FastAPI external views) is supported on Airflow >= 3.1 only. + Cosmos registers a FastAPI sub-application at ``/cosmos`` and adds menu entries under **Browse**. Configure one or more projects in ``airflow.cfg`` under the ``[cosmos]`` section: .. code-block:: ini diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 584beaeb30..92d37b76e3 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -65,7 +65,7 @@ This page lists all available Airflow configurations that affect ``astronomer-co .. _dbt_docs_projects: `dbt_docs_projects`_: - (Introduced in Cosmos 1.11.0 and applicable to Airflow 3): JSON mapping configuring one or more dbt docs projects for the Airflow 3 UI plugin. + (Introduced in Cosmos 1.11.0; applicable to Airflow >= 3.1): JSON mapping configuring one or more dbt docs projects for the Airflow 3 UI plugin. Structure: mapping of slug to a dict with keys ``dir`` (required), ``index`` (optional, default ``index.html``), ``name`` (optional, label in the menu), and ``conn_id`` (optional connection to read remote storage). diff --git a/docs/configuration/hosting-docs.rst b/docs/configuration/hosting-docs.rst index cc5d1fa48a..04cf3bc3e9 100644 --- a/docs/configuration/hosting-docs.rst +++ b/docs/configuration/hosting-docs.rst @@ -26,6 +26,10 @@ The dbt docs are available in the Airflow menu under ``Browse``: :align: center For Airflow 3, Cosmos exposes the docs under a FastAPI sub-app and external view entries under **Browse**. + +.. important:: + The Airflow 3 plugin support in Cosmos requires Airflow >= 3.1. + You can configure one or more projects via ``[cosmos].dbt_docs_projects``: .. code-block:: ini diff --git a/tests/plugin/test_plugin_af3.py b/tests/plugin/test_plugin_af3.py index ec25680d6d..4101c23c7f 100644 --- a/tests/plugin/test_plugin_af3.py +++ b/tests/plugin/test_plugin_af3.py @@ -22,6 +22,11 @@ import pytest from fastapi.testclient import TestClient +_skip_pre_31 = pytest.mark.skipif( + version.parse(airflow_version) < version.parse("3.1.0"), + reason="AF3 plugin only supported on Airflow >= 3.1", +) + @pytest.fixture(autouse=True) def _isolate_env(): @@ -39,6 +44,14 @@ def _reload_af3_module(api_base: str | None = None): return af3 +def _reload_af3_with_version(af_ver: str): + import cosmos.plugin.airflow3 as af3 + + with patch.object(af3.airflow, "__version__", af_ver): + importlib.reload(af3) + return af3 + + def _app_with_projects(projects: dict[str, dict[str, str]]): af3 = _reload_af3_module() with patch("cosmos.plugin.airflow3._load_projects_from_conf", return_value=projects): @@ -46,6 +59,7 @@ def _app_with_projects(projects: dict[str, dict[str, str]]): return af3, app +@_skip_pre_31 def test_dbt_docs_view_and_index_local(tmp_path: Path): # Arrange: create local docs files docs_dir = tmp_path / "target" @@ -83,6 +97,7 @@ def test_dbt_docs_view_and_index_local(tmp_path: Path): assert r.json() == {"sources": {}} +@_skip_pre_31 def test_index_missing_file_returns_404(tmp_path: Path): docs_dir = tmp_path / "target" docs_dir.mkdir(parents=True) @@ -96,6 +111,7 @@ def test_index_missing_file_returns_404(tmp_path: Path): assert "index not found" in r.text +@_skip_pre_31 def test_manifest_and_catalog_error_500(tmp_path: Path): docs_dir = tmp_path / "target" docs_dir.mkdir(parents=True) @@ -114,6 +130,7 @@ def test_manifest_and_catalog_error_500(tmp_path: Path): assert "catalog read failed" in r.text +@_skip_pre_31 def test_external_view_href_uses_api_base_path(): # Ensure base path is respected (e.g., Astronomer deployment prefix) _reload_af3_module(api_base="https://host/prefix/") @@ -130,6 +147,7 @@ def test_external_view_href_uses_api_base_path(): assert plugin.external_views[0]["href"].endswith("/cosmos/core/dbt_docs_index.html") +@_skip_pre_31 def test_external_view_href_no_base_path(): _reload_af3_module(api_base="") @@ -143,6 +161,7 @@ def test_external_view_href_no_base_path(): assert plugin.external_views[0]["href"].startswith("/cosmos/") +@_skip_pre_31 def test_not_configured_routes(): af3, app = _app_with_projects({"core": {}}) client = TestClient(app) @@ -221,6 +240,73 @@ def open(self, mode="r", **kwargs): assert af3.open_file("gs://bucket/obj", conn_id=None) == "GCS_CONTENT" +def test_version_gate_raises_on_pre_31(): + af3 = _reload_af3_with_version("3.0.9") + with pytest.raises(RuntimeError, match="requires Airflow >= 3.1"): + af3.ensure_airflow_version_supported() + # Also validate app creation enforces gate + with pytest.raises(RuntimeError): + af3.create_cosmos_fastapi_app() + + +def test_plugin_init_raises_on_pre_31(): + af3 = _reload_af3_with_version("3.0.9") + with patch("cosmos.plugin.airflow3._load_projects_from_conf", return_value={}): + with pytest.raises(RuntimeError): + af3.CosmosAF3Plugin() + + +@pytest.mark.parametrize( + "af_ver, should_use_ctx", + [ + ("3.1.0", True), + ("3.1.1", True), + ("3.1.2", False), + ("3.2.0", False), + ], +) +def test_connection_env_usage_depends_on_version(af_ver: str, should_use_ctx: bool): + af3 = _reload_af3_with_version(af_ver) + + import contextlib + + class _FakeFile: + def __init__(self, data: str): + self._data = data + + def __enter__(self): + return self + + def __exit__(self, *args): + return False + + def read(self): + return self._data + + class _FakePath: + def __init__(self, path: str, conn_id=None): + self.path = path + self.conn_id = conn_id + + def open(self, mode="r", **kwargs): + return _FakeFile("OK") + + ctx_calls = {"count": 0} + + @contextlib.contextmanager + def _ctx(_conn_id=None): # type: ignore[no-redef] + ctx_calls["count"] += 1 + yield + + with patch("cosmos.plugin.airflow3.ObjectStoragePath", _FakePath): + with patch("cosmos.plugin.airflow3.connection_env", _ctx): + # Call the internal helper directly to isolate behavior + out = af3._read_content_via_object_storage("gs://x", conn_id="c1") + assert out == "OK" + + assert (ctx_calls["count"] > 0) == should_use_ctx + + def test_load_projects_from_conf_valid_json(): af3 = _reload_af3_module() @@ -236,6 +322,7 @@ def fake_get(section, key, fallback=None): assert projects["core"]["name"] == "Core" +@_skip_pre_31 def test_index_raises_exception_returns_500(tmp_path: Path): docs_dir = tmp_path / "target" docs_dir.mkdir(parents=True) @@ -249,6 +336,7 @@ def test_index_raises_exception_returns_500(tmp_path: Path): assert "Cosmos dbt docs error" in r.text +@_skip_pre_31 def test_catalog_not_configured_returns_404(): af3, app = _app_with_projects({"core": {}}) client = TestClient(app) @@ -257,6 +345,7 @@ def test_catalog_not_configured_returns_404(): assert r.json()["error"] == "not configured" +@_skip_pre_31 def test_manifest_missing_includes_path_and_connid(tmp_path: Path): docs_dir = tmp_path / "target" docs_dir.mkdir(parents=True) @@ -281,6 +370,7 @@ def test_manifest_missing_includes_path_and_connid(tmp_path: Path): assert body["path"].endswith("/target/manifest.json") +@_skip_pre_31 def test_catalog_missing_includes_path_and_connid(tmp_path: Path): docs_dir = tmp_path / "target" docs_dir.mkdir(parents=True) From 0e5b407a1da03005e6e8cc2078dde47a64778cf6 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 3 Nov 2025 17:41:46 +0530 Subject: [PATCH 2/2] Use context manager only for AF 3.1.0, address review comments --- cosmos/plugin/airflow3.py | 8 ++++---- tests/plugin/test_plugin_af3.py | 24 ++++++++++++------------ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cosmos/plugin/airflow3.py b/cosmos/plugin/airflow3.py index f130f075ec..91b25d117e 100644 --- a/cosmos/plugin/airflow3.py +++ b/cosmos/plugin/airflow3.py @@ -36,8 +36,8 @@ def ensure_airflow_version_supported() -> None: API_BASE_PATH = urlsplit(API_BASE).path.rstrip("/") -# Note: Airflow 3.1.0 and 3.1.1 had a limitation where plugins could not resolve connections via the API server. -# The fix was shipped in Airflow 3.1.2. For 3.1.0/3.1.1 we temporarily expose the connection via env vars inside a context manager. +# Note: Airflow 3.1.0 had a limitation where plugins could not resolve connections via the API server. +# The fix was shipped in Airflow 3.1.1. For 3.1.0, we temporarily expose the connection via env vars inside a context manager. @contextmanager def connection_env(conn_id: str | None = None) -> Generator[None, None, None]: # pragma: no cover """ @@ -59,8 +59,8 @@ def connection_env(conn_id: str | None = None) -> Generator[None, None, None]: def _read_content_via_object_storage(path: str, conn_id: str | None = None) -> Any: - # Use connection_env only for Airflow 3.1.0 and 3.1.1 - if Version("3.1.0") <= AIRFLOW_VERSION < Version("3.1.2"): + # Use connection_env only for Airflow 3.1.0 + if AIRFLOW_VERSION == Version("3.1.0"): with connection_env(conn_id): p = ObjectStoragePath(path, conn_id=conn_id) if conn_id else ObjectStoragePath(path) with p.open("r") as f: # type: ignore[no-untyped-call] diff --git a/tests/plugin/test_plugin_af3.py b/tests/plugin/test_plugin_af3.py index 4101c23c7f..00c76ca7a6 100644 --- a/tests/plugin/test_plugin_af3.py +++ b/tests/plugin/test_plugin_af3.py @@ -22,7 +22,7 @@ import pytest from fastapi.testclient import TestClient -_skip_pre_31 = pytest.mark.skipif( +skip_pre_airflow_31 = pytest.mark.skipif( version.parse(airflow_version) < version.parse("3.1.0"), reason="AF3 plugin only supported on Airflow >= 3.1", ) @@ -59,7 +59,7 @@ def _app_with_projects(projects: dict[str, dict[str, str]]): return af3, app -@_skip_pre_31 +@skip_pre_airflow_31 def test_dbt_docs_view_and_index_local(tmp_path: Path): # Arrange: create local docs files docs_dir = tmp_path / "target" @@ -97,7 +97,7 @@ def test_dbt_docs_view_and_index_local(tmp_path: Path): assert r.json() == {"sources": {}} -@_skip_pre_31 +@skip_pre_airflow_31 def test_index_missing_file_returns_404(tmp_path: Path): docs_dir = tmp_path / "target" docs_dir.mkdir(parents=True) @@ -111,7 +111,7 @@ def test_index_missing_file_returns_404(tmp_path: Path): assert "index not found" in r.text -@_skip_pre_31 +@skip_pre_airflow_31 def test_manifest_and_catalog_error_500(tmp_path: Path): docs_dir = tmp_path / "target" docs_dir.mkdir(parents=True) @@ -130,7 +130,7 @@ def test_manifest_and_catalog_error_500(tmp_path: Path): assert "catalog read failed" in r.text -@_skip_pre_31 +@skip_pre_airflow_31 def test_external_view_href_uses_api_base_path(): # Ensure base path is respected (e.g., Astronomer deployment prefix) _reload_af3_module(api_base="https://host/prefix/") @@ -147,7 +147,7 @@ def test_external_view_href_uses_api_base_path(): assert plugin.external_views[0]["href"].endswith("/cosmos/core/dbt_docs_index.html") -@_skip_pre_31 +@skip_pre_airflow_31 def test_external_view_href_no_base_path(): _reload_af3_module(api_base="") @@ -161,7 +161,7 @@ def test_external_view_href_no_base_path(): assert plugin.external_views[0]["href"].startswith("/cosmos/") -@_skip_pre_31 +@skip_pre_airflow_31 def test_not_configured_routes(): af3, app = _app_with_projects({"core": {}}) client = TestClient(app) @@ -260,7 +260,7 @@ def test_plugin_init_raises_on_pre_31(): "af_ver, should_use_ctx", [ ("3.1.0", True), - ("3.1.1", True), + ("3.1.1", False), ("3.1.2", False), ("3.2.0", False), ], @@ -322,7 +322,7 @@ def fake_get(section, key, fallback=None): assert projects["core"]["name"] == "Core" -@_skip_pre_31 +@skip_pre_airflow_31 def test_index_raises_exception_returns_500(tmp_path: Path): docs_dir = tmp_path / "target" docs_dir.mkdir(parents=True) @@ -336,7 +336,7 @@ def test_index_raises_exception_returns_500(tmp_path: Path): assert "Cosmos dbt docs error" in r.text -@_skip_pre_31 +@skip_pre_airflow_31 def test_catalog_not_configured_returns_404(): af3, app = _app_with_projects({"core": {}}) client = TestClient(app) @@ -345,7 +345,7 @@ def test_catalog_not_configured_returns_404(): assert r.json()["error"] == "not configured" -@_skip_pre_31 +@skip_pre_airflow_31 def test_manifest_missing_includes_path_and_connid(tmp_path: Path): docs_dir = tmp_path / "target" docs_dir.mkdir(parents=True) @@ -370,7 +370,7 @@ def test_manifest_missing_includes_path_and_connid(tmp_path: Path): assert body["path"].endswith("/target/manifest.json") -@_skip_pre_31 +@skip_pre_airflow_31 def test_catalog_missing_includes_path_and_connid(tmp_path: Path): docs_dir = tmp_path / "target" docs_dir.mkdir(parents=True)