Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b1603c2
Port implementation without tests or optimizations
YourRoyalLinus Jan 6, 2026
f1c0992
Implement caching for parsed yaml selectors
YourRoyalLinus Jan 6, 2026
2bd3804
Fix static typing errors
YourRoyalLinus Jan 7, 2026
bc08169
Fix code complexity linting errors
YourRoyalLinus Jan 7, 2026
728a30a
Update comments
YourRoyalLinus Jan 8, 2026
795f964
Converge on single graph cache
YourRoyalLinus Jan 8, 2026
f609ada
Use proper function to check for selectors yaml cache invalidation
YourRoyalLinus Jan 8, 2026
f88b344
Update comments and docstrings
YourRoyalLinus Jan 8, 2026
1b42774
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 8, 2026
389d5e1
Update comments and docstrings
YourRoyalLinus Jan 9, 2026
b889bd5
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 9, 2026
195eb0c
Refactor and reorganize yaml selectors logic
YourRoyalLinus Jan 15, 2026
93078dd
Pull preprocessed selector definitions from the manifest
YourRoyalLinus Jan 15, 2026
2b410a4
Remove dbt_spec_version
YourRoyalLinus Jan 15, 2026
52c6982
Invalidate cosmos cache when YamlSelectors implementation changes
YourRoyalLinus Jan 16, 2026
02679bf
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 16, 2026
6c1402e
Remove changes from previous approach
YourRoyalLinus Jan 21, 2026
ec12d86
Implement unit/integration tests
YourRoyalLinus Jan 23, 2026
f2a398d
Return hash of source code for impl_version
YourRoyalLinus Jan 23, 2026
c87043b
Unify dbt_cache implementation (preserving public API)
YourRoyalLinus Jan 23, 2026
9b16f05
Update docs
YourRoyalLinus Jan 23, 2026
2e05fca
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 23, 2026
9c2c718
Use cached_property for impl_version
YourRoyalLinus Jan 26, 2026
7de468f
Shorten manifest selectors example dag name
YourRoyalLinus Jan 26, 2026
5ed61dd
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 26, 2026
63f55b0
Fix failing unit tests on build agent
YourRoyalLinus Jan 26, 2026
f5d6b8e
Fix typo in caching docs + Add entry for selector caching to cosmos-c…
YourRoyalLinus Jan 26, 2026
cad91ac
Apply copilot PR suggestions
YourRoyalLinus Jan 27, 2026
2f040d6
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 27, 2026
2403b42
Ensure consistent config naming + Update docs
YourRoyalLinus Jan 27, 2026
9cdf03e
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
tatiana Jan 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ jobs:
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration
env:
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_YAML_SELECTORS: 0
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
AIRFLOW_CONN_AWS_S3_CONN: ${{ secrets.AIRFLOW_CONN_AWS_S3_CONN }}
Expand Down Expand Up @@ -521,6 +522,7 @@ jobs:
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ matrix.dbt-version }}:test-integration-dbtf
env:
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_YAML_SELECTORS: 0
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
Expand Down
166 changes: 148 additions & 18 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import msgpack
import yaml
Expand Down Expand Up @@ -287,6 +287,42 @@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P
shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath))


def _calculate_yaml_selectors_cache_current_version(
cache_identifier: str,
project_dir: Path,
selector_definitions: dict[str, dict[str, Any]],
cache_key: list[str],
) -> str:
"""
Taking into account the project directory contents and the selectors definitions, calculate the
hash that represents the "dbt selectors" version - to be used to decide if the cache should be refreshed or not.

:param cache_identifier: str - Unique identifier of the cache (may include DbtDag or DbtTaskGroup information)
:param project_dir: Path - Path to the target dbt project directory
:param selector_definitions: dict[str, dict[str, Any]] - Dictionary containing the selectors definitions from the manifest
:param cache_key: list[str] - List of strings used as part of the cache key hash calculation
:return: str - Combined hash string of project, selectors, and cache_key (comma-separated)
"""

start_time = time.perf_counter()

# Combined value for when the dbt project directory files were last modified
# This is fast (e.g. 0.01s for jaffle shop, 0.135s for a 5k models dbt folder)
dbt_project_hash = _create_folder_version_hash(project_dir)

