Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ jobs:
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration
env:
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_YAML_SELECTORS: 0
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
Expand Down Expand Up @@ -521,6 +522,7 @@ jobs:
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-dbtf
env:
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_YAML_SELECTORS: 0
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
Expand Down
166 changes: 148 additions & 18 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import msgpack
import yaml
Expand Down Expand Up @@ -287,6 +287,42 @@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P
shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath))


def _calculate_yaml_selectors_cache_current_version(
cache_identifier: str,
project_dir: Path,
selector_definitions: dict[str, dict[str, Any]],
cache_key: list[str],
) -> str:
"""
Taking into account the project directory contents and the selectors definitions, calculate the
hash that represents the "dbt selectors" version - to be used to decide if the cache should be refreshed or not.

:param cache_identifier: str - Unique identifier of the cache (may include DbtDag or DbtTaskGroup information)
:param project_dir: Path - Path to the target dbt project directory
:param selector_definitions: dict[str, dict[str, Any]] - Dictionary containing the selectors definitions from the manifest
:param cache_key: list[str] - List of strings used as part of the cache key hash calculation
:return: str - Combined hash string of project, selectors, and cache_key (comma-separated)
"""

start_time = time.perf_counter()

# Combined value for when the dbt project directory files were last modified
# This is fast (e.g. 0.01s for jaffle shop, 0.135s for a 5k models dbt folder)
dbt_project_hash = _create_folder_version_hash(project_dir)

# Use JSON with sorted keys for deterministic hashing, resilient to dict ordering changes
yaml_selector_hash = hashlib.md5(
json.dumps(selector_definitions, sort_keys=True, separators=(",", ":")).encode()
).hexdigest()
cache_key_hash = hashlib.md5("".join(sorted(cache_key)).encode()).hexdigest()

elapsed_time = time.perf_counter() - start_time
logger.info(
f"Cosmos performance: time to calculate cache identifier {cache_identifier} for current version: {elapsed_time}"
)
return f"{dbt_project_hash},{yaml_selector_hash},{cache_key_hash}"


def _calculate_dbt_ls_cache_current_version(cache_identifier: str, project_dir: Path, cmd_args: list[str]) -> str:
"""
Taking into account the project directory contents and the command arguments, calculate the
Expand Down Expand Up @@ -321,17 +357,34 @@ def was_project_modified(previous_version: str, current_version: str) -> bool:
return previous_version != current_version


@functools.lru_cache
def were_yaml_selectors_modified(previous_version: str, current_version: str) -> bool:
"""
Given the cache version of a project's selectors.yaml and the latest version
of the project's selectors.yaml, decides if the selectors.yaml was modified or not.
"""
return previous_version != current_version


@provide_session
def delete_unused_dbt_ls_cache(
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
def delete_unused_dbt_cache(
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None, cache_type: str = "cosmos"
) -> int:
"""
Delete Cosmos cache stored in Airflow Variables based on the last execution of their associated DAGs.

This function handles all types of Cosmos cache (dbt ls, YAML selectors, etc.) and is used by
specific wrapper functions for backwards compatibility.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:param cache_type: Type of cache being deleted, used in log messages (default: "cosmos")
:return: Number of Airflow Variables deleted

Example usage:

There are three Cosmos cache Airflow Variables:
1. ``cache cosmos_cache__basic_cosmos_dag``
1. ``cosmos_cache__basic_cosmos_dag``
2. ``cosmos_cache__basic_cosmos_task_group__orders``
3. ``cosmos_cache__basic_cosmos_task_group__customers``

Expand All @@ -344,21 +397,21 @@ def delete_unused_dbt_ls_cache(
To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 5 days ago:

..code: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=5))
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag
>>> delete_unused_dbt_cache(max_age_last_usage=timedelta(days=5))
INFO - Removing the cosmos cache cosmos_cache__basic_cosmos_dag

To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 minutes ago:

..code: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(minutes=10))
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__orders
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__orders
>>> delete_unused_dbt_cache(max_age_last_usage=timedelta(minutes=10))
INFO - Removing the cosmos cache cosmos_cache__basic_cosmos_dag
INFO - Removing the cosmos cache cosmos_cache__basic_cosmos_task_group__orders
INFO - Removing the cosmos cache cosmos_cache__basic_cosmos_task_group__customers

To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 days ago
To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 days ago:

..code: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=10))
>>> delete_unused_dbt_cache(max_age_last_usage=timedelta(days=10))

In this last example, nothing is deleted.
"""
Expand Down Expand Up @@ -390,7 +443,7 @@ def delete_unused_dbt_ls_cache(
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for var_key in vars_keys:
logger.info(f"Removing the dbt ls cache {var_key}")
logger.info(f"Removing the {cache_type} cache {var_key}")
Variable.delete(var_key)
deleted_cosmos_variables += 1

Expand All @@ -402,11 +455,19 @@ def delete_unused_dbt_ls_cache(

# TODO: Add integration tests once remote cache is supported in the CI pipeline
@provide_session
def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
def delete_unused_dbt_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None, cache_type: str = "cosmos"
) -> int:
"""
Delete Cosmos cache stored in remote storage based on the last execution of their associated DAGs.

