diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 96f0d5564b..1a1ad366d0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -143,6 +143,9 @@ jobs: AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0 AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres + AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }} + AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }} + AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }} DATABRICKS_HOST: mock DATABRICKS_WAREHOUSE_ID: mock DATABRICKS_TOKEN: mock @@ -213,6 +216,9 @@ jobs: env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres + AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }} + AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }} + AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }} PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }} DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }} @@ -275,6 +281,9 @@ jobs: env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres + AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }} + AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }} + AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }} AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }} @@ -348,6 +357,9 @@ jobs: env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres + AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }} + AIRFLOW_CONN_GCP_GS_CONN: ${{ secrets.AIRFLOW_CONN_GCP_GS_CONN }} + AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }} AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0 PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }} diff --git a/cosmos/config.py b/cosmos/config.py index e1e5d56f9a..fd9d09e184 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -10,9 +10,12 @@ from pathlib import Path from typing import Any, Callable, Iterator +from airflow.version import version as airflow_version + from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled from cosmos.constants import ( DEFAULT_PROFILES_FILE_NAME, + FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP, DbtResourceType, ExecutionMode, InvocationMode, @@ -24,6 +27,7 @@ from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger from cosmos.profiles import BaseProfileMapping +from cosmos.settings import AIRFLOW_IO_AVAILABLE logger = get_logger(__name__) @@ -150,6 +154,7 @@ def __init__( seeds_relative_path: str | Path = "seeds", snapshots_relative_path: str | Path = "snapshots", manifest_path: str | Path | None = None, + manifest_conn_id: str | None = None, project_name: str | None = None, env_vars: dict[str, str] | None = None, dbt_vars: dict[str, str] | None = None, @@ -175,7 +180,25 @@ def __init__( self.project_name = self.dbt_project_path.stem if manifest_path: - self.manifest_path = Path(manifest_path) + manifest_path_str = str(manifest_path) + if not manifest_conn_id: + manifest_scheme = manifest_path_str.split("://")[0] + # Use the default Airflow connection ID for the scheme if it is not provided. + manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, None) + + if manifest_conn_id is not None and not AIRFLOW_IO_AVAILABLE: + raise CosmosValueError( + f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object " + f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + f"Airflow 2.8 or later." + ) + + if AIRFLOW_IO_AVAILABLE: + from airflow.io.path import ObjectStoragePath + + self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id) + else: + self.manifest_path = Path(manifest_path_str) self.env_vars = env_vars self.dbt_vars = dbt_vars @@ -192,28 +215,30 @@ def validate_project(self) -> None: """ mandatory_paths = {} - + # We validate the existence of paths added to the `mandatory_paths` map by calling the `exists()` method on each + # one. Starting with Cosmos 1.6.0, if the Airflow version is `>= 2.8.0` and a `manifest_path` is provided, we + # cast it to an `airflow.io.path.ObjectStoragePath` instance during `ProjectConfig` initialisation, and it + # includes the `exists()` method. For the remaining paths in the `mandatory_paths` map, we cast them to + # `pathlib.Path` objects to ensure that the subsequent `exists()` call while iterating on the `mandatory_paths` + # map works correctly for all paths, thereby validating the project. if self.dbt_project_path: project_yml_path = self.dbt_project_path / "dbt_project.yml" mandatory_paths = { - "dbt_project.yml": project_yml_path, - "models directory ": self.models_path, + "dbt_project.yml": Path(project_yml_path) if project_yml_path else None, + "models directory ": Path(self.models_path) if self.models_path else None, } if self.manifest_path: mandatory_paths["manifest"] = self.manifest_path for name, path in mandatory_paths.items(): - if path is None or not Path(path).exists(): + if path is None or not path.exists(): raise CosmosValueError(f"Could not find {name} at {path}") def is_manifest_available(self) -> bool: """ Check if the `dbt` project manifest is set and if the file exists. """ - if not self.manifest_path: - return False - - return self.manifest_path.exists() + return self.manifest_path.exists() if self.manifest_path else False @dataclass diff --git a/cosmos/constants.py b/cosmos/constants.py index 956660e016..d512faf16e 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -3,6 +3,9 @@ from pathlib import Path import aenum +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.providers.google.cloud.hooks.gcs import GCSHook +from airflow.providers.microsoft.azure.hooks.wasb import WasbHook from packaging.version import Version DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml") @@ -28,6 +31,17 @@ PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")] +S3_FILE_SCHEME = "s3" +GS_FILE_SCHEME = "gs" +ABFS_FILE_SCHEME = "abfs" + +FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP = { + S3_FILE_SCHEME: S3Hook.default_conn_name, + GS_FILE_SCHEME: GCSHook.default_conn_name, + ABFS_FILE_SCHEME: WasbHook.default_conn_name, +} + + class LoadMode(Enum): """ Supported ways to load a `dbt` project into a `DbtGraph` instance. diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index b2f53cb8a1..68105eb211 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -13,7 +13,7 @@ from functools import cached_property from pathlib import Path from subprocess import PIPE, Popen -from typing import Any +from typing import TYPE_CHECKING, Any from airflow.models import Variable @@ -611,7 +611,11 @@ def load_from_dbt_manifest(self) -> None: raise CosmosLoadDbtException("Unable to load manifest without ExecutionConfig.dbt_project_path") nodes = {} - with open(self.project.manifest_path) as fp: # type: ignore[arg-type] + + if TYPE_CHECKING: + assert self.project.manifest_path is not None # pragma: no cover + + with self.project.manifest_path.open() as fp: manifest = json.load(fp) resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})} diff --git a/cosmos/settings.py b/cosmos/settings.py index 62d4ee5bdf..67d5928d90 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -4,6 +4,8 @@ import airflow from airflow.configuration import conf +from airflow.version import version as airflow_version +from packaging.version import Version from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE @@ -24,3 +26,5 @@ LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") except airflow.exceptions.AirflowConfigException: LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) + +AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") diff --git a/dev/dags/cosmos_manifest_example.py b/dev/dags/cosmos_manifest_example.py index 225b38ddc1..a96eff9718 100644 --- a/dev/dags/cosmos_manifest_example.py +++ b/dev/dags/cosmos_manifest_example.py @@ -6,12 +6,17 @@ from datetime import datetime from pathlib import Path -from cosmos import DbtDag, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig +from airflow.decorators import dag +from airflow.operators.empty import EmptyOperator + +from cosmos import DbtTaskGroup, ExecutionConfig, LoadMode, ProfileConfig, ProjectConfig, RenderConfig from cosmos.profiles import DbtProfileConfigVars, PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) +execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop") + profile_config = ProfileConfig( profile_name="default", target_name="dev", @@ -22,22 +27,84 @@ ), ) -# [START local_example] -cosmos_manifest_example = DbtDag( - # dbt/cosmos-specific parameters - project_config=ProjectConfig( - manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", - project_name="jaffle_shop", - ), - profile_config=profile_config, - render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"]), - execution_config=ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop"), - operator_args={"install_deps": True}, - # normal dag parameters +render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:seeds/raw_customers.csv"]) + + +@dag( schedule_interval="@daily", start_date=datetime(2023, 1, 1), catchup=False, - dag_id="cosmos_manifest_example", default_args={"retries": 2}, ) -# [END local_example] +def cosmos_manifest_example() -> None: + + pre_dbt = EmptyOperator(task_id="pre_dbt") + + # [START local_example] + local_example = DbtTaskGroup( + group_id="local_example", + project_config=ProjectConfig( + manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", + project_name="jaffle_shop", + ), + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + operator_args={"install_deps": True}, + ) + # [END local_example] + + # [START aws_s3_example] + aws_s3_example = DbtTaskGroup( + group_id="aws_s3_example", + project_config=ProjectConfig( + manifest_path="s3://cosmos-manifest-test/manifest.json", + manifest_conn_id="aws_s3_conn", + # `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used. + project_name="jaffle_shop", + ), + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + operator_args={"install_deps": True}, + ) + # [END aws_s3_example] + + # [START gcp_gs_example] + gcp_gs_example = DbtTaskGroup( + group_id="gcp_gs_example", + project_config=ProjectConfig( + manifest_path="gs://cosmos-manifest-test/manifest.json", + manifest_conn_id="gcp_gs_conn", + # `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used. + project_name="jaffle_shop", + ), + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + operator_args={"install_deps": True}, + ) + # [END gcp_gs_example] + + # [START azure_abfs_example] + azure_abfs_example = DbtTaskGroup( + group_id="azure_abfs_example", + project_config=ProjectConfig( + manifest_path="abfs://cosmos-manifest-test/manifest.json", + manifest_conn_id="azure_abfs_conn", + # `manifest_conn_id` is optional. If not provided, the default connection ID `wasb_default` is used. + project_name="jaffle_shop", + ), + profile_config=profile_config, + render_config=render_config, + execution_config=execution_config, + operator_args={"install_deps": True}, + ) + # [END azure_abfs_example] + + post_dbt = EmptyOperator(task_id="post_dbt") + + (pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt) + + +cosmos_manifest_example() diff --git a/docs/configuration/parsing-methods.rst b/docs/configuration/parsing-methods.rst index ebd6030e6b..40f4236527 100644 --- a/docs/configuration/parsing-methods.rst +++ b/docs/configuration/parsing-methods.rst @@ -34,6 +34,7 @@ When you don't supply an argument to the ``load_mode`` parameter (or you supply To use this method, you don't need to supply any additional config. This is the default. + ``dbt_manifest`` ---------------- @@ -41,19 +42,55 @@ If you already have a ``manifest.json`` file created by dbt, Cosmos will parse t You can supply a ``manifest_path`` parameter on the DbtDag / DbtTaskGroup with a path to a ``manifest.json`` file. -To use this: +Before Cosmos 1.6.0, the path to ``manifest.json`` supplied via the ``DbtDag`` / ``DbtTaskGroup`` ``manifest_path`` +argument accepted only local paths. However, starting with Cosmos 1.6.0, if you've Airflow >= 2.8.0, you can supply a +a remote path (e.g., an S3 URL) too. For supporting remote paths, Cosmos leverages the +`Airflow Object Storage `_ +feature released in Airflow 2.8.0. +For remote paths, you can specify a ``manifest_conn_id``, which is an +Airflow connection ID containing the credentials to access the remote path. If you do not specify a +``manifest_conn_id``, Cosmos will use the default connection ID specific to the scheme, identified using the Airflow +hook's ``default_conn_id`` corresponding to the URL's scheme. -.. code-block:: python +Examples of how to supply ``manifest.json`` using ``manifest_path`` argument: + +- Local path: + +.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py + :language: python + :start-after: [START local_example] + :end-before: [END local_example] + +- AWS S3 URL (available since Cosmos 1.6): + +Ensure that you have the required dependencies installed to use the S3 URL. You can install the required dependencies +using the following command: ``pip install "astronomer-cosmos[amazon]"`` + +.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py + :language: python + :start-after: [START aws_s3_example] + :end-before: [END aws_s3_example] + +- GCP GCS URL (available since Cosmos 1.6): + +Ensure that you have the required dependencies installed to use the GCS URL. You can install the required dependencies +using the following command: ``pip install "astronomer-cosmos[google]"`` + +.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py + :language: python + :start-after: [START gcp_gs_example] + :end-before: [END gcp_gs_example] + +- Azure Blob Storage URL (available since Cosmos 1.6): + +Ensure that you have the required dependencies installed to use the Azure blob URL. You can install the required +dependencies using the following command: ``pip install "astronomer-cosmos[microsoft]"`` + +.. literalinclude:: ../../dev/dags/cosmos_manifest_example.py + :language: python + :start-after: [START azure_abfs_example] + :end-before: [END azure_abfs_example] - DbtDag( - project_config=ProjectConfig( - manifest_path="/path/to/manifest.json", - ), - render_config=RenderConfig( - load_method=LoadMode.DBT_MANIFEST, - ), - # ..., - ) ``dbt_ls`` ---------- diff --git a/docs/configuration/project-config.rst b/docs/configuration/project-config.rst index 2882ee9cc3..279d5b3925 100644 --- a/docs/configuration/project-config.rst +++ b/docs/configuration/project-config.rst @@ -12,7 +12,8 @@ variables that should be used for rendering and execution. It takes the followin - ``snapshots_relative_path``: The path to your snapshots directory, relative to the ``dbt_project_path``. This defaults to ``snapshots/`` - ``manifest_path``: The absolute path to your manifests directory. This is only required if you're using Cosmos' manifest - parsing mode + parsing mode. Along with supporting local paths for manifest parsing, starting with Cosmos 1.6.0, if you've + Airflow >= 2.8.0, Cosmos also supports remote paths for manifest parsing(e.g. S3 URL). See :ref:`parsing-methods` for more details. - ``project_name`` : The name of the project. If ``dbt_project_path`` is provided, the ``project_name`` defaults to the folder name containing ``dbt_project.yml``. If ``dbt_project_path`` is not provided, and ``manifest_path`` is provided, ``project_name`` is required as the name can not be inferred from ``dbt_project_path`` diff --git a/docs/requirements.txt b/docs/requirements.txt index 81a7084e4a..70481fe1c7 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,6 +1,9 @@ aenum apache-airflow +apache-airflow-providers-amazon[s3fs]>=3.0.0 apache-airflow-providers-cncf-kubernetes>=5.1.1 +apache-airflow-providers-google +apache-airflow-providers-microsoft-azure google-re2==1.1 msgpack openlineage-airflow diff --git a/pyproject.toml b/pyproject.toml index 9654b37640..a257fa8602 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,18 @@ dbt-spark = ["dbt-spark"] dbt-teradata = ["dbt-teradata"] dbt-vertica = ["dbt-vertica<=1.5.4"] openlineage = ["openlineage-integration-common!=1.15.0", "openlineage-airflow"] -all = ["astronomer-cosmos[dbt-all]", "astronomer-cosmos[openlineage]"] +amazon = [ + "apache-airflow-providers-amazon[s3fs]>=3.0.0", +] +google = ["apache-airflow-providers-google"] +microsoft = ["apache-airflow-providers-microsoft-azure"] +all = [ + "astronomer-cosmos[dbt-all]", + "astronomer-cosmos[openlineage]", + "astronomer-cosmos[amazon]", + "astronomer-cosmos[google]", + "astronomer-cosmos[microsoft]", +] docs = [ "sphinx", "pydata-sphinx-theme", @@ -95,7 +106,7 @@ kubernetes = [ "apache-airflow-providers-cncf-kubernetes>=5.1.1", ] aws_eks = [ - "apache-airflow-providers-amazon>=8.0.0,<8.20.0", # https://github.com/apache/airflow/issues/39103 + "apache-airflow-providers-amazon>=8.0.0", ] azure-container-instance = [ "apache-airflow-providers-microsoft-azure>=8.4.0", @@ -129,7 +140,7 @@ packages = ["/cosmos"] dependencies = [ "astronomer-cosmos[tests]", "apache-airflow-providers-cncf-kubernetes>=5.1.1", - "apache-airflow-providers-amazon>=3.0.0,<8.20.0", # https://github.com/apache/airflow/issues/39103 + "apache-airflow-providers-amazon[s3fs]>=3.0.0", "apache-airflow-providers-docker>=3.5.0", "apache-airflow-providers-google", "apache-airflow-providers-microsoft-azure", @@ -178,7 +189,10 @@ markers = ["integration", "sqlite", "perf"] [tool.hatch.envs.docs] dependencies = [ "aenum", + "apache-airflow-providers-amazon[s3fs]>=3.0.0", "apache-airflow-providers-cncf-kubernetes>=5.1.1", + "apache-airflow-providers-google", + "apache-airflow-providers-microsoft-azure", "msgpack", "openlineage-airflow", "pydantic>=1.10.0", diff --git a/tests/test_config.py b/tests/test_config.py index 5bf0f69b56..d557dd4fc2 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -9,6 +9,7 @@ from cosmos.exceptions import CosmosValueError from cosmos.profiles.athena.access_key import AthenaAccessKeyProfileMapping from cosmos.profiles.postgres.user_pass import PostgresUserPasswordProfileMapping +from cosmos.settings import AIRFLOW_IO_AVAILABLE DBT_PROJECTS_ROOT_DIR = Path(__file__).parent / "sample/" SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" @@ -35,7 +36,12 @@ def test_init_with_manifest_path_and_project_path_succeeds(): project_name in this case should be based on dbt_project_path """ project_config = ProjectConfig(dbt_project_path="/tmp/some-path", manifest_path="target/manifest.json") - assert project_config.manifest_path == Path("target/manifest.json") + if AIRFLOW_IO_AVAILABLE: + from airflow.io.path import ObjectStoragePath + + assert project_config.manifest_path == ObjectStoragePath("target/manifest.json") + else: + assert project_config.manifest_path == Path("target/manifest.json") assert project_config.project_name == "some-path" @@ -292,3 +298,36 @@ def test_execution_config_with_invocation_option(execution_mode, invocation_mode def test_execution_config_default_config(execution_mode, expected_invocation_mode): execution_config = ExecutionConfig(execution_mode=execution_mode) assert execution_config.invocation_mode == expected_invocation_mode + + +@pytest.mark.parametrize( + "manifest_path, given_manifest_conn_id, used_manifest_conn_id", + [ + ("s3://cosmos-manifest-test/manifest.json", None, "aws_default"), + ("s3://cosmos-manifest-test/manifest.json", "aws_s3_conn", "aws_s3_conn"), + ("gs://cosmos-manifest-test/manifest.json", None, "google_cloud_default"), + ("gs://cosmos-manifest-test/manifest.json", "gcp_gs_conn", "gcp_gs_conn"), + ("abfs://cosmos-manifest-test/manifest.json", None, "wasb_default"), + ("abfs://cosmos-manifest-test/manifest.json", "azure_abfs_conn", "azure_abfs_conn"), + ], +) +def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manifest_conn_id): + if AIRFLOW_IO_AVAILABLE: + project_config = ProjectConfig( + dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id + ) + + from airflow.io.path import ObjectStoragePath + + assert project_config.manifest_path == ObjectStoragePath(manifest_path, conn_id=used_manifest_conn_id) + else: + from airflow.version import version as airflow_version + + error_msg = ( + f"The manifest path {manifest_path} uses a remote file scheme, but the required Object Storage feature is " + f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later." + ) + with pytest.raises(CosmosValueError, match=error_msg): + _ = ProjectConfig( + dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id + ) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index af45191c9a..e10093bb5f 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -27,6 +27,7 @@ MIN_VER_DAG_FILE: dict[str, list[str]] = { "2.4": ["cosmos_seed_dag.py"], + "2.8": ["cosmos_manifest_example.py"], } IGNORED_DAG_FILES = ["performance_dag.py"] diff --git a/tests/test_example_dags_no_connections.py b/tests/test_example_dags_no_connections.py index f5b5c0845b..d075a389d4 100644 --- a/tests/test_example_dags_no_connections.py +++ b/tests/test_example_dags_no_connections.py @@ -19,6 +19,7 @@ MIN_VER_DAG_FILE: dict[str, list[str]] = { "2.4": ["cosmos_seed_dag.py"], + "2.8": ["cosmos_manifest_example.py"], } IGNORED_DAG_FILES = ["performance_dag.py"]