Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
b1603c2
Port implementation without tests or optimizations
YourRoyalLinus Jan 6, 2026
f1c0992
Implement caching for parsed yaml selectors
YourRoyalLinus Jan 6, 2026
2bd3804
Fix static typing errors
YourRoyalLinus Jan 7, 2026
bc08169
Fix code complexity linting errors
YourRoyalLinus Jan 7, 2026
728a30a
Update comments
YourRoyalLinus Jan 8, 2026
795f964
Converge on single graph cache
YourRoyalLinus Jan 8, 2026
f609ada
Use proper function to check for selectors yaml cache invalidation
YourRoyalLinus Jan 8, 2026
f88b344
Update comments and docstrings
YourRoyalLinus Jan 8, 2026
1b42774
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 8, 2026
389d5e1
Update comments and docstrings
YourRoyalLinus Jan 9, 2026
b889bd5
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 9, 2026
195eb0c
Refactor and reorganize yaml selectors logic
YourRoyalLinus Jan 15, 2026
93078dd
Pull preprocessed selector definitions from the manifest
YourRoyalLinus Jan 15, 2026
2b410a4
Remove dbt_spec_version
YourRoyalLinus Jan 15, 2026
52c6982
Invalidate cosmos cache when YamlSelectors implementation changes
YourRoyalLinus Jan 16, 2026
02679bf
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 16, 2026
6c1402e
Remove changes from previous approach
YourRoyalLinus Jan 21, 2026
ec12d86
Implement unit/integration tests
YourRoyalLinus Jan 23, 2026
f2a398d
Return hash of source code for impl_version
YourRoyalLinus Jan 23, 2026
c87043b
Unify dbt_cache implementation (preserving public API)
YourRoyalLinus Jan 23, 2026
9b16f05
Update docs
YourRoyalLinus Jan 23, 2026
2e05fca
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 23, 2026
9c2c718
Use cached_property for impl_version
YourRoyalLinus Jan 26, 2026
7de468f
Shorten manifest selectors example dag name
YourRoyalLinus Jan 26, 2026
5ed61dd
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 26, 2026
63f55b0
Fix failing unit tests on build agent
YourRoyalLinus Jan 26, 2026
f5d6b8e
Fix typo in caching docs + Add entry for selector caching to cosmos-c…
YourRoyalLinus Jan 26, 2026
cad91ac
Apply copilot PR suggestions
YourRoyalLinus Jan 27, 2026
2f040d6
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
YourRoyalLinus Jan 27, 2026
2403b42
Ensure consistent config naming + Update docs
YourRoyalLinus Jan 27, 2026
424a9a1
Fix failing integration tests due to AF Object Storage version support
tatiana Jan 29, 2026
0fbf7e1
Change manifest with selector path to use manifest_with_selector.json…
tatiana Jan 29, 2026
9b2c4f2
Restore manifest.json before selector support
tatiana Jan 29, 2026
087da1c
Move selectors and manifest to the altered_jaffle_shop folder
tatiana Jan 29, 2026
15c927a
Fix unittests related to the hash of the jaffle_shop project in MacOS
tatiana Jan 29, 2026
134bf50
Change test_save_yaml_selectors_cache to use project that defines sel…
tatiana Jan 29, 2026
0e3a28e
Remove default selector - previously it would not display any nodes b…
tatiana Jan 29, 2026
e4fa35d
Fix hash for test_save_yaml_selectors_cache in linux
tatiana Jan 29, 2026
dec5b68
Fix issue in running example DAG dev/dags/cosmos_manifest_selectors_e…
tatiana Jan 29, 2026
fd017e4
Fix last broken tests
tatiana Jan 29, 2026
cab688f
Comment DAGs that should not be run in AF 3
tatiana Jan 29, 2026
08cbc7f
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
tatiana Jan 29, 2026
485e0a4
Update docs/configuration/caching.rst
tatiana Jan 29, 2026
66f1b32
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
tatiana Jan 29, 2026
c8b8a93
Re-apply the changes before the rebase
tatiana Jan 29, 2026
2dfd1d4
Fix cache keys after rebase
tatiana Jan 29, 2026
274e8ea
Fix cache keys after rebase
tatiana Jan 29, 2026
e30828a
Remove default selector - as I had done before the rebase
tatiana Jan 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions dev/dags/cosmos_manifest_selectors_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop")
execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "altered_jaffle_shop")

profile_config = ProfileConfig(
profile_name="default",
Expand Down Expand Up @@ -47,11 +47,29 @@
):
pre_dbt = EmptyOperator(task_id="pre_dbt")

