From b2c60c41e91fc45fa6df90da28658d6230942282 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 21 May 2024 14:23:05 +0100 Subject: [PATCH 1/7] Introduce DAG level cache (without expiring) --- cosmos/airflow/dag.py | 33 +++++++++++++++++++++++++++++++++ cosmos/settings.py | 4 +++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/cosmos/airflow/dag.py b/cosmos/airflow/dag.py index de958f118f..78f7f53148 100644 --- a/cosmos/airflow/dag.py +++ b/cosmos/airflow/dag.py @@ -4,11 +4,17 @@ from __future__ import annotations +import pickle +from pathlib import Path from typing import Any from airflow.models.dag import DAG +from cosmos import cache, settings from cosmos.converter import DbtToAirflowConverter, airflow_kwargs, specific_kwargs +from cosmos.log import get_logger + +logger = get_logger() class DbtDag(DAG, DbtToAirflowConverter): @@ -16,11 +22,38 @@ class DbtDag(DAG, DbtToAirflowConverter): Render a dbt project as an Airflow DAG. """ + @staticmethod + def get_cache_filepath(cache_identifier: str) -> Path: + cache_dir_path = cache._obtain_cache_dir_path(cache_identifier) + return cache_dir_path / f"{cache_identifier}.pkl" + + @staticmethod + def should_use_cache() -> bool: + return True + + def __new__(cls, *args, **kwargs): # type: ignore + dag_id = kwargs.get("dag_id") + if dag_id is not None: + cache_filepath = DbtDag.get_cache_filepath(dag_id) + if settings.enable_cache and settings.experimental_cache and cache_filepath.exists(): + logger.info(f"Restoring DbtDag {dag_id} from cache {cache_filepath}") + with open(cache_filepath, "rb") as fp: + return pickle.load(fp) + + instance = DAG.__new__(DAG) + DbtDag.__init__(instance, *args, **kwargs) # type: ignore + return instance + def __init__( self, *args: Any, **kwargs: Any, ) -> None: + logger.info(f"Creating DbtDag {kwargs.get('dag_id')}") DAG.__init__(self, *args, **airflow_kwargs(**kwargs)) kwargs["dag"] = self DbtToAirflowConverter.__init__(self, *args, **specific_kwargs(**kwargs)) + cache_filepath = DbtDag.get_cache_filepath(kwargs["dag_id"]) + if settings.enable_cache and settings.experimental_cache: + with open(cache_filepath, "wb") as fp: + pickle.dump(self, fp) diff --git a/cosmos/settings.py b/cosmos/settings.py index 44a08fd486..4f6dda3ded 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -10,7 +10,9 @@ # In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME) cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR) -enable_cache = conf.get("cosmos", "enable_cache", fallback=True) + +enable_cache = conf.getboolean("cosmos", "enable_cache", fallback=True) +experimental_cache = conf.getboolean("cosmos", "experimental_cache", fallback=False) propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True) dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None) dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None) From 0db71211228d53f5bd949b0b20b79ba06cd9b5cb Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 23 May 2024 01:00:45 +0100 Subject: [PATCH 2/7] Version DbtDag cache --- cosmos/airflow/dag.py | 80 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 72 insertions(+), 8 deletions(-) diff --git a/cosmos/airflow/dag.py b/cosmos/airflow/dag.py index 78f7f53148..dd50920806 100644 --- a/cosmos/airflow/dag.py +++ b/cosmos/airflow/dag.py @@ -4,7 +4,11 @@ from __future__ import annotations +import functools + +# import inspect import pickle +import time from pathlib import Path from typing import Any @@ -23,37 +27,97 @@ class DbtDag(DAG, DbtToAirflowConverter): """ @staticmethod + @functools.lru_cache def get_cache_filepath(cache_identifier: str) -> Path: cache_dir_path = cache._obtain_cache_dir_path(cache_identifier) return cache_dir_path / f"{cache_identifier}.pkl" @staticmethod + @functools.lru_cache + def get_cache_version_filepath(cache_identifier: str) -> Path: + return Path(str(DbtDag.get_cache_filepath(cache_identifier)) + ".version") + + @staticmethod + @functools.lru_cache def should_use_cache() -> bool: - return True + return settings.enable_cache and settings.experimental_cache + + @staticmethod + @functools.lru_cache + def is_project_unmodified(dag_id: str, current_version: str) -> Path | None: + cache_filepath = DbtDag.get_cache_filepath(dag_id) + cache_version_filepath = DbtDag.get_cache_version_filepath(dag_id) + if cache_version_filepath.exists() and cache_filepath.exists(): + previous_cache_version = cache_version_filepath.read_text() + if previous_cache_version == current_version: + return cache_filepath + return None + + @staticmethod + @functools.lru_cache + def calculate_current_version(dag_id: str, project_dir: Path) -> str: + start_time = time.process_time() + + # When DAG file was last changed - this is very slow (e.g. 0.6s) + # caller_dag_frame = inspect.stack()[1] + # caller_dag_filepath = Path(caller_dag_frame.filename) + # logger.info("The %s DAG is located in: %s" % (dag_id, caller_dag_filepath)) + # dag_last_modified = caller_dag_filepath.stat().st_mtime + # mid_time = time.process_time() - start_time + # logger.info(f"It took {mid_time:.3}s to calculate the first part of the version") + dag_last_modified = None + + # Combined value for when the dbt project directory files were last modified + # This is fast (e.g. 0.01s for jaffle shop, 0.135s for a 5k models dbt folder) + dbt_combined_last_modified = sum([path.stat().st_mtime for path in project_dir.glob("**/*")]) + + elapsed_time = time.process_time() - start_time + logger.info(f"It took {elapsed_time:.3}s to calculate the cache version for the DbtDag {dag_id}") + return f"{dag_last_modified} {dbt_combined_last_modified}" def __new__(cls, *args, **kwargs): # type: ignore dag_id = kwargs.get("dag_id") - if dag_id is not None: - cache_filepath = DbtDag.get_cache_filepath(dag_id) - if settings.enable_cache and settings.experimental_cache and cache_filepath.exists(): + project_config = kwargs.get("project_config") + + # When we load a Pickle dump of a DbtDag, __new__ is invoked without kwargs + # In those cases, we should not call DbtDag.__new__ again, otherwise we'll have an infinite recursion + if dag_id is not None and project_config and project_config.dbt_project_path: + current_version = DbtDag.calculate_current_version(dag_id, project_config.dbt_project_path) + cache_filepath = DbtDag.should_use_cache() and DbtDag.is_project_unmodified(dag_id, current_version) + if cache_filepath: logger.info(f"Restoring DbtDag {dag_id} from cache {cache_filepath}") with open(cache_filepath, "rb") as fp: - return pickle.load(fp) + start_time = time.process_time() + dbt_dag = pickle.load(fp) + elapsed_time = time.process_time() - start_time + logger.info(f"It took {elapsed_time:.3}s to restore the cached version of the DbtDag {dag_id}") + return dbt_dag instance = DAG.__new__(DAG) DbtDag.__init__(instance, *args, **kwargs) # type: ignore return instance + # The __init__ is not called when restoring the cached DbtDag in __new__ def __init__( self, *args: Any, **kwargs: Any, ) -> None: - logger.info(f"Creating DbtDag {kwargs.get('dag_id')}") + start_time = time.process_time() + dag_id = kwargs["dag_id"] + project_config = kwargs.get("project_config") + DAG.__init__(self, *args, **airflow_kwargs(**kwargs)) kwargs["dag"] = self DbtToAirflowConverter.__init__(self, *args, **specific_kwargs(**kwargs)) - cache_filepath = DbtDag.get_cache_filepath(kwargs["dag_id"]) - if settings.enable_cache and settings.experimental_cache: + elapsed_time = time.process_time() - start_time + logger.info(f"It took {elapsed_time} to create the DbtDag {dag_id} from scratch") + + if DbtDag.should_use_cache() and project_config: + cache_filepath = DbtDag.get_cache_filepath(dag_id) with open(cache_filepath, "wb") as fp: pickle.dump(self, fp) + cache_version_filepath = DbtDag.get_cache_version_filepath(dag_id) + current_version = DbtDag.calculate_current_version(dag_id, project_config.dbt_project_path) + cache_version_filepath.write_text(current_version) + logger.info(f"Stored DbtDag {dag_id} cache {cache_filepath}") From daad0b33ee437e1017e25cdcae03f1e64d2e6fdd Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 23 May 2024 02:00:11 +0100 Subject: [PATCH 3/7] Extend DbtDag caching to DbtTaskGroup --- cosmos/airflow/dag.py | 87 +++++++++--------------------------- cosmos/airflow/task_group.py | 49 ++++++++++++++++++++ cosmos/cache.py | 83 ++++++++++++++++++++++++++++++++++ 3 files changed, 152 insertions(+), 67 deletions(-) diff --git a/cosmos/airflow/dag.py b/cosmos/airflow/dag.py index dd50920806..bdcce37028 100644 --- a/cosmos/airflow/dag.py +++ b/cosmos/airflow/dag.py @@ -4,17 +4,14 @@ from __future__ import annotations -import functools - # import inspect import pickle import time -from pathlib import Path from typing import Any from airflow.models.dag import DAG -from cosmos import cache, settings +from cosmos import cache from cosmos.converter import DbtToAirflowConverter, airflow_kwargs, specific_kwargs from cosmos.log import get_logger @@ -26,78 +23,32 @@ class DbtDag(DAG, DbtToAirflowConverter): Render a dbt project as an Airflow DAG. """ - @staticmethod - @functools.lru_cache - def get_cache_filepath(cache_identifier: str) -> Path: - cache_dir_path = cache._obtain_cache_dir_path(cache_identifier) - return cache_dir_path / f"{cache_identifier}.pkl" - - @staticmethod - @functools.lru_cache - def get_cache_version_filepath(cache_identifier: str) -> Path: - return Path(str(DbtDag.get_cache_filepath(cache_identifier)) + ".version") - - @staticmethod - @functools.lru_cache - def should_use_cache() -> bool: - return settings.enable_cache and settings.experimental_cache - - @staticmethod - @functools.lru_cache - def is_project_unmodified(dag_id: str, current_version: str) -> Path | None: - cache_filepath = DbtDag.get_cache_filepath(dag_id) - cache_version_filepath = DbtDag.get_cache_version_filepath(dag_id) - if cache_version_filepath.exists() and cache_filepath.exists(): - previous_cache_version = cache_version_filepath.read_text() - if previous_cache_version == current_version: - return cache_filepath - return None - - @staticmethod - @functools.lru_cache - def calculate_current_version(dag_id: str, project_dir: Path) -> str: - start_time = time.process_time() - - # When DAG file was last changed - this is very slow (e.g. 0.6s) - # caller_dag_frame = inspect.stack()[1] - # caller_dag_filepath = Path(caller_dag_frame.filename) - # logger.info("The %s DAG is located in: %s" % (dag_id, caller_dag_filepath)) - # dag_last_modified = caller_dag_filepath.stat().st_mtime - # mid_time = time.process_time() - start_time - # logger.info(f"It took {mid_time:.3}s to calculate the first part of the version") - dag_last_modified = None - - # Combined value for when the dbt project directory files were last modified - # This is fast (e.g. 0.01s for jaffle shop, 0.135s for a 5k models dbt folder) - dbt_combined_last_modified = sum([path.stat().st_mtime for path in project_dir.glob("**/*")]) - - elapsed_time = time.process_time() - start_time - logger.info(f"It took {elapsed_time:.3}s to calculate the cache version for the DbtDag {dag_id}") - return f"{dag_last_modified} {dbt_combined_last_modified}" - def __new__(cls, *args, **kwargs): # type: ignore dag_id = kwargs.get("dag_id") project_config = kwargs.get("project_config") - # When we load a Pickle dump of a DbtDag, __new__ is invoked without kwargs - # In those cases, we should not call DbtDag.__new__ again, otherwise we'll have an infinite recursion + # When we load a Pickle dump of an instance, __new__ is invoked without kwargs + # In those cases, we should not call __new__ again, otherwise we'll have an infinite recursion if dag_id is not None and project_config and project_config.dbt_project_path: - current_version = DbtDag.calculate_current_version(dag_id, project_config.dbt_project_path) - cache_filepath = DbtDag.should_use_cache() and DbtDag.is_project_unmodified(dag_id, current_version) + cache_id = cache.create_cache_identifier_v2(dag_id, None) + current_version = cache.calculate_current_version(cache_id, project_config.dbt_project_path) + cache_filepath = cache.should_use_cache() and cache.is_project_unmodified(cache_id, current_version) if cache_filepath: - logger.info(f"Restoring DbtDag {dag_id} from cache {cache_filepath}") + logger.info(f"Restoring {cls.__name__} {dag_id} from cache {cache_filepath}") with open(cache_filepath, "rb") as fp: start_time = time.process_time() dbt_dag = pickle.load(fp) elapsed_time = time.process_time() - start_time - logger.info(f"It took {elapsed_time:.3}s to restore the cached version of the DbtDag {dag_id}") + logger.info( + f"It took {elapsed_time:.3}s to restore the cached version of the {cls.__name__} {dag_id}" + ) return dbt_dag instance = DAG.__new__(DAG) - DbtDag.__init__(instance, *args, **kwargs) # type: ignore + cls.__init__(instance, *args, **kwargs) # type: ignore return instance - # The __init__ is not called when restoring the cached DbtDag in __new__ + # The __init__ is not called when restoring the cached in __new__ def __init__( self, *args: Any, @@ -110,14 +61,16 @@ def __init__( DAG.__init__(self, *args, **airflow_kwargs(**kwargs)) kwargs["dag"] = self DbtToAirflowConverter.__init__(self, *args, **specific_kwargs(**kwargs)) + elapsed_time = time.process_time() - start_time - logger.info(f"It took {elapsed_time} to create the DbtDag {dag_id} from scratch") + logger.info(f"It took {elapsed_time} to create the {self.__class__.__name__} {dag_id} from scratch") - if DbtDag.should_use_cache() and project_config: - cache_filepath = DbtDag.get_cache_filepath(dag_id) + if cache.should_use_cache() and project_config: + cache_id = cache.create_cache_identifier_v2(dag_id, None) + cache_filepath = cache.get_cache_filepath(cache_id) with open(cache_filepath, "wb") as fp: pickle.dump(self, fp) - cache_version_filepath = DbtDag.get_cache_version_filepath(dag_id) - current_version = DbtDag.calculate_current_version(dag_id, project_config.dbt_project_path) + cache_version_filepath = cache.get_cache_version_filepath(cache_id) + current_version = cache.calculate_current_version(cache_id, project_config.dbt_project_path) cache_version_filepath.write_text(current_version) - logger.info(f"Stored DbtDag {dag_id} cache {cache_filepath}") + logger.info(f"Stored {self.__class__.__name__} {dag_id} cache {cache_filepath}") diff --git a/cosmos/airflow/task_group.py b/cosmos/airflow/task_group.py index 64fcb298aa..90881726ed 100644 --- a/cosmos/airflow/task_group.py +++ b/cosmos/airflow/task_group.py @@ -4,11 +4,17 @@ from __future__ import annotations +import pickle +import time from typing import Any from airflow.utils.task_group import TaskGroup +from cosmos import cache from cosmos.converter import DbtToAirflowConverter, airflow_kwargs, specific_kwargs +from cosmos.log import get_logger + +logger = get_logger() class DbtTaskGroup(TaskGroup, DbtToAirflowConverter): @@ -16,13 +22,56 @@ class DbtTaskGroup(TaskGroup, DbtToAirflowConverter): Render a dbt project as an Airflow Task Group. """ + def __new__(cls, *args, **kwargs): # type: ignore + dag_id = kwargs.get("dag_id") + task_id = kwargs.get("task_id") + project_config = kwargs.get("project_config") + + # When we load a Pickle dump of an instance, __new__ is invoked without kwargs + # In those cases, we should not call __new__ again, otherwise we'll have an infinite recursion + if task_id is not None and project_config and project_config.dbt_project_path: + cache_id = cache.create_cache_identifier_v2(dag_id, task_id) + current_version = cache.calculate_current_version(cache_id, project_config.dbt_project_path) + cache_filepath = cache.should_use_cache() and cache.is_project_unmodified(cache_id, current_version) + if cache_filepath: + logger.info(f"Restoring {cls.__name__} {dag_id} from cache {cache_filepath}") + with open(cache_filepath, "rb") as fp: + start_time = time.process_time() + dbt_dag = pickle.load(fp) + elapsed_time = time.process_time() - start_time + logger.info( + f"It took {elapsed_time:.3}s to restore the cached version of the {cls.__name__} {dag_id}" + ) + return dbt_dag + + instance = TaskGroup.__new__(TaskGroup) + cls.__init__(instance, *args, **kwargs) # type: ignore + return instance + def __init__( self, group_id: str = "dbt_task_group", *args: Any, **kwargs: Any, ) -> None: + start_time = time.process_time() kwargs["group_id"] = group_id + dag_id = kwargs.get("dag_id") + project_config = kwargs.get("project_config") + TaskGroup.__init__(self, *args, **airflow_kwargs(**kwargs)) kwargs["task_group"] = self DbtToAirflowConverter.__init__(self, *args, **specific_kwargs(**kwargs)) + + elapsed_time = time.process_time() - start_time + logger.info(f"It took {elapsed_time} to create the {self.__class__.__name__} {dag_id} from scratch") + + if cache.should_use_cache() and project_config: + cache_id = cache.create_cache_identifier_v2(dag_id, group_id) + cache_filepath = cache.get_cache_filepath(cache_id) + with open(cache_filepath, "wb") as fp: + pickle.dump(self, fp) + cache_version_filepath = cache.get_cache_version_filepath(cache_id) + current_version = cache.calculate_current_version(cache_id, project_config.dbt_project_path) + cache_version_filepath.write_text(current_version) + logger.info(f"Stored {self.__class__.__name__} {dag_id} cache {cache_filepath}") diff --git a/cosmos/cache.py b/cosmos/cache.py index 1e0b341f07..37a88d1799 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -1,6 +1,8 @@ from __future__ import annotations +import functools import shutil +import time from pathlib import Path import msgpack @@ -171,3 +173,84 @@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P if source_manifest_filepath.exists(): shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath)) + + +# The following methods are being used to cache DbtDag / DbtTaskGroup + + +# It was considered to create a cache identifier based on the dbt project path, as opposed +# to where it is used in Airflow. However, we could have concurrency issues if the same +# dbt cached directory was being used by different dbt task groups or DAGs within the same +# node. For this reason, as a starting point, the cache is identified by where it is used. +# This can be reviewed in the future. +def create_cache_identifier_v2(dag_id: str | None, task_group_id: str | None) -> str: + # FIXME: To be refactored and merged with _create_cache_identifier + # Missing support to: task_group.group_id + """ + Given a DAG name and a (optional) task_group_name, create the identifier for caching. + + :param dag_name: Name of the Cosmos DbtDag being cached + :param task_group_name: (optional) Name of the Cosmos DbtTaskGroup being cached + :return: Unique identifier representing the cache + """ + cache_identifiers_list = [] + if task_group_id: + if dag_id is not None: + cache_identifiers_list.append(dag_id) + if task_group_id is not None: + cache_identifiers_list.append(task_group_id) + cache_identifier = "__".join(cache_identifiers_list) + else: + cache_identifier = str(dag_id) + + return cache_identifier + + +@functools.lru_cache +def get_cache_filepath(cache_identifier: str) -> Path: + cache_dir_path = _obtain_cache_dir_path(cache_identifier) + return cache_dir_path / f"{cache_identifier}.pkl" + + +@functools.lru_cache +def get_cache_version_filepath(cache_identifier: str) -> Path: + return Path(str(get_cache_filepath(cache_identifier)) + ".version") + + +@functools.lru_cache +def should_use_cache() -> bool: + return settings.enable_cache and settings.experimental_cache + + +@functools.lru_cache +def calculate_current_version(dag_id: str, project_dir: Path) -> str: + start_time = time.process_time() + + # When DAG file was last changed - this is very slow (e.g. 0.6s) + # caller_dag_frame = inspect.stack()[1] + # caller_dag_filepath = Path(caller_dag_frame.filename) + # logger.info("The %s DAG is located in: %s" % (dag_id, caller_dag_filepath)) + # dag_last_modified = caller_dag_filepath.stat().st_mtime + # mid_time = time.process_time() - start_time + # logger.info(f"It took {mid_time:.3}s to calculate the first part of the version") + # dag_last_modified = None + + # Combined value for when the dbt project directory files were last modified + # This is fast (e.g. 0.01s for jaffle shop, 0.135s for a 5k models dbt folder) + dbt_combined_last_modified = sum([path.stat().st_mtime for path in project_dir.glob("**/*")]) + + elapsed_time = time.process_time() - start_time + logger.info(f"It took {elapsed_time:.3}s to calculate the cache version for the {dag_id}") + # return f"{dag_last_modified} {dbt_combined_last_modified}" + return f"{dbt_combined_last_modified}" + + +@functools.lru_cache +def is_project_unmodified(dag_id: str, current_version: str) -> Path | None: + cache_filepath = get_cache_filepath(dag_id) + cache_version_filepath = get_cache_version_filepath(dag_id) + if cache_version_filepath.exists() and cache_filepath.exists(): + previous_cache_version = cache_version_filepath.read_text() + if previous_cache_version == current_version: + return cache_filepath + return None From 76ab3feecb5870100d3cc653e55dfd2fd332009e Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 23 May 2024 02:18:39 +0100 Subject: [PATCH 4/7] Release Cosmos 1.5.0a1 --- CHANGELOG.rst | 21 +++++++++++++++++++-- cosmos/__init__.py | 2 +- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2ef66782c1..e74315ba12 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,9 +1,26 @@ Changelog ========= -1.4.1 (2024-05-17) +1.5.0a1 (2024-05-23) -------------------- +New Features + +* Support for running dbt tasks in AWS EKS in #944 by @VolkerSchiewe +* Support caching at a DbtDag and DbtTaskGroupLevel in #992 by @tatiana (WIP) + +Others + +* Drop support for Airflow 2.3 in #994 by @pankajkoti +* Update Astro Runtime image in #988 and #989 by @RNHTTR +* Enable ruff F linting in #985 by @pankajastro +* Move Cosmos Airflow configuration to settings.py in #975 by @pankajastro + + + +1.4.1 (2024-05-17) +------------------ + Bug fixes * Fix manifest testing behavior in #955 by @chris-okorodudu @@ -20,7 +37,7 @@ Others 1.4.0 (2024-05-13) --------------------- +------------------ Features diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 5d88d35d3c..2fc463d5c7 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.4.1" +__version__ = "1.5.0a1" from cosmos.airflow.dag import DbtDag From ad7762495b8cf808b0be5baeb9e3b2d7458a5b09 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 23 May 2024 17:12:19 +0100 Subject: [PATCH 5/7] Change cache version to contain DAG timestamp --- cosmos/cache.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index 37a88d1799..96622659ce 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -1,6 +1,7 @@ from __future__ import annotations import functools +import inspect import shutil import time from pathlib import Path @@ -227,12 +228,12 @@ def calculate_current_version(dag_id: str, project_dir: Path) -> str: start_time = time.process_time() # When DAG file was last changed - this is very slow (e.g. 0.6s) - # caller_dag_frame = inspect.stack()[1] - # caller_dag_filepath = Path(caller_dag_frame.filename) - # logger.info("The %s DAG is located in: %s" % (dag_id, caller_dag_filepath)) - # dag_last_modified = caller_dag_filepath.stat().st_mtime - # mid_time = time.process_time() - start_time - # logger.info(f"It took {mid_time:.3}s to calculate the first part of the version") + caller_dag_frame = inspect.stack()[1] + caller_dag_filepath = Path(caller_dag_frame.filename) + logger.info(f"The {dag_id} DAG is located in: {caller_dag_filepath}") + dag_last_modified = caller_dag_filepath.stat().st_mtime + mid_time = time.process_time() - start_time + logger.info(f"It took {mid_time:.3}s to calculate the first part of the version") # dag_last_modified = None # Combined value for when the dbt project directory files were last modified @@ -241,8 +242,8 @@ def calculate_current_version(dag_id: str, project_dir: Path) -> str: elapsed_time = time.process_time() - start_time logger.info(f"It took {elapsed_time:.3}s to calculate the cache version for the {dag_id}") - # return f"{dag_last_modified} {dbt_combined_last_modified}" - return f"{dbt_combined_last_modified}" + return f"{dag_last_modified} {dbt_combined_last_modified}" + # return f"{dbt_combined_last_modified}" @functools.lru_cache From 4f8bcbc346543f84d2eae7c6fcd3a01636b7f815 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 23 May 2024 17:17:49 +0100 Subject: [PATCH 6/7] Release Cosmos 1.5.0a2 --- CHANGELOG.rst | 3 ++- cosmos/__init__.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e74315ba12..9b3d669974 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,13 +1,14 @@ Changelog ========= -1.5.0a1 (2024-05-23) +1.5.0a2 (2024-05-23) -------------------- New Features * Support for running dbt tasks in AWS EKS in #944 by @VolkerSchiewe * Support caching at a DbtDag and DbtTaskGroupLevel in #992 by @tatiana (WIP) + - difference from 1.5.0a1: Include timestamp of the DAG in the cache version Others diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 2fc463d5c7..1372b998b5 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.5.0a1" +__version__ = "1.5.0a2" from cosmos.airflow.dag import DbtDag From 83480fd0230902412eb718a046fe5bf282f07968 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 23 May 2024 17:21:37 +0100 Subject: [PATCH 7/7] Fix the reference to the DAG file --- cosmos/cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/cache.py b/cosmos/cache.py index 96622659ce..1154399703 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -228,7 +228,7 @@ def calculate_current_version(dag_id: str, project_dir: Path) -> str: start_time = time.process_time() # When DAG file was last changed - this is very slow (e.g. 0.6s) - caller_dag_frame = inspect.stack()[1] + caller_dag_frame = inspect.stack()[2] caller_dag_filepath = Path(caller_dag_frame.filename) logger.info(f"The {dag_id} DAG is located in: {caller_dag_filepath}") dag_last_modified = caller_dag_filepath.stat().st_mtime