# Use JSON with sorted keys for deterministic hashing, resilient to dict ordering changes
yaml_selector_hash = hashlib.md5(
json.dumps(selector_definitions, sort_keys=True, separators=(",", ":")).encode()
).hexdigest()
cache_key_hash = hashlib.md5("".join(sorted(cache_key)).encode()).hexdigest()

elapsed_time = time.perf_counter() - start_time
logger.info(
f"Cosmos performance: time to calculate cache identifier {cache_identifier} for current version: {elapsed_time}"
)
return f"{dbt_project_hash},{yaml_selector_hash},{cache_key_hash}"


def _calculate_dbt_ls_cache_current_version(cache_identifier: str, project_dir: Path, cmd_args: list[str]) -> str:
"""
Taking into account the project directory contents and the command arguments, calculate the
Expand Down Expand Up @@ -321,17 +357,34 @@ def was_project_modified(previous_version: str, current_version: str) -> bool:
return previous_version != current_version


@functools.lru_cache
def were_yaml_selectors_modified(previous_version: str, current_version: str) -> bool:
"""
Given the cache version of a project's selectors.yaml and the latest version
of the project's selectors.yaml, decides if the selectors.yaml was modified or not.
"""
return previous_version != current_version


@provide_session
def delete_unused_dbt_ls_cache(
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
def delete_unused_dbt_cache(
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None, cache_type: str = "cosmos"
) -> int:
"""
Delete Cosmos cache stored in Airflow Variables based on the last execution of their associated DAGs.

This function handles all types of Cosmos cache (dbt ls, YAML selectors, etc.) and is used by
specific wrapper functions for backwards compatibility.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:param cache_type: Type of cache being deleted, used in log messages (default: "cosmos")
:return: Number of Airflow Variables deleted

Example usage:

There are three Cosmos cache Airflow Variables:
1. ``cache cosmos_cache__basic_cosmos_dag``
1. ``cosmos_cache__basic_cosmos_dag``
2. ``cosmos_cache__basic_cosmos_task_group__orders``
3. ``cosmos_cache__basic_cosmos_task_group__customers``

Expand All @@ -344,21 +397,21 @@ def delete_unused_dbt_ls_cache(
To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 5 days ago:

..code: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=5))
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag
>>> delete_unused_dbt_cache(max_age_last_usage=timedelta(days=5))
INFO - Removing the cosmos cache cosmos_cache__basic_cosmos_dag

To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 minutes ago:

..code: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(minutes=10))
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__orders
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__orders
>>> delete_unused_dbt_cache(max_age_last_usage=timedelta(minutes=10))
INFO - Removing the cosmos cache cosmos_cache__basic_cosmos_dag
INFO - Removing the cosmos cache cosmos_cache__basic_cosmos_task_group__orders
INFO - Removing the cosmos cache cosmos_cache__basic_cosmos_task_group__customers

To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 days ago
To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 days ago:

..code: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=10))
>>> delete_unused_dbt_cache(max_age_last_usage=timedelta(days=10))

In this last example, nothing is deleted.
"""
Expand Down Expand Up @@ -390,7 +443,7 @@ def delete_unused_dbt_ls_cache(
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for var_key in vars_keys:
logger.info(f"Removing the dbt ls cache {var_key}")
logger.info(f"Removing the {cache_type} cache {var_key}")
Variable.delete(var_key)
deleted_cosmos_variables += 1

Expand All @@ -402,11 +455,19 @@ def delete_unused_dbt_ls_cache(

# TODO: Add integration tests once remote cache is supported in the CI pipeline
@provide_session
def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
def delete_unused_dbt_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None, cache_type: str = "cosmos"
) -> int:
"""
Delete Cosmos cache stored in remote storage based on the last execution of their associated DAGs.

