diff --git a/cosmos/config.py b/cosmos/config.py index f2c07bb980..0a65d5661c 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -153,6 +153,7 @@ class ProjectConfig: def __init__( self, dbt_project_path: str | Path | None = None, + dbt_project_conn_id: str | None = None, models_relative_path: str | Path = "models", seeds_relative_path: str | Path = "seeds", snapshots_relative_path: str | Path = "snapshots", @@ -173,39 +174,40 @@ def __init__( ) if project_name: self.project_name = project_name - + if manifest_path: + self.manifest_path = self.get_property_from_cloud_or_local(manifest_path, manifest_conn_id) if dbt_project_path: - self.dbt_project_path = Path(dbt_project_path) + self.dbt_project_path = self.get_property_from_cloud_or_local(dbt_project_path, dbt_project_conn_id) self.models_path = self.dbt_project_path / Path(models_relative_path) self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) if not project_name: self.project_name = self.dbt_project_path.stem - if manifest_path: - manifest_path_str = str(manifest_path) - if not manifest_conn_id: - manifest_scheme = manifest_path_str.split("://")[0] - # Use the default Airflow connection ID for the scheme if it is not provided. - manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, lambda: None)() + self.env_vars = env_vars + self.dbt_vars = dbt_vars + self.partial_parse = partial_parse - if manifest_conn_id is not None and not AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object " - f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - f"Airflow 2.8 or later." 
- ) + def get_property_from_cloud_or_local(self, property: Path | str, property_conn_id: str | None = None) -> Path: + property_str = str(property) + if not property_conn_id: + scheme = property_str.split("://")[0] + # Use the default Airflow connection ID for the scheme if it is not provided. + property_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme, lambda: None)() - if AIRFLOW_IO_AVAILABLE: - from airflow.io.path import ObjectStoragePath + if property_conn_id is not None and not AIRFLOW_IO_AVAILABLE: + raise CosmosValueError( + f"The path {property_str} uses a remote file scheme, but the required Object " + f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + f"Airflow 2.8 or later." + ) - self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id) - else: - self.manifest_path = Path(manifest_path_str) + if AIRFLOW_IO_AVAILABLE and property_conn_id: + from airflow.io.path import ObjectStoragePath - self.env_vars = env_vars - self.dbt_vars = dbt_vars - self.partial_parse = partial_parse + return ObjectStoragePath(property_str, conn_id=property_conn_id) + else: + return Path(property_str) def validate_project(self) -> None: """ @@ -227,8 +229,8 @@ def validate_project(self) -> None: if self.dbt_project_path: project_yml_path = self.dbt_project_path / "dbt_project.yml" mandatory_paths = { - "dbt_project.yml": Path(project_yml_path) if project_yml_path else None, - "models directory ": Path(self.models_path) if self.models_path else None, + "dbt_project.yml": project_yml_path, + "models directory": self.models_path, } if self.manifest_path: mandatory_paths["manifest"] = self.manifest_path diff --git a/docs/configuration/project-config.rst b/docs/configuration/project-config.rst index 279d5b3925..7aafe8fbf7 100644 --- a/docs/configuration/project-config.rst +++ b/docs/configuration/project-config.rst @@ -4,7 +4,11 @@ Project Config The ``cosmos.config.ProjectConfig`` allows you to specify 
information about where your dbt project is located and project variables that should be used for rendering and execution. It takes the following arguments: -- ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file +- ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file. + In addition to local paths, starting with Cosmos 1.6.0, if you are running Airflow >= 2.8.0, Cosmos also supports + remote paths for your dbt project (e.g. an S3 URL). +- ``dbt_project_conn_id``: The connection id for the Airflow connection that contains the credentials for the remote + dbt project. This only applies to a remote dbt project path; if omitted, Cosmos falls back to the default Airflow connection for the path's scheme (e.g. ``aws_default`` for ``s3://``). - ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to ``models/`` - ``seeds_relative_path``: The path to your seeds directory, relative to the ``dbt_project_path``. This defaults to @@ -14,6 +18,8 @@ variables that should be used for rendering and execution. It takes the followin - ``manifest_path``: The absolute path to your manifests directory. This is only required if you're using Cosmos' manifest parsing mode. Along with supporting local paths for manifest parsing, starting with Cosmos 1.6.0, if you've Airflow >= 2.8.0, Cosmos also supports remote paths for manifest parsing(e.g. S3 URL). See :ref:`parsing-methods` for more details. +- ``manifest_conn_id``: The connection id for the Airflow connection that contains the credentials for the remote + manifest path. This only applies to a remote manifest path; if omitted, Cosmos falls back to the default Airflow connection for the path's scheme (e.g. ``aws_default`` for ``s3://``). - ``project_name`` : The name of the project. If ``dbt_project_path`` is provided, the ``project_name`` defaults to the folder name containing ``dbt_project.yml``. 
If ``dbt_project_path`` is not provided, and ``manifest_path`` is provided, ``project_name`` is required as the name can not be inferred from ``dbt_project_path`` diff --git a/tests/test_config.py b/tests/test_config.py index d557dd4fc2..11c2683c3d 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -36,12 +36,8 @@ def test_init_with_manifest_path_and_project_path_succeeds(): project_name in this case should be based on dbt_project_path """ project_config = ProjectConfig(dbt_project_path="/tmp/some-path", manifest_path="target/manifest.json") - if AIRFLOW_IO_AVAILABLE: - from airflow.io.path import ObjectStoragePath - - assert project_config.manifest_path == ObjectStoragePath("target/manifest.json") - else: - assert project_config.manifest_path == Path("target/manifest.json") + assert project_config.manifest_path == Path("target/manifest.json") + assert project_config.dbt_project_path == Path("/tmp/some-path") assert project_config.project_name == "some-path" @@ -324,10 +320,49 @@ def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manife from airflow.version import version as airflow_version error_msg = ( - f"The manifest path {manifest_path} uses a remote file scheme, but the required Object Storage feature is " + f"The path {manifest_path} uses a remote file scheme, but the required Object Storage feature is " f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later." 
) with pytest.raises(CosmosValueError, match=error_msg): _ = ProjectConfig( dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id ) + + +@pytest.mark.parametrize( + "dbt_project_path, given_dbt_project_conn_id, used_dbt_project_conn_id, project_name", + [ + ("s3://cosmos-dbt-project-test/test-project", None, "aws_default", "custom-project-name"), + ("s3://cosmos-dbt-project-test/test-project", "aws_s3_conn", "aws_s3_conn", None), + ("gs://cosmos-dbt-project-test/test-project", None, "google_cloud_default", "custom-project-name"), + ("gs://cosmos-dbt-project-test/test-project", "gcp_gs_conn", "gcp_gs_conn", None), + ("abfs://cosmos-dbt-project-test/test-project", None, "wasb_default", "custom-project-name"), + ("abfs://cosmos-dbt-project-test/test-project", "azure_abfs_conn", "azure_abfs_conn", None), + ], +) +def test_remote_dbt_project_path(dbt_project_path, given_dbt_project_conn_id, used_dbt_project_conn_id, project_name): + if AIRFLOW_IO_AVAILABLE: + project_config = ProjectConfig( + dbt_project_path=dbt_project_path, + dbt_project_conn_id=given_dbt_project_conn_id, + manifest_path="/some/manifest.json", + project_name=project_name, + ) + + from airflow.io.path import ObjectStoragePath + + assert project_config.dbt_project_path == ObjectStoragePath(dbt_project_path, conn_id=used_dbt_project_conn_id) + assert project_config.project_name == (project_name if project_name else "test-project") + else: + from airflow.version import version as airflow_version + + error_msg = ( + f"The path {dbt_project_path} uses a remote file scheme, but the required Object Storage feature is " + f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later." + ) + with pytest.raises(CosmosValueError, match=error_msg): + _ = ProjectConfig( + dbt_project_path=dbt_project_path, + dbt_project_conn_id=given_dbt_project_conn_id, + manifest_path="/some/manifest.json", + )