diff --git a/cosmos/plugin/__init__.py b/cosmos/plugin/__init__.py index d05e15dd6f..a40d07c7f7 100644 --- a/cosmos/plugin/__init__.py +++ b/cosmos/plugin/__init__.py @@ -9,7 +9,7 @@ from flask import abort, url_for from flask_appbuilder import AppBuilder, expose -from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir +from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name def bucket_and_key(path: str) -> Tuple[str, str]: @@ -19,32 +19,43 @@ def bucket_and_key(path: str) -> Tuple[str, str]: return bucket, key -def open_s3_file(conn_id: Optional[str], path: str) -> str: +def open_s3_file(path: str, conn_id: Optional[str]) -> str: from airflow.providers.amazon.aws.hooks.s3 import S3Hook + from botocore.exceptions import ClientError if conn_id is None: conn_id = S3Hook.default_conn_name hook = S3Hook(aws_conn_id=conn_id) bucket, key = bucket_and_key(path) - content = hook.read_key(key=key, bucket_name=bucket) + try: + content = hook.read_key(key=key, bucket_name=bucket) + except ClientError as e: + if e.response.get("Error", {}).get("Code", "") == "NoSuchKey": + raise FileNotFoundError(f"{path} does not exist") + raise e return content # type: ignore[no-any-return] -def open_gcs_file(conn_id: Optional[str], path: str) -> str: +def open_gcs_file(path: str, conn_id: Optional[str]) -> str: from airflow.providers.google.cloud.hooks.gcs import GCSHook + from google.cloud.exceptions import NotFound if conn_id is None: conn_id = GCSHook.default_conn_name hook = GCSHook(gcp_conn_id=conn_id) bucket, blob = bucket_and_key(path) - content = hook.download(bucket_name=bucket, object_name=blob) + try: + content = hook.download(bucket_name=bucket, object_name=blob) + except NotFound: + raise FileNotFoundError(f"{path} does not exist") return content.decode("utf-8") # type: ignore[no-any-return] -def open_azure_file(conn_id: Optional[str], path: str) -> str: +def open_azure_file(path: str, conn_id: Optional[str]) -> str: from 
airflow.providers.microsoft.azure.hooks.wasb import WasbHook + from azure.core.exceptions import ResourceNotFoundError if conn_id is None: conn_id = WasbHook.default_conn_name @@ -52,32 +63,45 @@ def open_azure_file(conn_id: Optional[str], path: str) -> str: hook = WasbHook(wasb_conn_id=conn_id) container, blob = bucket_and_key(path) - content = hook.read_file(container_name=container, blob_name=blob) + try: + content = hook.read_file(container_name=container, blob_name=blob) + except ResourceNotFoundError: + raise FileNotFoundError(f"{path} does not exist") return content # type: ignore[no-any-return] -def open_http_file(conn_id: Optional[str], path: str) -> str: +def open_http_file(path: str, conn_id: Optional[str]) -> str: from airflow.providers.http.hooks.http import HttpHook + from requests.exceptions import HTTPError if conn_id is None: conn_id = "" hook = HttpHook(method="GET", http_conn_id=conn_id) - res = hook.run(endpoint=path) - hook.check_response(res) + try: + res = hook.run(endpoint=path) + hook.check_response(res) + except HTTPError as e: + if str(e).startswith("404"): + raise FileNotFoundError(f"{path} does not exist") + raise e return res.text # type: ignore[no-any-return] -def open_file(path: str) -> str: - """Retrieve a file from http, https, gs, s3, or wasb.""" +def open_file(path: str, conn_id: Optional[str] = None) -> str: + """ + Retrieve a file from http, https, gs, s3, or wasb. + + Raise a (base Python) FileNotFoundError if the file is not found. 
+ """ if path.strip().startswith("s3://"): - return open_s3_file(conn_id=dbt_docs_conn_id, path=path) + return open_s3_file(path, conn_id=conn_id) elif path.strip().startswith("gs://"): - return open_gcs_file(conn_id=dbt_docs_conn_id, path=path) + return open_gcs_file(path, conn_id=conn_id) elif path.strip().startswith("wasb://"): - return open_azure_file(conn_id=dbt_docs_conn_id, path=path) + return open_azure_file(path, conn_id=conn_id) elif path.strip().startswith("http://") or path.strip().startswith("https://"): - return open_http_file(conn_id=dbt_docs_conn_id, path=path) + return open_http_file(path, conn_id=conn_id) else: with open(path) as f: content = f.read() @@ -167,27 +191,39 @@ def dbt_docs(self) -> str: def dbt_docs_index(self) -> str: if dbt_docs_dir is None: abort(404) - html = open_file(op.join(dbt_docs_dir, "index.html")) - # Hack the dbt docs to render properly in an iframe - iframe_resizer_url = url_for(".static", filename="iframeResizer.contentWindow.min.js") - html = html.replace("", f'{iframe_script}', 1) - return html + try: + html = open_file(op.join(dbt_docs_dir, dbt_docs_index_file_name), conn_id=dbt_docs_conn_id) + except FileNotFoundError: + abort(404) + else: + # Hack the dbt docs to render properly in an iframe + iframe_resizer_url = url_for(".static", filename="iframeResizer.contentWindow.min.js") + html = html.replace("", f'{iframe_script}', 1) + return html @expose("/catalog.json") # type: ignore[misc] @has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)]) def catalog(self) -> Tuple[str, int, Dict[str, Any]]: if dbt_docs_dir is None: abort(404) - data = open_file(op.join(dbt_docs_dir, "catalog.json")) - return data, 200, {"Content-Type": "application/json"} + try: + data = open_file(op.join(dbt_docs_dir, "catalog.json"), conn_id=dbt_docs_conn_id) + except FileNotFoundError: + abort(404) + else: + return data, 200, {"Content-Type": "application/json"} @expose("/manifest.json") # type: ignore[misc] 
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)]) def manifest(self) -> Tuple[str, int, Dict[str, Any]]: if dbt_docs_dir is None: abort(404) - data = open_file(op.join(dbt_docs_dir, "manifest.json")) - return data, 200, {"Content-Type": "application/json"} + try: + data = open_file(op.join(dbt_docs_dir, "manifest.json"), conn_id=dbt_docs_conn_id) + except FileNotFoundError: + abort(404) + else: + return data, 200, {"Content-Type": "application/json"} dbt_docs_view = DbtDocsView() diff --git a/cosmos/settings.py b/cosmos/settings.py index 369913b932..fc59541315 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -14,6 +14,7 @@ propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True) dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None) dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None) +dbt_docs_index_file_name = conf.get("cosmos", "dbt_docs_index_file_name", fallback="index.html") try: LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") diff --git a/docs/configuration/hosting-docs.rst b/docs/configuration/hosting-docs.rst index 2ab4fdf69e..755bfe815d 100644 --- a/docs/configuration/hosting-docs.rst +++ b/docs/configuration/hosting-docs.rst @@ -34,6 +34,14 @@ or as an environment variable: The path can be either a folder in the local file system the webserver is running on, or a URI to a cloud storage platform (S3, GCS, Azure). +If your docs were generated using the ``--static`` flag, you can set the index filename using ``dbt_docs_index_file_name``: + +.. 
code-block:: cfg + + [cosmos] + dbt_docs_index_file_name = static_index.html + + Host from Cloud Storage ~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/pyproject.toml b/pyproject.toml index ea97a9c0c4..c91b6abb34 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "importlib-metadata; python_version < '3.8'", "Jinja2>=3.0.0", "msgpack", + "packaging", "pydantic>=1.10.0", "typing-extensions; python_version < '3.8'", "virtualenv", @@ -127,6 +128,7 @@ dependencies = [ "apache-airflow-providers-cncf-kubernetes>=5.1.1", "apache-airflow-providers-amazon>=3.0.0,<8.20.0", # https://github.com/apache/airflow/issues/39103 "apache-airflow-providers-docker>=3.5.0", + "apache-airflow-providers-google", "apache-airflow-providers-microsoft-azure", "apache-airflow-providers-postgres", "types-PyYAML", diff --git a/tests/plugin/test_plugin.py b/tests/plugin/test_plugin.py index 796bfff8d0..c15d183bd0 100644 --- a/tests/plugin/test_plugin.py +++ b/tests/plugin/test_plugin.py @@ -14,10 +14,10 @@ jinja2.escape = markupsafe.escape import sys +from importlib.util import find_spec from unittest.mock import MagicMock, PropertyMock, mock_open, patch import pytest -from airflow.configuration import conf from airflow.utils.db import initdb, resetdb from airflow.www.app import cached_app from airflow.www.extensions.init_appbuilder import AirflowAppBuilder @@ -34,8 +34,6 @@ open_s3_file, ) -original_conf_get = conf.get - def _get_text_from_response(response) -> str: # Airflow < 2.4 uses an old version of Werkzeug that does not have Response.text. 
@@ -64,13 +62,6 @@ def app() -> FlaskClient: def test_dbt_docs(monkeypatch, app): - def conf_get(section, key, *args, **kwargs): - if section == "cosmos" and key == "dbt_docs_dir": - return "path/to/docs/dir" - else: - return original_conf_get(section, key, *args, **kwargs) - - monkeypatch.setattr(conf, "get", conf_get) monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir") response = app.get("/cosmos/dbt_docs") @@ -90,28 +81,37 @@ def test_dbt_docs_not_set_up(monkeypatch, app): @patch.object(cosmos.plugin, "open_file") @pytest.mark.parametrize("artifact", ["dbt_docs_index.html", "manifest.json", "catalog.json"]) def test_dbt_docs_artifact(mock_open_file, monkeypatch, app, artifact): - def conf_get(section, key, *args, **kwargs): - if section == "cosmos" and key == "dbt_docs_dir": - return "path/to/docs/dir" - else: - return original_conf_get(section, key, *args, **kwargs) - - monkeypatch.setattr(conf, "get", conf_get) monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir") + monkeypatch.setattr("cosmos.plugin.dbt_docs_conn_id", "mock_conn_id") + monkeypatch.setattr("cosmos.plugin.dbt_docs_index_file_name", "custom_index.html") if artifact == "dbt_docs_index.html": mock_open_file.return_value = "" + storage_path = "path/to/docs/dir/custom_index.html" else: mock_open_file.return_value = "{}" + storage_path = f"path/to/docs/dir/{artifact}" response = app.get(f"/cosmos/{artifact}") - mock_open_file.assert_called_once() + mock_open_file.assert_called_once_with(storage_path, conn_id="mock_conn_id") assert response.status_code == 200 if artifact == "dbt_docs_index.html": assert iframe_script in _get_text_from_response(response) +@patch.object(cosmos.plugin, "open_file") +@pytest.mark.parametrize("artifact", ["dbt_docs_index.html", "manifest.json", "catalog.json"]) +def test_dbt_docs_artifact_not_found(mock_open_file, monkeypatch, app, artifact): + monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir") + 
mock_open_file.side_effect = FileNotFoundError + + response = app.get(f"/cosmos/{artifact}") + + mock_open_file.assert_called_once() + assert response.status_code == 404 + + @pytest.mark.parametrize("artifact", ["dbt_docs_index.html", "manifest.json", "catalog.json"]) def test_dbt_docs_artifact_missing(app, artifact): response = app.get(f"/cosmos/{artifact}") @@ -128,21 +128,12 @@ def test_dbt_docs_artifact_missing(app, artifact): ("https://my-bucket/my/path/", "open_http_file"), ], ) -def test_open_file_calls(path, open_file_callback, monkeypatch): - def conf_get(section, key, *args, **kwargs): - if section == "cosmos" and key == "dbt_docs_conn_id": - return "mock_conn_id" - else: - return original_conf_get(section, key, *args, **kwargs) - - monkeypatch.setattr(conf, "get", conf_get) - monkeypatch.setattr("cosmos.plugin.dbt_docs_conn_id", "mock_conn_id") - +def test_open_file_calls(path, open_file_callback): with patch.object(cosmos.plugin, open_file_callback) as mock_callback: mock_callback.return_value = "mock file contents" - res = open_file(path) + res = open_file(path, conn_id="mock_conn_id") - mock_callback.assert_called_with(conn_id="mock_conn_id", path=path) + mock_callback.assert_called_with(path, conn_id="mock_conn_id") assert res == "mock file contents" @@ -153,7 +144,7 @@ def test_open_s3_file(conn_id): mock_hook = mock_module.S3Hook.return_value mock_hook.read_key.return_value = "mock file contents" - res = open_s3_file(conn_id=conn_id, path="s3://mock-path/to/docs") + res = open_s3_file("s3://mock-path/to/docs", conn_id=conn_id) if conn_id is not None: mock_module.S3Hook.assert_called_once_with(aws_conn_id=conn_id) @@ -162,6 +153,28 @@ def test_open_s3_file(conn_id): assert res == "mock file contents" +@pytest.mark.skipif( + find_spec("airflow.providers.amazon") is None, + reason="apache-airflow-providers-amazon not installed, which is required for this test.", +) +def test_open_s3_file_not_found(): + from botocore.exceptions import ClientError + +
mock_module = MagicMock() + with patch.dict(sys.modules, {"airflow.providers.amazon.aws.hooks.s3": mock_module}): + mock_hook = mock_module.S3Hook.return_value + + def side_effect(*args, **kwargs): + raise ClientError({"Error": {"Code": "NoSuchKey"}}, "") + + mock_hook.read_key.side_effect = side_effect + + with pytest.raises(FileNotFoundError): + open_s3_file("s3://mock-path/to/docs", conn_id="mock-conn-id") + + mock_module.S3Hook.assert_called_once() + + @pytest.mark.parametrize("conn_id", ["mock_conn_id", None]) def test_open_gcs_file(conn_id): mock_module = MagicMock() @@ -169,7 +182,7 @@ def test_open_gcs_file(conn_id): mock_hook = mock_module.GCSHook.return_value = MagicMock() mock_hook.download.return_value = b"mock file contents" - res = open_gcs_file(conn_id=conn_id, path="gs://mock-path/to/docs") + res = open_gcs_file("gs://mock-path/to/docs", conn_id=conn_id) if conn_id is not None: mock_module.GCSHook.assert_called_once_with(gcp_conn_id=conn_id) @@ -178,6 +191,28 @@ def test_open_gcs_file(conn_id): assert res == "mock file contents" +@pytest.mark.skipif( + find_spec("airflow.providers.google") is None, + reason="apache-airflow-providers-google not installed, which is required for this test.", +) +def test_open_gcs_file_not_found(): + from google.cloud.exceptions import NotFound + + mock_module = MagicMock() + with patch.dict(sys.modules, {"airflow.providers.google.cloud.hooks.gcs": mock_module}): + mock_hook = mock_module.GCSHook.return_value = MagicMock() + + def side_effect(*args, **kwargs): + raise NotFound("") + + mock_hook.download.side_effect = side_effect + + with pytest.raises(FileNotFoundError): + open_gcs_file("gs://mock-path/to/docs", conn_id="mock-conn-id") + + mock_module.GCSHook.assert_called_once() + + @pytest.mark.parametrize("conn_id", ["mock_conn_id", None]) def test_open_azure_file(conn_id): mock_module = MagicMock() @@ -186,7 +221,7 @@ def test_open_azure_file(conn_id): mock_hook.default_conn_name = 
PropertyMock(return_value="default_conn") mock_hook.read_file.return_value = "mock file contents" - res = open_azure_file(conn_id=conn_id, path="wasb://mock-path/to/docs") + res = open_azure_file("wasb://mock-path/to/docs", conn_id=conn_id) if conn_id is not None: mock_module.WasbHook.assert_called_once_with(wasb_conn_id=conn_id) @@ -195,6 +230,25 @@ def test_open_azure_file(conn_id): assert res == "mock file contents" +@pytest.mark.skipif( + find_spec("airflow.providers.microsoft") is None, + reason="apache-airflow-providers-microsoft-azure not installed, which is required for this test.", +) +def test_open_azure_file_not_found(): + from azure.core.exceptions import ResourceNotFoundError + + mock_module = MagicMock() + with patch.dict(sys.modules, {"airflow.providers.microsoft.azure.hooks.wasb": mock_module}): + mock_hook = mock_module.WasbHook.return_value = MagicMock() + + mock_hook.read_file.side_effect = ResourceNotFoundError + + with pytest.raises(FileNotFoundError): + open_azure_file("wasb://mock-path/to/docs", conn_id="mock-conn-id") + + mock_module.WasbHook.assert_called_once() + + @pytest.mark.parametrize("conn_id", ["mock_conn_id", None]) def test_open_http_file(conn_id): mock_module = MagicMock() @@ -205,7 +259,7 @@ def test_open_http_file(conn_id): mock_hook.check_response.return_value = mock_response mock_response.text = "mock file contents" - res = open_http_file(conn_id=conn_id, path="http://mock-path/to/docs") + res = open_http_file("http://mock-path/to/docs", conn_id=conn_id) if conn_id is not None: mock_module.HttpHook.assert_called_once_with(method="GET", http_conn_id=conn_id) @@ -216,6 +270,27 @@ def test_open_http_file(conn_id): assert res == "mock file contents" +def test_open_http_file_not_found(): + from requests.exceptions import HTTPError + + mock_module = MagicMock() + with patch.dict(sys.modules, {"airflow.providers.http.hooks.http": mock_module}): + mock_hook = mock_module.HttpHook.return_value = MagicMock() + + def side_effect(*args,
**kwargs): + raise HTTPError("404 Client Error: Not Found for url: https://google.com/this/is/a/fake/path") + + mock_hook.run.side_effect = side_effect + + with pytest.raises(FileNotFoundError): + open_http_file("https://google.com/this/is/a/fake/path", conn_id="mock-conn-id") + + mock_module.HttpHook.assert_called_once() + + +"404 Client Error: Not Found for url: https://google.com/ashjdfasdkfahdjsf" + + @patch("builtins.open", new_callable=mock_open, read_data="mock file contents") def test_open_file_local(mock_file): res = open_file("/my/path")