Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions cosmos/plugin/airflow3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,34 @@
from unittest.mock import patch
from urllib.parse import urlsplit

import airflow
from airflow.configuration import conf
from airflow.plugins_manager import AirflowPlugin
from airflow.sdk import ObjectStoragePath
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, JSONResponse, Response
from packaging.version import Version

from cosmos.constants import AIRFLOW_OBJECT_STORAGE_PATH_URL_SCHEMES
from cosmos.plugin.snippets import IFRAME_SCRIPT

# Airflow version gating: External views feature for the plugins used here (CosmosAF3Plugin) exist only in >= 3.1
AIRFLOW_VERSION = Version(airflow.__version__)


def ensure_airflow_version_supported() -> None:
if AIRFLOW_VERSION < Version("3.1.0"):
raise RuntimeError(
"Cosmos AF3 plugin requires Airflow >= 3.1. External views are unavailable on earlier versions."
)
Comment thread
pankajkoti marked this conversation as resolved.


API_BASE = conf.get("api", "base_url", fallback="") # reads AIRFLOW__API__BASE_URL
API_BASE_PATH = urlsplit(API_BASE).path.rstrip("/")


# TODO: This context manager and its usage in the method "_read_content_via_object_storage" has been added due
# to the current limitation in Airflow 3.1.0 where plugins are not able to resolve connections via the API server.
# Once this is fixed, potentially in PR https://github.com/apache/airflow/pull/56602 planned to be released in
# Airflow 3.1.1, test the fix and remove this context manager and its usage.
# Note: Airflow 3.1.0 had a limitation where plugins could not resolve connections via the API server.
# The fix was shipped in Airflow 3.1.1. For 3.1.0, we temporarily expose the connection via env vars inside a context manager.
@contextmanager
def connection_env(conn_id: str | None = None) -> Generator[None, None, None]: # pragma: no cover
"""
Expand All @@ -48,7 +59,14 @@ def connection_env(conn_id: str | None = None) -> Generator[None, None, None]:


def _read_content_via_object_storage(path: str, conn_id: str | None = None) -> Any:
with connection_env(conn_id):
# Use connection_env only for Airflow 3.1.0
if AIRFLOW_VERSION == Version("3.1.0"):
with connection_env(conn_id):
p = ObjectStoragePath(path, conn_id=conn_id) if conn_id else ObjectStoragePath(path)
with p.open("r") as f: # type: ignore[no-untyped-call]
content = f.read() # type: ignore[no-any-return]
return content
else:
p = ObjectStoragePath(path, conn_id=conn_id) if conn_id else ObjectStoragePath(path)
with p.open("r") as f: # type: ignore[no-untyped-call]
content = f.read() # type: ignore[no-any-return]
Expand Down Expand Up @@ -102,6 +120,7 @@ def _load_projects_from_conf() -> dict[str, dict[str, Optional[str]]]:


def create_cosmos_fastapi_app() -> FastAPI: # noqa: C901
ensure_airflow_version_supported()
app = FastAPI()

projects = _load_projects_from_conf()
Expand Down Expand Up @@ -237,20 +256,23 @@ def catalog(slug_alias: str = slug) -> Response: # type: ignore[no-redef]
class CosmosAF3Plugin(AirflowPlugin):
name = "cosmos"

# Mount our FastAPI sub-app under /cosmos
fastapi_apps = [
{
"name": "cosmos",
"app": create_cosmos_fastapi_app(),
"url_prefix": "/cosmos",
}
]
# Mount our FastAPI sub-app under /cosmos (initialized in __init__ after version check)
fastapi_apps: list[dict[str, Any]] = []

# Register external views for navigation
external_views: list[dict[str, Any]] = []

def __init__(self) -> None:
super().__init__()
ensure_airflow_version_supported()
# Initialize FastAPI app only after version support is confirmed
self.fastapi_apps = [
{
"name": "cosmos",
"app": create_cosmos_fastapi_app(),
"url_prefix": "/cosmos",
}
]
projects = _load_projects_from_conf()
for slug, cfg in projects.items():
display_name = cfg.get("name") or f"dbt Docs ({slug})"
Expand Down
4 changes: 4 additions & 0 deletions docs/airflow3_compatibility/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ Multiple dbt docs in Airflow 3 UI
---------------------------------

There have been significant changes to how plugins work in Airflow 3.x. Cosmos now supports Airflow 3 FastAPI plugins for UI integration and hosting dbt docs via external views.

.. important::
The Cosmos Airflow 3 plugin (FastAPI external views) is supported on Airflow >= 3.1 only.

Cosmos registers a FastAPI sub-application at ``/cosmos`` and adds menu entries under **Browse**. Configure one or more projects in ``airflow.cfg`` under the ``[cosmos]`` section:

.. code-block:: ini
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/cosmos-conf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ This page lists all available Airflow configurations that affect ``astronomer-co
.. _dbt_docs_projects:

`dbt_docs_projects`_:
(Introduced in Cosmos 1.11.0 and applicable to Airflow 3): JSON mapping configuring one or more dbt docs projects for the Airflow 3 UI plugin.
(Introduced in Cosmos 1.11.0; applicable to Airflow >= 3.1): JSON mapping configuring one or more dbt docs projects for the Airflow 3 UI plugin.

Structure: mapping of slug to a dict with keys ``dir`` (required), ``index`` (optional, default ``index.html``),
``name`` (optional, label in the menu), and ``conn_id`` (optional connection to read remote storage).
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/hosting-docs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ The dbt docs are available in the Airflow menu under ``Browse``:
:align: center

For Airflow 3, Cosmos exposes the docs under a FastAPI sub-app and external view entries under **Browse**.

.. important::
The Airflow 3 plugin support in Cosmos requires Airflow >= 3.1.

You can configure one or more projects via ``[cosmos].dbt_docs_projects``:

.. code-block:: ini
Expand Down
90 changes: 90 additions & 0 deletions tests/plugin/test_plugin_af3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import pytest
from fastapi.testclient import TestClient

skip_pre_airflow_31 = pytest.mark.skipif(
version.parse(airflow_version) < version.parse("3.1.0"),
reason="AF3 plugin only supported on Airflow >= 3.1",
)


@pytest.fixture(autouse=True)
def _isolate_env():
Expand All @@ -39,13 +44,22 @@ def _reload_af3_module(api_base: str | None = None):
return af3


def _reload_af3_with_version(af_ver: str):
import cosmos.plugin.airflow3 as af3

with patch.object(af3.airflow, "__version__", af_ver):
importlib.reload(af3)
return af3


def _app_with_projects(projects: dict[str, dict[str, str]]):
af3 = _reload_af3_module()
with patch("cosmos.plugin.airflow3._load_projects_from_conf", return_value=projects):
app = af3.create_cosmos_fastapi_app()
return af3, app


@skip_pre_airflow_31
def test_dbt_docs_view_and_index_local(tmp_path: Path):
# Arrange: create local docs files
docs_dir = tmp_path / "target"
Expand Down Expand Up @@ -83,6 +97,7 @@ def test_dbt_docs_view_and_index_local(tmp_path: Path):
assert r.json() == {"sources": {}}


@skip_pre_airflow_31
def test_index_missing_file_returns_404(tmp_path: Path):
docs_dir = tmp_path / "target"
docs_dir.mkdir(parents=True)
Expand All @@ -96,6 +111,7 @@ def test_index_missing_file_returns_404(tmp_path: Path):
assert "index not found" in r.text


@skip_pre_airflow_31
def test_manifest_and_catalog_error_500(tmp_path: Path):
docs_dir = tmp_path / "target"
docs_dir.mkdir(parents=True)
Expand All @@ -114,6 +130,7 @@ def test_manifest_and_catalog_error_500(tmp_path: Path):
assert "catalog read failed" in r.text


@skip_pre_airflow_31
def test_external_view_href_uses_api_base_path():
# Ensure base path is respected (e.g., Astronomer deployment prefix)
_reload_af3_module(api_base="https://host/prefix/")
Expand All @@ -130,6 +147,7 @@ def test_external_view_href_uses_api_base_path():
assert plugin.external_views[0]["href"].endswith("/cosmos/core/dbt_docs_index.html")


@skip_pre_airflow_31
def test_external_view_href_no_base_path():
_reload_af3_module(api_base="")

Expand All @@ -143,6 +161,7 @@ def test_external_view_href_no_base_path():
assert plugin.external_views[0]["href"].startswith("/cosmos/")


@skip_pre_airflow_31
def test_not_configured_routes():
af3, app = _app_with_projects({"core": {}})
client = TestClient(app)
Expand Down Expand Up @@ -221,6 +240,73 @@ def open(self, mode="r", **kwargs):
assert af3.open_file("gs://bucket/obj", conn_id=None) == "GCS_CONTENT"


def test_version_gate_raises_on_pre_31():
af3 = _reload_af3_with_version("3.0.9")
with pytest.raises(RuntimeError, match="requires Airflow >= 3.1"):
af3.ensure_airflow_version_supported()
# Also validate app creation enforces gate
with pytest.raises(RuntimeError):
af3.create_cosmos_fastapi_app()


def test_plugin_init_raises_on_pre_31():
af3 = _reload_af3_with_version("3.0.9")
with patch("cosmos.plugin.airflow3._load_projects_from_conf", return_value={}):
with pytest.raises(RuntimeError):
af3.CosmosAF3Plugin()


@pytest.mark.parametrize(
"af_ver, should_use_ctx",
[
("3.1.0", True),
("3.1.1", False),
("3.1.2", False),
("3.2.0", False),
],
)
def test_connection_env_usage_depends_on_version(af_ver: str, should_use_ctx: bool):
af3 = _reload_af3_with_version(af_ver)

import contextlib

class _FakeFile:
def __init__(self, data: str):
self._data = data

def __enter__(self):
return self

def __exit__(self, *args):
return False

def read(self):
return self._data

class _FakePath:
def __init__(self, path: str, conn_id=None):
self.path = path
self.conn_id = conn_id

def open(self, mode="r", **kwargs):
return _FakeFile("OK")

ctx_calls = {"count": 0}

@contextlib.contextmanager
def _ctx(_conn_id=None): # type: ignore[no-redef]
ctx_calls["count"] += 1
yield

with patch("cosmos.plugin.airflow3.ObjectStoragePath", _FakePath):
with patch("cosmos.plugin.airflow3.connection_env", _ctx):
# Call the internal helper directly to isolate behavior
out = af3._read_content_via_object_storage("gs://x", conn_id="c1")
assert out == "OK"

assert (ctx_calls["count"] > 0) == should_use_ctx


def test_load_projects_from_conf_valid_json():
af3 = _reload_af3_module()

Expand All @@ -236,6 +322,7 @@ def fake_get(section, key, fallback=None):
assert projects["core"]["name"] == "Core"


@skip_pre_airflow_31
def test_index_raises_exception_returns_500(tmp_path: Path):
docs_dir = tmp_path / "target"
docs_dir.mkdir(parents=True)
Expand All @@ -249,6 +336,7 @@ def test_index_raises_exception_returns_500(tmp_path: Path):
assert "Cosmos dbt docs error" in r.text


@skip_pre_airflow_31
def test_catalog_not_configured_returns_404():
af3, app = _app_with_projects({"core": {}})
client = TestClient(app)
Expand All @@ -257,6 +345,7 @@ def test_catalog_not_configured_returns_404():
assert r.json()["error"] == "not configured"


@skip_pre_airflow_31
def test_manifest_missing_includes_path_and_connid(tmp_path: Path):
docs_dir = tmp_path / "target"
docs_dir.mkdir(parents=True)
Expand All @@ -281,6 +370,7 @@ def test_manifest_missing_includes_path_and_connid(tmp_path: Path):
assert body["path"].endswith("/target/manifest.json")


@skip_pre_airflow_31
def test_catalog_missing_includes_path_and_connid(tmp_path: Path):
docs_dir = tmp_path / "target"
docs_dir.mkdir(parents=True)
Expand Down