# The selector `critical_path` only selects the `customers` model, which means we were lacking its upstream tasks.
# Without `stg_customers`, dbt will not be able to run the `customers` model
pre_condition = DbtTaskGroup(
group_id="pre_condition",
project_config=ProjectConfig(
manifest_path=DBT_ROOT_PATH / "altered_jaffle_shop" / "target" / "manifest.json",
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=RenderConfig(
load_method=LoadMode.DBT_MANIFEST,
select=["+customers"],
airflow_vars_to_purge_dbt_yaml_selectors_cache=["purge"],
),
execution_config=execution_config,
operator_args={"install_deps": True},
)

# [START local_example]
local_example = DbtTaskGroup(
group_id="local_example",
project_config=ProjectConfig(
manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
manifest_path=DBT_ROOT_PATH / "altered_jaffle_shop" / "target" / "manifest.json",
project_name="jaffle_shop",
),
profile_config=profile_config,
Expand All @@ -65,7 +83,7 @@
aws_s3_example = DbtTaskGroup(
group_id="aws_s3_example",
project_config=ProjectConfig(
manifest_path="s3://cosmos-manifest-test/manifest.json",
manifest_path="s3://cosmos-manifest-test/manifest_with_selector.json",
manifest_conn_id="aws_s3_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used.
project_name="jaffle_shop",
Expand All @@ -81,7 +99,7 @@
gcp_gs_example = DbtTaskGroup(
group_id="gcp_gs_example",
project_config=ProjectConfig(
manifest_path="gs://cosmos_remote_target/manifest.json",
manifest_path="gs://cosmos_remote_target/manifest_with_selector.json",
manifest_conn_id="gcp_gs_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used.
project_name="jaffle_shop",
Expand All @@ -97,7 +115,7 @@
azure_abfs_example = DbtTaskGroup(
group_id="azure_abfs_example",
project_config=ProjectConfig(
manifest_path="abfs://cosmos-manifest-test/manifest.json",
manifest_path="abfs://cosmos-manifest-test/manifest_with_selector.json",
manifest_conn_id="azure_abfs_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `wasb_default` is used.
project_name="jaffle_shop",
Expand All @@ -111,4 +129,4 @@

post_dbt = EmptyOperator(task_id="post_dbt")

(pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)
(pre_dbt >> pre_condition >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ selectors:

- name: core_models
description: "Select core business logic models (non-staging)"
default: true
#default: true
definition:
method: tag
value: core
Expand Down
18,250 changes: 18,250 additions & 0 deletions dev/dags/dbt/altered_jaffle_shop/target/manifest.json

Large diffs are not rendered by default.

80 changes: 1 addition & 79 deletions dev/dags/dbt/jaffle_shop/target/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -15852,85 +15852,7 @@
]
},
"saved_queries": {},
"selectors": {
"core_models": {
"definition": {
"method": "tag",
"value": "core"
},
"description": "Select core business logic models (non-staging)",
"name": "core_models"
},
"critical_path": {
"definition": {
"union": [
{
"method": "fqn",
"value": "customers"
},
{
"method": "fqn",
"value": "orders"
}
]
},
"description": "Critical business models",
"name": "critical_path"
},
"customers_and_downstream": {
"definition": {
"childrens_parents": true,
"method": "fqn",
"value": "customers"
},
"description": "Select customers model and all downstream dependencies",
"name": "customers_and_downstream"
},
"exclude_staging": {
"definition": {
"union": [
{
"method": "fqn",
"value": "*"
},
{
"exclude": [
{
"method": "path",
"value": "models/staging"
}
]
}
]
},
"description": "All models except staging",
"name": "exclude_staging"
},
"nightly_models": {
"definition": {
"method": "tag",
"value": "nightly"
},
"description": "Models tagged for nightly runs",
"name": "nightly_models"
},
"staging_models": {
"definition": {
"method": "path",
"value": "models/staging"
},
"description": "Select all staging models",
"name": "staging_models"
},
"staging_orders": {
"definition": {
"method": "fqn",
"value": "jaffle_shop.staging.stg_orders"
},
"description": "Select staging orders model specifically",
"name": "staging_orders"
}
},
"selectors": {},
"semantic_models": {},
"sources": {},
"unit_tests": {}
Expand Down
6 changes: 3 additions & 3 deletions docs/configuration/caching.rst
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ As an example, the following clean-up DAG will delete any cache associated with
:start-after: [START cache_example]
:end-before: [END cache_example]

.. note::
Because the backing Airflow Variable is shared between the dbt ls cache and the YAML selectors cache, delete methods for the non-remote cache will delete the same Airflow variable.
In other words, if you call ``delete_unused_dbt_ls_cache``, it will also delete the YAML selectors cache for the same DAG or TaskGroup, and vice versa.
.. warning::
Because the backing Airflow Variable is shared between the dbt ls cache and the YAML selectors cache, delete methods for the non-remote cache delete the same Airflow Variable.
In other words, if you call ``delete_unused_dbt_ls_cache``, it will also delete the YAML selectors cache for the same DAG or TaskGroup, and vice versa, and calling ``delete_unused_dbt_yaml_selectors_cache`` will delete the corresponding dbt ls cache.

**Cache key**

Expand Down
8 changes: 4 additions & 4 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2172,9 +2172,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir
assert hash_args == "d41d8cd98f00b204e9800998ecf8427e"
if sys.platform == "darwin":
# We faced inconsistent hashing versions depending on the version of MacOS/Linux - the following line aims to address these.
assert hash_dir in ("67c608d42ca8bdeef6c3eb8fa888471b",)
assert hash_dir in ("71afaf84962c855b0b67caf59c808521",)
else:
assert hash_dir == "fbe70f1477c038da4607f9efb7a8a4d8"
assert hash_dir == "85cba4ef17dd7c161938da6980a6ff85"


@patch("cosmos.dbt.graph.datetime")
Expand Down Expand Up @@ -2212,9 +2212,9 @@ def test_save_yaml_selectors_cache(mock_variable_set, mock_datetime, tmp_dbt_pro

if sys.platform == "darwin":
# We faced inconsistent hashing versions depending on the version of MacOS/Linux - the following line aims to address these.
assert hash_dir in ("67c608d42ca8bdeef6c3eb8fa888471b",)
assert hash_dir in ("71afaf84962c855b0b67caf59c808521",)
else:
assert hash_dir == "fbe70f1477c038da4607f9efb7a8a4d8"
assert hash_dir == "85cba4ef17dd7c161938da6980a6ff85"


@pytest.mark.integration
Expand Down
10 changes: 9 additions & 1 deletion tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
KUBERNETES_DAGS = ["jaffle_shop_kubernetes", "jaffle_shop_watcher_kubernetes"]

MIN_VER_DAG_FILE: dict[str, list[str]] = {
"2.8": ["cosmos_manifest_example.py", "simple_dag_async.py", "cosmos_callback_dag.py"],
"2.8": [
"cosmos_manifest_example.py",
"simple_dag_async.py",
"cosmos_callback_dag.py",
"cosmos_manifest_selectors_example.py",
],
}

IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py", "jaffle_shop_watcher_kubernetes.py"]
Expand Down Expand Up @@ -71,6 +76,9 @@ def get_dag_bag() -> DagBag: # noqa: C901
if AIRFLOW_VERSION < Version("2.8.0"):
file.writelines("example_cosmos_dbt_build.py\n")

if AIRFLOW_VERSION >= Version("3.0.0"):
file.writelines("example_cosmos_cleanup_dag.py\n")

# Disabling these DAGs temporarily due to an Airflow 3 bug on processing DatasetAlias that contain non-ASCII characters:
# https://github.com/apache/airflow/issues/51566
# https://github.com/astronomer/astronomer-cosmos/issues/1802
Expand Down
13 changes: 12 additions & 1 deletion tests/test_example_dags_no_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from dbt.version import get_installed_version as get_dbt_version
from packaging.version import Version

from cosmos.constants import AIRFLOW_VERSION

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
DBT_VERSION = Version(get_dbt_version().to_version_string()[1:])
Expand All @@ -17,7 +19,12 @@
"2.8": ["cosmos_manifest_example.py"],
}

IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py", "jaffle_shop_watcher_kubernetes.py"]
IGNORED_DAG_FILES = [
"performance_dag.py",
"jaffle_shop_kubernetes.py",
"jaffle_shop_watcher_kubernetes.py",
"cosmos_manifest_selectors_example.py",
]

# Sort descending based on Versions and convert string to an actual version
MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = {
Expand Down Expand Up @@ -53,6 +60,10 @@ def get_dag_bag() -> DagBag:
for file_name in ["cosmos_profile_mapping.py"]:
print(f"Adding {file_name} to .airflowignore")
file.write(f"{file_name}\n")

if AIRFLOW_VERSION >= Version("3.0.0"):
file.writelines("example_cosmos_cleanup_dag.py\n")

print(".airflowignore contents: ")
print(AIRFLOW_IGNORE_FILE.read_text())
db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False)
Expand Down