Skip to content
16 changes: 16 additions & 0 deletions cosmos/plugin/airflow2.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from flask import abort
from flask_appbuilder import AppBuilder, expose

from cosmos import telemetry
from cosmos.listeners import dag_run_listener, task_instance_listener
from cosmos.plugin.snippets import IFRAME_SCRIPT
from cosmos.plugin.storage import get_storage_type_from_path
from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud

if in_astro_cloud:
Expand Down Expand Up @@ -136,6 +138,20 @@ def create_blueprint(
@expose("/dbt_docs") # type: ignore[untyped-decorator]
@has_access(MENU_ACCESS_PERMISSIONS) # type: ignore[untyped-decorator]
def dbt_docs(self) -> str:
# Emit telemetry for dbt docs access
storage_type = "not_configured"
if dbt_docs_dir is not None:
storage_type = get_storage_type_from_path(dbt_docs_dir)

telemetry.emit_usage_metrics_if_enabled(
event_type="dbt_docs_access",
additional_metrics={
"storage_type": storage_type,
"docs_dir_configured": dbt_docs_dir is not None,
"uses_custom_conn": dbt_docs_conn_id is not None,
},
)

if dbt_docs_dir is None:
return self.render_template("dbt_docs_not_set_up.html") # type: ignore[no-any-return,no-untyped-call]
return self.render_template("dbt_docs.html") # type: ignore[no-any-return,no-untyped-call]
Expand Down
17 changes: 17 additions & 0 deletions cosmos/plugin/airflow3.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
from fastapi.responses import HTMLResponse, JSONResponse, Response
from packaging.version import Version

from cosmos import telemetry
from cosmos.constants import AIRFLOW_OBJECT_STORAGE_PATH_URL_SCHEMES
from cosmos.listeners import dag_run_listener, task_instance_listener
from cosmos.plugin.snippets import IFRAME_SCRIPT
from cosmos.plugin.storage import get_storage_type_from_path

# Airflow version gating: External views feature for the plugins used here (CosmosAF3Plugin) exist only in >= 3.1
# Note: We compute AIRFLOW_VERSION locally here (not from constants) so that tests can patch airflow.__version__ and reload this module
Expand Down Expand Up @@ -150,8 +152,23 @@ def dbt_docs_view(slug_alias: str = slug) -> str: # type: ignore[no-redef]
response_class=HTMLResponse,
)
def dbt_docs_index(slug_alias: str = slug) -> Response: # type: ignore[no-redef]
# Emit telemetry for dbt docs access
cfg_local = projects.get(slug_alias, {})
docs_dir_local = cfg_local.get("dir")
storage_type = "not_configured"
if docs_dir_local is not None:
storage_type = get_storage_type_from_path(str(docs_dir_local))

telemetry.emit_usage_metrics_if_enabled(
event_type="dbt_docs_access",
additional_metrics={
"storage_type": storage_type,
"docs_dir_configured": docs_dir_local is not None,
"uses_custom_conn": cfg_local.get("conn_id") is not None,
"has_custom_name": cfg_local.get("name") is not None,
},
)

conn_id_local = cfg_local.get("conn_id")
index_local = cfg_local.get("index") or "index.html"
if not docs_dir_local:
Expand Down
23 changes: 23 additions & 0 deletions cosmos/plugin/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Utility functions for Cosmos plugins."""


def get_storage_type_from_path(path: str) -> str:
"""Determine the storage type from the path.

Args:
path: Storage path (e.g., 's3://bucket/path', '/local/path')