This function handles all types of Cosmos cache (dbt ls, YAML selectors, etc.) stored in remote
storage and is used by specific wrapper functions for backwards compatibility.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:param cache_type: Type of cache being deleted, used in log messages (default: "cosmos")
:return: Number of remote cache files deleted
"""
if session is None:
return 0
Expand All @@ -417,7 +478,7 @@ def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
configured_remote_cache_dir = _configure_remote_cache_dir()
if not configured_remote_cache_dir:
logger.info(
"No remote cache directory configured. Skipping the deletion of the dbt ls cache files in remote storage."
f"No remote cache directory configured. Skipping the deletion of the {cache_type} cache files in remote storage."
)
return 0

Expand Down Expand Up @@ -446,18 +507,87 @@ def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for file in files:
logger.info(f"Removing the dbt ls cache remote file {file}")
logger.info(f"Removing the {cache_type} cache remote file {file}")
file.unlink()
deleted_cosmos_remote_cache_files += 1
logger.info(
"Deleted %s/%s dbt ls cache files in remote storage.",
"Deleted %s/%s %s cache files in remote storage.",
deleted_cosmos_remote_cache_files,
total_cosmos_remote_cache_files,
cache_type,
)

return deleted_cosmos_remote_cache_files


@provide_session
def delete_unused_dbt_ls_cache(
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete all Cosmos cache stored in Airflow Variables (dbt ls, YAML selectors, etc.).

This function is maintained for backwards compatibility and API stability. It has identical behavior
to delete_unused_dbt_cache() and deletes all types of Cosmos cache, not just dbt ls cache.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:return: Number of Airflow Variables deleted
"""
return delete_unused_dbt_cache(max_age_last_usage, session, cache_type="dbt ls")


@provide_session
def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete all Cosmos cache stored in remote storage (dbt ls, YAML selectors, etc.).

This function is maintained for backwards compatibility and API stability. It has identical behavior
to delete_unused_dbt_remote_cache_files() and deletes all types of Cosmos cache, not just dbt ls cache.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:return: Number of remote cache files deleted
"""
return delete_unused_dbt_remote_cache_files(max_age_last_usage, session, cache_type="dbt ls")


@provide_session
def delete_unused_dbt_yaml_selectors_cache(
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete all Cosmos cache stored in Airflow Variables (dbt ls, YAML selectors, etc.).

This function is maintained for backwards compatibility and API stability. It has identical behavior
to delete_unused_dbt_cache() and deletes all types of Cosmos cache, not just YAML selectors cache.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:return: Number of Airflow Variables deleted
"""
return delete_unused_dbt_cache(max_age_last_usage, session, cache_type="dbt yaml selectors")


@provide_session
def delete_unused_dbt_yaml_selectors_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete all Cosmos cache stored in remote storage (dbt ls, YAML selectors, etc.).

This function is maintained for backwards compatibility and API stability. It has identical behavior
to delete_unused_dbt_remote_cache_files() and deletes all types of Cosmos cache, not just YAML selectors cache.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:return: Number of remote cache files deleted
"""
return delete_unused_dbt_remote_cache_files(max_age_last_usage, session, cache_type="dbt yaml selectors")


def is_profile_cache_enabled() -> bool:
"""Return True if global and profile cache is enable"""
return enable_cache and enable_cache_profile
Expand Down
2 changes: 2 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class RenderConfig:
:param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6).
:param source_pruning: Determines if source nodes without a corresponding downstream task should be removed or not. Default is False
:param airflow_vars_to_purge_dbt_ls_cache: Specify Airflow variables that will affect the LoadMode.DBT_LS cache.
:param airflow_vars_to_purge_dbt_yaml_selectors_cache: Specify Airflow variables that will affect the parsed manifest YamlSelectors cache.
:param normalize_task_id: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name.
:param normalize_task_display_name: A callable that takes a dbt node as input and returns the task display name. This allows users to assign a custom task display name separate from the node ID.
:param should_detach_multiple_parents_tests: A boolean that allows users to decide whether to run tests with multiple parent dependencies in separate tasks.
Expand All @@ -99,6 +100,7 @@ class RenderConfig:
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
source_pruning: bool = False
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)
airflow_vars_to_purge_dbt_yaml_selectors_cache: list[str] = field(default_factory=list)
normalize_task_id: Callable[..., Any] | None = None
normalize_task_display_name: Callable[..., Any] | None = None
should_detach_multiple_parents_tests: bool = False
Expand Down
Loading