This function handles all types of Cosmos cache (dbt ls, YAML selectors, etc.) stored in remote
storage and is used by specific wrapper functions for backwards compatibility.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:param cache_type: Type of cache being deleted, used in log messages (default: "cosmos")
:return: Number of remote cache files deleted
"""
if session is None:
return 0
Expand All @@ -417,7 +478,7 @@ def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
configured_remote_cache_dir = _configure_remote_cache_dir()
if not configured_remote_cache_dir:
logger.info(
"No remote cache directory configured. Skipping the deletion of the dbt ls cache files in remote storage."
f"No remote cache directory configured. Skipping the deletion of the {cache_type} cache files in remote storage."
)
return 0

Expand Down Expand Up @@ -446,18 +507,87 @@ def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for file in files:
logger.info(f"Removing the dbt ls cache remote file {file}")
logger.info(f"Removing the {cache_type} cache remote file {file}")
file.unlink()
deleted_cosmos_remote_cache_files += 1
logger.info(
"Deleted %s/%s dbt ls cache files in remote storage.",
"Deleted %s/%s %s cache files in remote storage.",
deleted_cosmos_remote_cache_files,
total_cosmos_remote_cache_files,
cache_type,
)

return deleted_cosmos_remote_cache_files


@provide_session
def delete_unused_dbt_ls_cache(
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete all Cosmos cache stored in Airflow Variables (dbt ls, YAML selectors, etc.).

This function is maintained for backwards compatibility and API stability. It has identical behavior
to delete_unused_dbt_cache() and deletes all types of Cosmos cache, not just dbt ls cache.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:return: Number of Airflow Variables deleted
"""
return delete_unused_dbt_cache(max_age_last_usage, session, cache_type="dbt ls")


@provide_session
def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete all Cosmos cache stored in remote storage (dbt ls, YAML selectors, etc.).

This function is maintained for backwards compatibility and API stability. It has identical behavior
to delete_unused_dbt_remote_cache_files() and deletes all types of Cosmos cache, not just dbt ls cache.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:return: Number of remote cache files deleted
"""
return delete_unused_dbt_remote_cache_files(max_age_last_usage, session, cache_type="dbt ls")


@provide_session
def delete_unused_dbt_yaml_selectors_cache(
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete all Cosmos cache stored in Airflow Variables (dbt ls, YAML selectors, etc.).

This function is maintained for backwards compatibility and API stability. It has identical behavior
to delete_unused_dbt_cache() and deletes all types of Cosmos cache, not just YAML selectors cache.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:return: Number of Airflow Variables deleted
"""
return delete_unused_dbt_cache(max_age_last_usage, session, cache_type="dbt yaml selectors")


@provide_session
def delete_unused_dbt_yaml_selectors_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete all Cosmos cache stored in remote storage (dbt ls, YAML selectors, etc.).

This function is maintained for backwards compatibility and API stability. It has identical behavior
to delete_unused_dbt_remote_cache_files() and deletes all types of Cosmos cache, not just YAML selectors cache.

:param max_age_last_usage: Delete cache for DAGs not run within this timeframe
:param session: SQLAlchemy session for Airflow metadata database (automatically provided by @provide_session)
:return: Number of remote cache files deleted
"""
return delete_unused_dbt_remote_cache_files(max_age_last_usage, session, cache_type="dbt yaml selectors")


def is_profile_cache_enabled() -> bool:
"""Return True if global and profile cache is enable"""
return enable_cache and enable_cache_profile
Expand Down
2 changes: 2 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class RenderConfig:
:param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6).
:param source_pruning: Determines if source nodes without a corresponding downstream task should be removed or not. Default is False
:param airflow_vars_to_purge_dbt_ls_cache: Specify Airflow variables that will affect the LoadMode.DBT_LS cache.
:param airflow_vars_to_purge_dbt_yaml_selectors_cache: Specify Airflow variables that will affect the parsed manifest YamlSelectors cache.
:param normalize_task_id: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name.
:param normalize_task_display_name: A callable that takes a dbt node as input and returns the task display name. This allows users to assign a custom task display name separate from the node ID.
:param should_detach_multiple_parents_tests: A boolean that allows users to decide whether to run tests with multiple parent dependencies in separate tasks.
Expand All @@ -99,6 +100,7 @@ class RenderConfig:
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
source_pruning: bool = False
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)
airflow_vars_to_purge_dbt_yaml_selectors_cache: list[str] = field(default_factory=list)
normalize_task_id: Callable[..., Any] | None = None
normalize_task_display_name: Callable[..., Any] | None = None
should_detach_multiple_parents_tests: bool = False
Expand Down
Loading