Returns:
Storage type identifier: 's3', 'gcs', 'azure', 'http', or 'local'
"""
path = path.strip()
if path.startswith("s3://"):
return "s3"
elif path.startswith("gs://"):
return "gcs"
elif path.startswith("wasb://"):
return "azure"
elif path.startswith("http://") or path.startswith("https://"):
return "http"
else:
return "local"
74 changes: 74 additions & 0 deletions tests/plugin/test_plugin_af2.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,77 @@ def test_has_access_with_permissions_in_astro_must_include_custom_menu(url_path,

def test_cosmos_plugin_enabled_on_airflow2():
assert cosmos.plugin.CosmosPlugin is not None


@pytest.mark.integration
@patch("cosmos.telemetry.emit_usage_metrics_if_enabled")
def test_dbt_docs_emits_telemetry(mock_emit, monkeypatch, app):
monkeypatch.setattr("cosmos.plugin.airflow2.dbt_docs_dir", "s3://my-bucket/docs")
monkeypatch.setattr("cosmos.plugin.airflow2.dbt_docs_conn_id", "my_s3_conn")

response = app.get("/cosmos/dbt_docs")

assert response.status_code == 200
mock_emit.assert_called_once_with(
event_type="dbt_docs_access",
additional_metrics={
"storage_type": "s3",
"docs_dir_configured": True,
"uses_custom_conn": True,
},
)


@pytest.mark.integration
@patch("cosmos.telemetry.emit_usage_metrics_if_enabled")
def test_dbt_docs_emits_telemetry_not_configured(mock_emit, monkeypatch, app):
monkeypatch.setattr("cosmos.plugin.airflow2.dbt_docs_dir", None)
monkeypatch.setattr("cosmos.plugin.airflow2.dbt_docs_conn_id", None)

response = app.get("/cosmos/dbt_docs")

assert response.status_code == 200
mock_emit.assert_called_once_with(
event_type="dbt_docs_access",
additional_metrics={
"storage_type": "not_configured",
"docs_dir_configured": False,
"uses_custom_conn": False,
},
)


@pytest.mark.integration
@patch("cosmos.telemetry.emit_usage_metrics_if_enabled")
def test_dbt_docs_emits_telemetry_local_storage(mock_emit, monkeypatch, app):
monkeypatch.setattr("cosmos.plugin.airflow2.dbt_docs_dir", "/local/path/to/docs")
monkeypatch.setattr("cosmos.plugin.airflow2.dbt_docs_conn_id", None)

response = app.get("/cosmos/dbt_docs")

assert response.status_code == 200
mock_emit.assert_called_once_with(
event_type="dbt_docs_access",
additional_metrics={
"storage_type": "local",
"docs_dir_configured": True,
"uses_custom_conn": False,
},
)


@pytest.mark.parametrize(
"path,expected_type",
[
("s3://bucket/path", "s3"),
("gs://bucket/path", "gcs"),
("wasb://container/path", "azure"),
("http://example.com/path", "http"),
("https://example.com/path", "http"),
("/local/path", "local"),
],
)
def test_get_storage_type(path, expected_type):
from cosmos.plugin.storage import get_storage_type_from_path

assert get_storage_type_from_path(path) == expected_type
102 changes: 102 additions & 0 deletions tests/plugin/test_plugin_af3.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,105 @@ def test_plugin_registers_listeners():
assert hasattr(plugin, "listeners"), "Plugin must define a `listeners` attribute"

assert dag_run_listener in plugin.listeners, "CosmosAF3Plugin.listeners must include dag_run_listener module"


@skip_pre_airflow_31
@patch("cosmos.telemetry.emit_usage_metrics_if_enabled")
def test_dbt_docs_emits_telemetry(mock_emit, tmp_path: Path):
"""Test that accessing dbt docs emits telemetry."""
docs_dir = tmp_path / "target"
docs_dir.mkdir(parents=True)
index_file = docs_dir / "index.html"
index_file.write_text("<head></head><body>dbt</body>")

projects = {
"my_project": {
"dir": f"s3://my-bucket/docs",
"index": "index.html",
"name": "My Project",
"conn_id": "my_s3_conn",
}
}
af3, app = _app_with_projects(projects)
client = TestClient(app)

with patch("cosmos.plugin.airflow3.open_file", return_value="<head></head><body>dbt</body>"):
r = client.get("/my_project/dbt_docs_index.html")

assert r.status_code == 200
mock_emit.assert_called_once_with(
event_type="dbt_docs_access",
additional_metrics={
"storage_type": "s3",
"docs_dir_configured": True,
"uses_custom_conn": True,
"has_custom_name": True,
},
)


@skip_pre_airflow_31
@patch("cosmos.telemetry.emit_usage_metrics_if_enabled")
def test_dbt_docs_emits_telemetry_not_configured(mock_emit):
"""Test that accessing dbt docs emits telemetry when not configured."""
projects = {"empty": {}}
af3, app = _app_with_projects(projects)
client = TestClient(app)

r = client.get("/empty/dbt_docs_index.html")

assert r.status_code == 404
mock_emit.assert_called_once_with(
event_type="dbt_docs_access",
additional_metrics={
"storage_type": "not_configured",
"docs_dir_configured": False,
"uses_custom_conn": False,
"has_custom_name": False,
},
)


@skip_pre_airflow_31
@patch("cosmos.telemetry.emit_usage_metrics_if_enabled")
def test_dbt_docs_emits_telemetry_local_storage(mock_emit, tmp_path: Path):
"""Test that accessing dbt docs emits telemetry for local storage."""
docs_dir = tmp_path / "target"
docs_dir.mkdir(parents=True)
index_file = docs_dir / "index.html"
index_file.write_text("<head></head><body>dbt</body>")

projects = {"local": {"dir": str(docs_dir), "index": "index.html"}}
af3, app = _app_with_projects(projects)
client = TestClient(app)

r = client.get("/local/dbt_docs_index.html")

assert r.status_code == 200
mock_emit.assert_called_once_with(
event_type="dbt_docs_access",
additional_metrics={
"storage_type": "local",
"docs_dir_configured": True,
"uses_custom_conn": False,
"has_custom_name": False,
},
)


@pytest.mark.parametrize(
"path,expected_type",
[
("s3://bucket/path", "s3"),
("gs://bucket/path", "gcs"),
("wasb://container/path", "azure"),
("http://example.com/path", "http"),
("https://example.com/path", "http"),
("/local/path", "local"),
],
)
def test_get_storage_type(path, expected_type):
"""Test storage type detection from path."""
from cosmos.plugin.storage import get_storage_type_from_path

assert get_storage_type_from_path(path) == expected_type
6 changes: 3 additions & 3 deletions tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def test_emit_usage_metrics_is_unsuccessful(mock_httpx_get, caplog):
)
assert not is_success
log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v3/dag_run?cosmos_version=1.8.0a4&airflow_version=2.10.1&python_version=3.11&platform_system=darwin&platform_machine=amd64&status=success&dag_hash=d151d1fa2f03270ea116cc7494f2c591&task_count=3&cosmos_task_count=3&execution_modes=local. Status code: 404. Message: Non existent URL"""
assert caplog.text.startswith("WARNING")
assert "WARNING" in caplog.text
assert log_msg in caplog.text


Expand All @@ -94,7 +94,7 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog):
)
assert not is_success
log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v3/dag_run?cosmos_version=1.8.0a4&airflow_version=2.10.1&python_version=3.11&platform_system=darwin&platform_machine=amd64&status=success&dag_hash=d151d1fa2f03270ea116cc7494f2c591&task_count=3&cosmos_task_count=3&execution_modes=local. An HTTPX connection error occurred: Something is not right."""
assert caplog.text.startswith("WARNING")
assert "WARNING" in caplog.text
assert log_msg in caplog.text


Expand Down Expand Up @@ -125,7 +125,7 @@ def test_emit_usage_metrics_succeeds(caplog):
def test_emit_usage_metrics_if_enabled_fails(mock_should_emit, caplog):
caplog.set_level(logging.DEBUG)
assert not telemetry.emit_usage_metrics_if_enabled("any", {})
assert caplog.text.startswith("DEBUG")
assert "DEBUG" in caplog.text
assert "Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True." in caplog.text


Expand Down