From e92f6323ff58a963d668417ba2489f3089c4cef3 Mon Sep 17 00:00:00 2001 From: GiovanniCorsetti <66136602+CorsettiS@users.noreply.github.com> Date: Fri, 9 Aug 2024 20:14:47 +0200 Subject: [PATCH 01/16] support dbt project remote --- cosmos/config.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 62557de638..d7936f606c 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -150,6 +150,7 @@ class ProjectConfig: def __init__( self, dbt_project_path: str | Path | None = None, + dbt_project_conn_id: str | None = None, models_relative_path: str | Path = "models", seeds_relative_path: str | Path = "seeds", snapshots_relative_path: str | Path = "snapshots", @@ -172,10 +173,24 @@ def __init__( self.project_name = project_name if dbt_project_path: - self.dbt_project_path = Path(dbt_project_path) - self.models_path = self.dbt_project_path / Path(models_relative_path) - self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) - self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) + dbt_project_path_str = str(dbt_project_path) + if not dbt_project_conn_id: + self.dbt_project_path = Path(dbt_project_path) + self.models_path = self.dbt_project_path / Path(models_relative_path) + self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) + self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) + elif dbt_project_conn_id is not None and not AIRFLOW_IO_AVAILABLE: + raise CosmosValueError( + f"The dbt project path {dbt_project_path_str} uses a remote file scheme, but the required Object " + f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + f"Airflow 2.8 or later." + ) + elif AIRFLOW_IO_AVAILABLE: + from airflow.io.path import ObjectStoragePath + + self.dbt_project_path = ObjectStoragePath(dbt_project_path_str, conn_id=manifest_conn_id) + else: + self.dbt_project_path = Path(dbt_project_path_str) if not project_name: self.project_name = self.dbt_project_path.stem From 151b4bb7a57958beec7ff70275a8fbbe257f34dd Mon Sep 17 00:00:00 2001 From: GiovanniCorsetti <66136602+CorsettiS@users.noreply.github.com> Date: Fri, 9 Aug 2024 20:34:05 +0200 Subject: [PATCH 02/16] Update config.py --- cosmos/config.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index d7936f606c..de4b44bfb9 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -187,8 +187,11 @@ def __init__( ) elif AIRFLOW_IO_AVAILABLE: from airflow.io.path import ObjectStoragePath - self.dbt_project_path = ObjectStoragePath(dbt_project_path_str, conn_id=manifest_conn_id) + import os + dir_list=os.listdir(self.dbt_project_path) + raise CosmoValueError(f"we found that at dir_list : {dir_list}") + else: self.dbt_project_path = Path(dbt_project_path_str) if not project_name: From 1f9878afdf46e672be042404c9d852a6e38605d2 Mon Sep 17 00:00:00 2001 From: GiovanniCorsetti <66136602+CorsettiS@users.noreply.github.com> Date: Fri, 9 Aug 2024 20:41:16 +0200 Subject: [PATCH 03/16] Update config.py --- cosmos/config.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index de4b44bfb9..5847e1dc88 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -188,12 +188,14 @@ def __init__( elif AIRFLOW_IO_AVAILABLE: from airflow.io.path import ObjectStoragePath self.dbt_project_path = ObjectStoragePath(dbt_project_path_str, conn_id=manifest_conn_id) - import os - dir_list=os.listdir(self.dbt_project_path) - raise CosmoValueError(f"we found that at dir_list : {dir_list}") - + self.models_path = self.dbt_project_path / Path(models_relative_path) + self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) + self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) else: self.dbt_project_path = Path(dbt_project_path_str) + self.models_path = self.dbt_project_path / Path(models_relative_path) + self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) + self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) if not project_name: self.project_name = self.dbt_project_path.stem From 70640471d7f7b64f3476a949e773bd30cc31f6dd Mon Sep 17 00:00:00 2001 From: GiovanniCorsetti <66136602+CorsettiS@users.noreply.github.com> Date: Fri, 9 Aug 2024 20:46:25 +0200 Subject: [PATCH 04/16] comment mandatory fields for now --- cosmos/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index 5847e1dc88..22daa498e0 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -244,7 +244,7 @@ def validate_project(self) -> None: if self.dbt_project_path: project_yml_path = self.dbt_project_path / "dbt_project.yml" mandatory_paths = { - "dbt_project.yml": Path(project_yml_path) if project_yml_path else None, + # "dbt_project.yml": Path(project_yml_path) if project_yml_path else None, "models directory ": Path(self.models_path) if self.models_path else None, } if self.manifest_path: From 40a253a529324bb883bf29f98c0ec1831c031f36 Mon Sep 17 00:00:00 2001 From: GiovanniCorsetti <66136602+CorsettiS@users.noreply.github.com> Date: Fri, 9 Aug 2024 20:49:33 +0200 Subject: [PATCH 05/16] simplify logic --- cosmos/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 22daa498e0..f478a58bd8 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -244,8 +244,8 @@ def validate_project(self) -> None: if self.dbt_project_path: project_yml_path = self.dbt_project_path / "dbt_project.yml" mandatory_paths = { - # "dbt_project.yml": Path(project_yml_path) if project_yml_path else None, - "models directory ": Path(self.models_path) if self.models_path else None, + "dbt_project.yml": project_yml_path else None, + "models directory ": self.models_path else None, } if self.manifest_path: mandatory_paths["manifest"] = self.manifest_path From 8cde3a8c903b7922df6ccf8525990fcaecc2fac2 Mon Sep 17 00:00:00 2001 From: GiovanniCorsetti <66136602+CorsettiS@users.noreply.github.com> Date: Fri, 9 Aug 2024 20:51:56 +0200 Subject: [PATCH 06/16] Update config.py --- cosmos/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index f478a58bd8..29b1c84635 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -244,8 +244,8 @@ def validate_project(self) -> None: if self.dbt_project_path: project_yml_path = self.dbt_project_path / "dbt_project.yml" mandatory_paths = { - "dbt_project.yml": project_yml_path else None, - "models directory ": self.models_path else None, + "dbt_project.yml": project_yml_path, + "models directory": self.models_path, } if self.manifest_path: mandatory_paths["manifest"] = self.manifest_path From c303d9b073faace81b39b7273c50fab3a1629c81 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 9 Aug 2024 19:00:31 +0000 Subject: [PATCH 07/16] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/config.py b/cosmos/config.py index 29b1c84635..80f0c670e7 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -187,6 +187,7 @@ def __init__( ) elif AIRFLOW_IO_AVAILABLE: from airflow.io.path import ObjectStoragePath + self.dbt_project_path = ObjectStoragePath(dbt_project_path_str, conn_id=manifest_conn_id) self.models_path = self.dbt_project_path / Path(models_relative_path) self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) From 979e13db08587907cd7c6e19ff87e3046fd375cf Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Sat, 10 Aug 2024 12:05:12 +0200 Subject: [PATCH 08/16] simplify code --- cosmos/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index 80f0c670e7..29b1c84635 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -187,7 +187,6 @@ def __init__( ) elif AIRFLOW_IO_AVAILABLE: from airflow.io.path import ObjectStoragePath - self.dbt_project_path = ObjectStoragePath(dbt_project_path_str, conn_id=manifest_conn_id) self.models_path = self.dbt_project_path / Path(models_relative_path) self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) From d9cf68256173baf55353464d9c38d2cd51b007a7 Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Sat, 10 Aug 2024 13:25:56 +0200 Subject: [PATCH 09/16] simplify code complexity --- cosmos/config.py | 102 ++++++++++++++++++++++++++++------------------- 1 file changed, 62 insertions(+), 40 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 29b1c84635..d82d123e1f 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -172,43 +172,69 @@ def __init__( if project_name: self.project_name = project_name - if dbt_project_path: - dbt_project_path_str = str(dbt_project_path) - if not dbt_project_conn_id: - self.dbt_project_path = Path(dbt_project_path) - self.models_path = self.dbt_project_path / Path(models_relative_path) - self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) - self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) - elif dbt_project_conn_id is not None and not AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"The dbt project path {dbt_project_path_str} uses a remote file scheme, but the required Object " - f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - f"Airflow 2.8 or later." - ) - elif AIRFLOW_IO_AVAILABLE: - from airflow.io.path import ObjectStoragePath - self.dbt_project_path = ObjectStoragePath(dbt_project_path_str, conn_id=manifest_conn_id) - self.models_path = self.dbt_project_path / Path(models_relative_path) - self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) - self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) - else: - self.dbt_project_path = Path(dbt_project_path_str) - self.models_path = self.dbt_project_path / Path(models_relative_path) - self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) - self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) - if not project_name: - self.project_name = self.dbt_project_path.stem - - if manifest_path: - manifest_path_str = str(manifest_path) - if not manifest_conn_id: - manifest_scheme = manifest_path_str.split("://")[0] + # if dbt_project_path: + # dbt_project_path_str = str(dbt_project_path) + # if not dbt_project_conn_id: + # self.dbt_project_path = Path(dbt_project_path) + # elif AIRFLOW_IO_AVAILABLE: + # from airflow.io.path import ObjectStoragePath + # self.dbt_project_path = ObjectStoragePath(str(dbt_project_path), conn_id=dbt_project_conn_id) + # else: + # raise CosmosValueError( + # f"The dbt project path {dbt_project_path_str} uses a remote file scheme, but the required Object " + # f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + # f"Airflow 2.8 or later." + # ) + # self.models_path = self.dbt_project_path / Path(models_relative_path) + # self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) + # self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) + # if not project_name: + # self.project_name = self.dbt_project_path.stem + + # if manifest_path: + # manifest_path_str = str(manifest_path) + # if not manifest_conn_id: + # manifest_scheme = manifest_path_str.split("://")[0] + # # Use the default Airflow connection ID for the scheme if it is not provided. + # manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, lambda: None)() + # + # if manifest_conn_id is not None and not AIRFLOW_IO_AVAILABLE: + # raise CosmosValueError( + # f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object " + # f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + # f"Airflow 2.8 or later." + # ) + # + # if AIRFLOW_IO_AVAILABLE: + # from airflow.io.path import ObjectStoragePath + # + # self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id) + # else: + # self.manifest_path = Path(manifest_path_str) + + self.manifest_path = self.get_property_from_cloud(manifest_path, manifest_conn_id) + self.dbt_project_path = self.get_property_from_cloud(dbt_project_path, dbt_project_conn_id) + self.models_path = self.dbt_project_path / Path(models_relative_path) + self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) + self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) + if not project_name: + self.project_name = self.dbt_project_path.stem + + self.env_vars = env_vars + self.dbt_vars = dbt_vars + self.partial_parse = partial_parse + + def get_property_from_cloud(self, property: Path | str, property_conn_id: str | None = None) -> Path: + if property: + property_str = str(property) + if not property_conn_id: + scheme = property_str.split("://")[0] # Use the default Airflow connection ID for the scheme if it is not provided. - manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, lambda: None)() + property_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme, lambda: None)() - if manifest_conn_id is not None and not AIRFLOW_IO_AVAILABLE: + if property_conn_id is not None and not AIRFLOW_IO_AVAILABLE: raise CosmosValueError( - f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object " + f"The path {property_str} uses a remote file scheme, but the required Object " f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " f"Airflow 2.8 or later." ) @@ -216,13 +242,9 @@ def __init__( if AIRFLOW_IO_AVAILABLE: from airflow.io.path import ObjectStoragePath - self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id) + return ObjectStoragePath(property_str, conn_id=property_conn_id) else: - self.manifest_path = Path(manifest_path_str) - - self.env_vars = env_vars - self.dbt_vars = dbt_vars - self.partial_parse = partial_parse + return Path(property_str) def validate_project(self) -> None: """ From 5b7a7ca1757f91d67832719fb5282f3dd8a0b532 Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Sat, 10 Aug 2024 13:28:13 +0200 Subject: [PATCH 10/16] simplify project_name --- cosmos/config.py | 47 ++++------------------------------------------- 1 file changed, 4 insertions(+), 43 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index d82d123e1f..795821cb24 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -169,55 +169,16 @@ def __init__( raise CosmosValueError( "If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined." ) - if project_name: - self.project_name = project_name - - # if dbt_project_path: - # dbt_project_path_str = str(dbt_project_path) - # if not dbt_project_conn_id: - # self.dbt_project_path = Path(dbt_project_path) - # elif AIRFLOW_IO_AVAILABLE: - # from airflow.io.path import ObjectStoragePath - # self.dbt_project_path = ObjectStoragePath(str(dbt_project_path), conn_id=dbt_project_conn_id) - # else: - # raise CosmosValueError( - # f"The dbt project path {dbt_project_path_str} uses a remote file scheme, but the required Object " - # f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - # f"Airflow 2.8 or later." - # ) - # self.models_path = self.dbt_project_path / Path(models_relative_path) - # self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) - # self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) - # if not project_name: - # self.project_name = self.dbt_project_path.stem - - # if manifest_path: - # manifest_path_str = str(manifest_path) - # if not manifest_conn_id: - # manifest_scheme = manifest_path_str.split("://")[0] - # # Use the default Airflow connection ID for the scheme if it is not provided. - # manifest_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(manifest_scheme, lambda: None)() - # - # if manifest_conn_id is not None and not AIRFLOW_IO_AVAILABLE: - # raise CosmosValueError( - # f"The manifest path {manifest_path_str} uses a remote file scheme, but the required Object " - # f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - # f"Airflow 2.8 or later." - # ) - # - # if AIRFLOW_IO_AVAILABLE: - # from airflow.io.path import ObjectStoragePath - # - # self.manifest_path = ObjectStoragePath(manifest_path_str, conn_id=manifest_conn_id) - # else: - # self.manifest_path = Path(manifest_path_str) self.manifest_path = self.get_property_from_cloud(manifest_path, manifest_conn_id) self.dbt_project_path = self.get_property_from_cloud(dbt_project_path, dbt_project_conn_id) self.models_path = self.dbt_project_path / Path(models_relative_path) self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) - if not project_name: + + if project_name: + self.project_name = project_name + else: self.project_name = self.dbt_project_path.stem self.env_vars = env_vars From ed6d9aa1fc560287947c48fd99a660dcf5e0a505 Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Sat, 10 Aug 2024 14:14:10 +0200 Subject: [PATCH 11/16] add tests --- tests/test_config.py | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/tests/test_config.py b/tests/test_config.py index d557dd4fc2..8b1c79a948 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -40,8 +40,11 @@ def test_init_with_manifest_path_and_project_path_succeeds(): from airflow.io.path import ObjectStoragePath assert project_config.manifest_path == ObjectStoragePath("target/manifest.json") + assert project_config.dbt_project_path == ObjectStoragePath("/tmp/some-path") else: assert project_config.manifest_path == Path("target/manifest.json") + assert project_config.dbt_project_path == Path("/tmp/some-path") + assert project_config.project_name == "some-path" @@ -324,10 +327,49 @@ def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manife from airflow.version import version as airflow_version error_msg = ( - f"The manifest path {manifest_path} uses a remote file scheme, but the required Object Storage feature is " + f"The path {manifest_path} uses a remote file scheme, but the required Object Storage feature is " f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later." ) with pytest.raises(CosmosValueError, match=error_msg): _ = ProjectConfig( dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id ) + + +@pytest.mark.parametrize( + "dbt_project_path, given_dbt_project_conn_id, used_dbt_project_conn_id, project_name", + [ + ("s3://cosmos-dbt-project-test/test-project", None, "aws_default", "custom-project-name"), + ("s3://cosmos-dbt-project-test/test-project", "aws_s3_conn", "aws_s3_conn", None), + ("gs://cosmos-dbt-project-test/test-project", None, "google_cloud_default", "custom-project-name"), + ("gs://cosmos-dbt-project-test/test-project", "gcp_gs_conn", "gcp_gs_conn", None), + ("abfs://cosmos-dbt-project-test/test-project", None, "wasb_default", "custom-project-name"), + ("abfs://cosmos-dbt-project-test/test-project", "azure_abfs_conn", "azure_abfs_conn", None), + ], +) +def test_remote_dbt_project_path(dbt_project_path, given_dbt_project_conn_id, used_dbt_project_conn_id, project_name): + if AIRFLOW_IO_AVAILABLE: + project_config = ProjectConfig( + dbt_project_path=dbt_project_path, + dbt_project_conn_id=given_dbt_project_conn_id, + manifest_path="/some/manifest.json", + project_name=project_name, + ) + + from airflow.io.path import ObjectStoragePath + + assert project_config.dbt_project_path == ObjectStoragePath(dbt_project_path, conn_id=used_dbt_project_conn_id) + assert project_config.project_name == project_name if project_name else "test-project" + else: + from airflow.version import version as airflow_version + + error_msg = ( + f"The path {dbt_project_path} uses a remote file scheme, but the required Object Storage feature is " + f"unavailable in Airflow version {airflow_version}. Please upgrade to Airflow 2.8 or later." + ) + with pytest.raises(CosmosValueError, match=error_msg): + _ = ProjectConfig( + dbt_project_path=dbt_project_path, + dbt_project_conn_id=given_dbt_project_conn_id, + manifest_path="/some/manifest.json", + ) From ad118b8c034d291ca27b4ebb7ec3374408441880 Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Sat, 10 Aug 2024 14:19:01 +0200 Subject: [PATCH 12/16] fix typo --- cosmos/config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 795821cb24..85032dbebe 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -170,8 +170,8 @@ def __init__( "If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined." ) - self.manifest_path = self.get_property_from_cloud(manifest_path, manifest_conn_id) - self.dbt_project_path = self.get_property_from_cloud(dbt_project_path, dbt_project_conn_id) + self.manifest_path = self.get_property_from_cloud_or_local(manifest_path, manifest_conn_id) + self.dbt_project_path = self.get_property_from_cloud_or_local(dbt_project_path, dbt_project_conn_id) self.models_path = self.dbt_project_path / Path(models_relative_path) self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) @@ -185,7 +185,7 @@ def __init__( self.dbt_vars = dbt_vars self.partial_parse = partial_parse - def get_property_from_cloud(self, property: Path | str, property_conn_id: str | None = None) -> Path: + def get_property_from_cloud_or_local(self, property: Path | str, property_conn_id: str | None = None) -> Path: if property: property_str = str(property) if not property_conn_id: From b1634c3619aecfcb370ef4f470508d356cc2180f Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Sat, 10 Aug 2024 14:25:02 +0200 Subject: [PATCH 13/16] add documentation --- docs/configuration/project-config.rst | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/configuration/project-config.rst b/docs/configuration/project-config.rst index 279d5b3925..621924e3d3 100644 --- a/docs/configuration/project-config.rst +++ b/docs/configuration/project-config.rst @@ -4,7 +4,11 @@ Project Config The ``cosmos.config.ProjectConfig`` allows you to specify information about where your dbt project is located and project variables that should be used for rendering and execution. It takes the following arguments: -- ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file +- ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file. + Along with supporting local paths, starting with Cosmos 1.6.0, if you've Airflow >= 2.8.0, Cosmos also supports + remote paths for your dbt project (e.g. S3 URL). For that you need to provide the `dbt_project_conn_id` argument. +- ``dbt_project_conn_id``: The connection id for the Airflow connection that contains the credentials for the remote + dbt project. This is only required if you're using a remote dbt project path. - ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to ``models/`` - ``seeds_relative_path``: The path to your seeds directory, relative to the ``dbt_project_path``. This defaults to @@ -14,6 +18,8 @@ variables that should be used for rendering and execution. It takes the followin - ``manifest_path``: The absolute path to your manifests directory. This is only required if you're using Cosmos' manifest parsing mode. Along with supporting local paths for manifest parsing, starting with Cosmos 1.6.0, if you've Airflow >= 2.8.0, Cosmos also supports remote paths for manifest parsing(e.g. S3 URL). See :ref:`parsing-methods` for more details. +- ``manifest_conn_id``: The connection id for the Airflow connection that contains the credentials for the remote + manifest path. This is only required if you're using a remote manifest path. - ``project_name`` : The name of the project. If ``dbt_project_path`` is provided, the ``project_name`` defaults to the folder name containing ``dbt_project.yml``. If ``dbt_project_path`` is not provided, and ``manifest_path`` is provided, ``project_name`` is required as the name can not be inferred from ``dbt_project_path`` From 013d5c9ffb926096986b78c1a40cd2df0f73184b Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Sat, 10 Aug 2024 14:37:00 +0200 Subject: [PATCH 14/16] simplify doc --- docs/configuration/project-config.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/project-config.rst b/docs/configuration/project-config.rst index 621924e3d3..7aafe8fbf7 100644 --- a/docs/configuration/project-config.rst +++ b/docs/configuration/project-config.rst @@ -6,7 +6,7 @@ variables that should be used for rendering and execution. It takes the followin - ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file. Along with supporting local paths, starting with Cosmos 1.6.0, if you've Airflow >= 2.8.0, Cosmos also supports - remote paths for your dbt project (e.g. S3 URL). For that you need to provide the `dbt_project_conn_id` argument. + remote paths for your dbt project (e.g. S3 URL). - ``dbt_project_conn_id``: The connection id for the Airflow connection that contains the credentials for the remote dbt project. This is only required if you're using a remote dbt project path. - ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to From 8c4ec34b86a92453dfa5487f3e1147dbcbf607ef Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Sat, 10 Aug 2024 14:46:33 +0200 Subject: [PATCH 15/16] fix logic in code --- cosmos/config.py | 53 ++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 85032dbebe..3308ed08f8 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -169,43 +169,42 @@ def __init__( raise CosmosValueError( "If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined." ) - - self.manifest_path = self.get_property_from_cloud_or_local(manifest_path, manifest_conn_id) - self.dbt_project_path = self.get_property_from_cloud_or_local(dbt_project_path, dbt_project_conn_id) - self.models_path = self.dbt_project_path / Path(models_relative_path) - self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) - self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) - if project_name: self.project_name = project_name - else: - self.project_name = self.dbt_project_path.stem + if manifest_path: + self.manifest_path = self.get_property_from_cloud_or_local(manifest_path, manifest_conn_id) + if dbt_project_path: + self.dbt_project_path = self.get_property_from_cloud_or_local(dbt_project_path, dbt_project_conn_id) + self.models_path = self.dbt_project_path / Path(models_relative_path) + self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) + self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) + if not project_name: + self.project_name = self.dbt_project_path.stem self.env_vars = env_vars self.dbt_vars = dbt_vars self.partial_parse = partial_parse def get_property_from_cloud_or_local(self, property: Path | str, property_conn_id: str | None = None) -> Path: - if property: - property_str = str(property) - if not property_conn_id: - scheme = property_str.split("://")[0] - # Use the default Airflow connection ID for the scheme if it is not provided. - property_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme, lambda: None)() - - if property_conn_id is not None and not AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"The path {property_str} uses a remote file scheme, but the required Object " - f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - f"Airflow 2.8 or later." - ) + property_str = str(property) + if not property_conn_id: + scheme = property_str.split("://")[0] + # Use the default Airflow connection ID for the scheme if it is not provided. + property_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme, lambda: None)() - if AIRFLOW_IO_AVAILABLE: - from airflow.io.path import ObjectStoragePath + if property_conn_id is not None and not AIRFLOW_IO_AVAILABLE: + raise CosmosValueError( + f"The path {property_str} uses a remote file scheme, but the required Object " + f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + f"Airflow 2.8 or later." + ) - return ObjectStoragePath(property_str, conn_id=property_conn_id) - else: - return Path(property_str) + if AIRFLOW_IO_AVAILABLE: + from airflow.io.path import ObjectStoragePath + + return ObjectStoragePath(property_str, conn_id=property_conn_id) + else: + return Path(property_str) def validate_project(self) -> None: """ From 511fb5297e555407378f88de125190132be29bfb Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Sat, 10 Aug 2024 15:11:39 +0200 Subject: [PATCH 16/16] fix wrong tests --- cosmos/config.py | 2 +- tests/test_config.py | 11 ++--------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 3308ed08f8..d53ee3b341 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -199,7 +199,7 @@ def get_property_from_cloud_or_local(self, property: Path | str, property_conn_i f"Airflow 2.8 or later." ) - if AIRFLOW_IO_AVAILABLE: + if AIRFLOW_IO_AVAILABLE and property_conn_id: from airflow.io.path import ObjectStoragePath return ObjectStoragePath(property_str, conn_id=property_conn_id) diff --git a/tests/test_config.py b/tests/test_config.py index 8b1c79a948..11c2683c3d 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -36,15 +36,8 @@ def test_init_with_manifest_path_and_project_path_succeeds(): project_name in this case should be based on dbt_project_path """ project_config = ProjectConfig(dbt_project_path="/tmp/some-path", manifest_path="target/manifest.json") - if AIRFLOW_IO_AVAILABLE: - from airflow.io.path import ObjectStoragePath - - assert project_config.manifest_path == ObjectStoragePath("target/manifest.json") - assert project_config.dbt_project_path == ObjectStoragePath("/tmp/some-path") - else: - assert project_config.manifest_path == Path("target/manifest.json") - assert project_config.dbt_project_path == Path("/tmp/some-path") - + assert project_config.manifest_path == Path("target/manifest.json") + assert project_config.dbt_project_path == Path("/tmp/some-path") assert project_config.project_name == "some-path"