From bac6c3ded2fd78256ecfede7e9056772f8451883 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Oct 2025 16:12:53 +0100 Subject: [PATCH 01/12] Fix mypy issues --- cosmos/cache.py | 11 +++++++++-- cosmos/config.py | 22 +++++++++++++++------- cosmos/dbt/graph.py | 8 +++++++- cosmos/operators/local.py | 9 +++++++-- 4 files changed, 38 insertions(+), 12 deletions(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index ed17725bd8..7488c216ff 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -10,6 +10,7 @@ from collections import defaultdict from datetime import datetime, timedelta, timezone from pathlib import Path +from typing import TYPE_CHECKING import msgpack import yaml @@ -22,6 +23,12 @@ from sqlalchemy.orm import Session from cosmos import settings + +if TYPE_CHECKING: + try: + from airflow.io.path import ObjectStoragePath + except ImportError: + pass from cosmos.constants import ( DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME, @@ -48,12 +55,12 @@ VAR_KEY_CACHE_PREFIX = "cosmos_cache__" -def _configure_remote_cache_dir() -> Path | None: +def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None: """Configure the remote cache dir if it is provided.""" if not settings_remote_cache_dir: return None - _configured_cache_dir: Path | None = None + _configured_cache_dir: Path | ObjectStoragePath | None = None cache_dir_str = str(settings_remote_cache_dir) diff --git a/cosmos/config.py b/cosmos/config.py index e1a4a0c353..5f4d540d9c 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -8,12 +8,18 @@ import warnings from dataclasses import InitVar, dataclass, field from pathlib import Path -from typing import Any, Callable, Iterator +from typing import TYPE_CHECKING, Any, Callable, Iterator import yaml from airflow.version import version as airflow_version from cosmos import settings + +if TYPE_CHECKING: + try: + from airflow.io.path import ObjectStoragePath + except ImportError: + pass from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled from cosmos.constants import ( DEFAULT_PROFILES_FILE_NAME, @@ -173,7 +179,7 @@ class ProjectConfig: dbt_project_path: Path | None = None install_dbt_deps: bool = True copy_dbt_packages: bool = settings.default_copy_dbt_packages - manifest_path: Path | None = None + manifest_path: Path | ObjectStoragePath | None = None models_path: Path | None = None seeds_path: Path | None = None snapshots_path: Path | None = None @@ -250,7 +256,7 @@ def validate_project(self) -> None: If the project path is not provided, we have a scenario 2 """ - mandatory_paths = {} + mandatory_paths: dict[str, Path | ObjectStoragePath | None] = {} # We validate the existence of paths added to the `mandatory_paths` map by calling the `exists()` method on each # one. Starting with Cosmos 1.6.0, if the Airflow version is `>= 2.8.0` and a `manifest_path` is provided, we # cast it to an `airflow.io.path.ObjectStoragePath` instance during `ProjectConfig` initialisation, and it @@ -259,10 +265,12 @@ def validate_project(self) -> None: # map works correctly for all paths, thereby validating the project. if self.dbt_project_path: project_yml_path = self.dbt_project_path / "dbt_project.yml" - mandatory_paths = { - "dbt_project.yml": Path(project_yml_path) if project_yml_path else None, - "models directory ": Path(self.models_path) if self.models_path else None, - } + mandatory_paths.update( + { + "dbt_project.yml": Path(project_yml_path) if project_yml_path else None, + "models directory ": Path(self.models_path) if self.models_path else None, + } + ) if self.manifest_path: mandatory_paths["manifest"] = self.manifest_path diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index ac17a02692..813da01951 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -18,6 +18,12 @@ from airflow.models import Variable +if TYPE_CHECKING: + try: + from airflow.io.path import ObjectStoragePath + except ImportError: + pass + import cosmos.dbt.runner as dbt_runner from cosmos import cache, settings from cosmos.cache import ( @@ -477,7 +483,7 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: else: Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) - def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path) -> dict[str, str]: + def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path | ObjectStoragePath) -> dict[str, str]: """Loads the remote cache for dbt ls.""" cache_dict: dict[str, str] = {} remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json" diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index a68cea8ab8..a8a41a6edc 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -26,6 +26,11 @@ from airflow.sdk.definitions.context import Context except ImportError: from airflow.utils.context import Context # type: ignore[attr-defined] + + try: + from airflow.io.path import ObjectStoragePath + except ImportError: + pass from airflow.version import version as airflow_version from attrs import define from packaging.version import Version @@ -289,7 +294,7 @@ def store_compiled_sql(self, tmp_project_dir: str, context: Context) -> None: self.compiled_sql = self.compiled_sql.strip() @staticmethod - def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]: + def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tuple[None, None]: """Configure the remote target path if it is provided.""" if not remote_target_path: return None, None @@ -325,7 +330,7 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]: return _configured_target_path, remote_conn_id def _construct_dest_file_path( - self, dest_target_dir: Path, file_path: str, source_compiled_dir: Path, resource_type: str + self, dest_target_dir: Path | ObjectStoragePath, file_path: str, source_compiled_dir: Path, resource_type: str ) -> str: """ Construct the destination path for the compiled SQL files to be uploaded to the remote store. From bb60d5ae65c5317fce5309c5621db55ffcda3fa8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Oct 2025 16:32:48 +0100 Subject: [PATCH 02/12] Update cosmos/operators/local.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/operators/local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index a8a41a6edc..e18568b0d0 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -294,7 +294,7 @@ def store_compiled_sql(self, tmp_project_dir: str, context: Context) -> None: self.compiled_sql = self.compiled_sql.strip() @staticmethod - def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tuple[None, None]: + def _configure_remote_target_path() -> tuple[Path | "ObjectStoragePath", str] | tuple[None, None]: """Configure the remote target path if it is provided.""" if not remote_target_path: return None, None From 3ae39210222a235c237e0776d7a10598baa4f0af Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Oct 2025 16:32:57 +0100 Subject: [PATCH 03/12] Update cosmos/operators/local.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/operators/local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index e18568b0d0..b3088d877f 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -330,7 +330,7 @@ def _configure_remote_target_path() -> tuple[Path | "ObjectStoragePath", str] | return _configured_target_path, remote_conn_id def _construct_dest_file_path( - self, dest_target_dir: Path | ObjectStoragePath, file_path: str, source_compiled_dir: Path, resource_type: str + self, dest_target_dir: Path | "ObjectStoragePath", file_path: str, source_compiled_dir: Path, resource_type: str ) -> str: """ Construct the destination path for the compiled SQL files to be uploaded to the remote store. From 49fff6a888b33daaaf06a18c6893ab85eb99e0ab Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Oct 2025 16:33:03 +0100 Subject: [PATCH 04/12] Update cosmos/config.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index 5f4d540d9c..bd540bccdc 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -179,7 +179,7 @@ class ProjectConfig: dbt_project_path: Path | None = None install_dbt_deps: bool = True copy_dbt_packages: bool = settings.default_copy_dbt_packages - manifest_path: Path | ObjectStoragePath | None = None + manifest_path: Path | "ObjectStoragePath" | None = None models_path: Path | None = None seeds_path: Path | None = None snapshots_path: Path | None = None From edf4dc864b32c3075a6c47ae761ec4054c176161 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 1 Oct 2025 15:33:06 +0000 Subject: [PATCH 05/12] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/operators/local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index b3088d877f..4a2ef6f94a 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -294,7 +294,7 @@ def store_compiled_sql(self, tmp_project_dir: str, context: Context) -> None: self.compiled_sql = self.compiled_sql.strip() @staticmethod - def _configure_remote_target_path() -> tuple[Path | "ObjectStoragePath", str] | tuple[None, None]: + def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tuple[None, None]: """Configure the remote target path if it is provided.""" if not remote_target_path: return None, None From e428824a3618b19dbf145eb2cbd9dcf268901056 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Oct 2025 16:33:30 +0100 Subject: [PATCH 06/12] Update cosmos/cache.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index 7488c216ff..3ff7a934c2 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -55,7 +55,7 @@ VAR_KEY_CACHE_PREFIX = "cosmos_cache__" -def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None: +def _configure_remote_cache_dir() -> Path | "ObjectStoragePath" | None: """Configure the remote cache dir if it is provided.""" if not settings_remote_cache_dir: return None From 90810a2fcf4eca586ce2ff1cea08d01e686b47ef Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Oct 2025 16:33:40 +0100 Subject: [PATCH 07/12] Update cosmos/dbt/graph.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/dbt/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 813da01951..ee97ac8a59 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -483,7 +483,7 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: else: Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) - def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path | ObjectStoragePath) -> dict[str, str]: + def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path | "ObjectStoragePath") -> dict[str, str]: """Loads the remote cache for dbt ls.""" cache_dict: dict[str, str] = {} remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json" From 8bf185ccd4cac61fac4c68c4afeadd7829126d6e Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Oct 2025 16:33:48 +0100 Subject: [PATCH 08/12] Update cosmos/config.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/config.py b/cosmos/config.py index bd540bccdc..f31e92e5fe 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -256,7 +256,7 @@ def validate_project(self) -> None: If the project path is not provided, we have a scenario 2 """ - mandatory_paths: dict[str, Path | ObjectStoragePath | None] = {} + mandatory_paths: dict[str, Path | "ObjectStoragePath" | None] = {} # We validate the existence of paths added to the `mandatory_paths` map by calling the `exists()` method on each # one. Starting with Cosmos 1.6.0, if the Airflow version is `>= 2.8.0` and a `manifest_path` is provided, we # cast it to an `airflow.io.path.ObjectStoragePath` instance during `ProjectConfig` initialisation, and it From 520652d600a01ac9b64039afa5045d12b6c7af14 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Oct 2025 16:33:59 +0100 Subject: [PATCH 09/12] Update cosmos/cache.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cosmos/cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index 3ff7a934c2..eacf8ed892 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -60,7 +60,7 @@ def _configure_remote_cache_dir() -> Path | "ObjectStoragePath" | None: if not settings_remote_cache_dir: return None - _configured_cache_dir: Path | ObjectStoragePath | None = None + _configured_cache_dir: Path | "ObjectStoragePath" | None = None cache_dir_str = str(settings_remote_cache_dir) From 6f51fa0aa44019196d799ef0573365fcc5521462 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 1 Oct 2025 15:34:11 +0000 Subject: [PATCH 10/12] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/cache.py | 2 +- cosmos/config.py | 4 ++-- cosmos/dbt/graph.py | 2 +- cosmos/operators/local.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index eacf8ed892..827dd5c42b 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -55,7 +55,7 @@ VAR_KEY_CACHE_PREFIX = "cosmos_cache__" -def _configure_remote_cache_dir() -> Path | "ObjectStoragePath" | None: +def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None: """Configure the remote cache dir if it is provided.""" if not settings_remote_cache_dir: return None diff --git a/cosmos/config.py b/cosmos/config.py index f31e92e5fe..5f4d540d9c 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -179,7 +179,7 @@ class ProjectConfig: dbt_project_path: Path | None = None install_dbt_deps: bool = True copy_dbt_packages: bool = settings.default_copy_dbt_packages - manifest_path: Path | "ObjectStoragePath" | None = None + manifest_path: Path | ObjectStoragePath | None = None models_path: Path | None = None seeds_path: Path | None = None snapshots_path: Path | None = None @@ -256,7 +256,7 @@ def validate_project(self) -> None: If the project path is not provided, we have a scenario 2 """ - mandatory_paths: dict[str, Path | "ObjectStoragePath" | None] = {} + mandatory_paths: dict[str, Path | ObjectStoragePath | None] = {} # We validate the existence of paths added to the `mandatory_paths` map by calling the `exists()` method on each # one. Starting with Cosmos 1.6.0, if the Airflow version is `>= 2.8.0` and a `manifest_path` is provided, we # cast it to an `airflow.io.path.ObjectStoragePath` instance during `ProjectConfig` initialisation, and it diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index ee97ac8a59..813da01951 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -483,7 +483,7 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: else: Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) - def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path | "ObjectStoragePath") -> dict[str, str]: + def _get_dbt_ls_remote_cache(self, remote_cache_dir: Path | ObjectStoragePath) -> dict[str, str]: """Loads the remote cache for dbt ls.""" cache_dict: dict[str, str] = {} remote_cache_key_path = remote_cache_dir / self.dbt_ls_cache_key / "dbt_ls_cache.json" diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 4a2ef6f94a..a8a41a6edc 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -330,7 +330,7 @@ def _configure_remote_target_path() -> tuple[Path | ObjectStoragePath, str] | tu return _configured_target_path, remote_conn_id def _construct_dest_file_path( - self, dest_target_dir: Path | "ObjectStoragePath", file_path: str, source_compiled_dir: Path, resource_type: str + self, dest_target_dir: Path | ObjectStoragePath, file_path: str, source_compiled_dir: Path, resource_type: str ) -> str: """ Construct the destination path for the compiled SQL files to be uploaded to the remote store. From 412bb859a9c20603d3024ba4a6e1b2f742e26e28 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Oct 2025 16:37:54 +0100 Subject: [PATCH 11/12] Make pre-commit happy --- cosmos/cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index 827dd5c42b..7488c216ff 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -60,7 +60,7 @@ def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None: if not settings_remote_cache_dir: return None - _configured_cache_dir: Path | "ObjectStoragePath" | None = None + _configured_cache_dir: Path | ObjectStoragePath | None = None cache_dir_str = str(settings_remote_cache_dir) From 67749033af3796643408f7ee8feb29dbb7eba1e1 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 1 Oct 2025 18:41:27 +0100 Subject: [PATCH 12/12] Update cosmos/cache.py --- cosmos/cache.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index 7488c216ff..6278b8cc82 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -26,7 +26,12 @@ if TYPE_CHECKING: try: - from airflow.io.path import ObjectStoragePath + from airflow.sdk import ObjectStoragePath + except ImportError: + try: + from airflow.io.path import ObjectStoragePath + except ImportError: + pass except ImportError: pass from cosmos.constants import (