Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions dev/dags/cosmos_manifest_selectors_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop")
execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "altered_jaffle_shop")

profile_config = ProfileConfig(
profile_name="default",
Expand Down Expand Up @@ -47,11 +47,29 @@
):
pre_dbt = EmptyOperator(task_id="pre_dbt")

# The selector `critical_path` only selects the `customers` model, which means its upstream tasks are not included.
# Without `stg_customers`, dbt is not able to run the `customers` model.
pre_condition = DbtTaskGroup(
group_id="pre_condition",
project_config=ProjectConfig(
manifest_path=DBT_ROOT_PATH / "altered_jaffle_shop" / "target" / "manifest.json",
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=RenderConfig(
load_method=LoadMode.DBT_MANIFEST,
select=["+customers"],
airflow_vars_to_purge_dbt_yaml_selectors_cache=["purge"],
),
execution_config=execution_config,
operator_args={"install_deps": True},
)

# [START local_example]
local_example = DbtTaskGroup(
group_id="local_example",
project_config=ProjectConfig(
manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
manifest_path=DBT_ROOT_PATH / "altered_jaffle_shop" / "target" / "manifest.json",
project_name="jaffle_shop",
),
profile_config=profile_config,
Expand All @@ -65,7 +83,7 @@
aws_s3_example = DbtTaskGroup(
group_id="aws_s3_example",
project_config=ProjectConfig(
manifest_path="s3://cosmos-manifest-test/manifest.json",
manifest_path="s3://cosmos-manifest-test/manifest_with_selector.json",
manifest_conn_id="aws_s3_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used.
project_name="jaffle_shop",
Expand All @@ -81,7 +99,7 @@
gcp_gs_example = DbtTaskGroup(
group_id="gcp_gs_example",
project_config=ProjectConfig(
manifest_path="gs://cosmos_remote_target/manifest.json",
manifest_path="gs://cosmos_remote_target/manifest_with_selector.json",
manifest_conn_id="gcp_gs_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used.
project_name="jaffle_shop",
Expand All @@ -97,7 +115,7 @@
azure_abfs_example = DbtTaskGroup(
group_id="azure_abfs_example",
project_config=ProjectConfig(
manifest_path="abfs://cosmos-manifest-test/manifest.json",
manifest_path="abfs://cosmos-manifest-test/manifest_with_selector.json",
manifest_conn_id="azure_abfs_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `wasb_default` is used.
project_name="jaffle_shop",
Expand All @@ -111,4 +129,4 @@

post_dbt = EmptyOperator(task_id="post_dbt")

(pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)
(pre_dbt >> pre_condition >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ selectors:

- name: core_models
description: "Select core business logic models (non-staging)"
default: true
#default: true
definition:
method: tag
value: core
Expand Down
18,250 changes: 18,250 additions & 0 deletions dev/dags/dbt/altered_jaffle_shop/target/manifest.json

Large diffs are not rendered by default.

80 changes: 1 addition & 79 deletions dev/dags/dbt/jaffle_shop/target/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -15852,85 +15852,7 @@
]
},
"saved_queries": {},
"selectors": {
"core_models": {
"definition": {
"method": "tag",
"value": "core"
},
"description": "Select core business logic models (non-staging)",
"name": "core_models"
},
"critical_path": {
"definition": {
"union": [
{
"method": "fqn",
"value": "customers"
},
{
"method": "fqn",
"value": "orders"
}
]
},
"description": "Critical business models",
"name": "critical_path"
},
"customers_and_downstream": {
"definition": {
"childrens_parents": true,
"method": "fqn",
"value": "customers"
},
"description": "Select customers model and all downstream dependencies",
"name": "customers_and_downstream"
},
"exclude_staging": {
"definition": {
"union": [
{
"method": "fqn",
"value": "*"
},
{
"exclude": [
{
"method": "path",
"value": "models/staging"
}
]
}
]
},
"description": "All models except staging",
"name": "exclude_staging"
},
"nightly_models": {
"definition": {
"method": "tag",
"value": "nightly"
},
"description": "Models tagged for nightly runs",
"name": "nightly_models"
},
"staging_models": {
"definition": {
"method": "path",
"value": "models/staging"
},
"description": "Select all staging models",
"name": "staging_models"
},
"staging_orders": {
"definition": {
"method": "fqn",
"value": "jaffle_shop.staging.stg_orders"
},
"description": "Select staging orders model specifically",
"name": "staging_orders"
}
},
"selectors": {},
"semantic_models": {},
"sources": {},
"unit_tests": {}
Expand Down
6 changes: 3 additions & 3 deletions docs/configuration/caching.rst
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ As an example, the following clean-up DAG will delete any cache associated with
:start-after: [START cache_example]
:end-before: [END cache_example]

.. note::
Because the backing Airflow Variable is shared between the dbt ls cache and the YAML selectors cache, delete methods for the non-remote cache will delete the same Airflow variable.
In other words, if you call ``delete_unused_dbt_ls_cache``, it will also delete the YAML selectors cache for the same DAG or TaskGroup, and vice versa.
.. warning::
Because the backing Airflow Variable is shared between the dbt ls cache and the YAML selectors cache, the delete methods for the non-remote cache delete the same Airflow Variable.
In other words, calling ``delete_unused_dbt_ls_cache`` will also delete the YAML selectors cache for the same DAG or TaskGroup, and calling ``delete_unused_dbt_yaml_selectors_cache`` will also delete the corresponding dbt ls cache.

**Cache key**

Expand Down
8 changes: 4 additions & 4 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2172,9 +2172,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir
assert hash_args == "d41d8cd98f00b204e9800998ecf8427e"
if sys.platform == "darwin":
# The directory hash has been inconsistent across versions of macOS/Linux, so the assertion below checks against a tuple of accepted values.
assert hash_dir in ("67c608d42ca8bdeef6c3eb8fa888471b",)
assert hash_dir in ("71afaf84962c855b0b67caf59c808521",)
else:
assert hash_dir == "fbe70f1477c038da4607f9efb7a8a4d8"
assert hash_dir == "85cba4ef17dd7c161938da6980a6ff85"


@patch("cosmos.dbt.graph.datetime")
Expand Down Expand Up @@ -2212,9 +2212,9 @@ def test_save_yaml_selectors_cache(mock_variable_set, mock_datetime, tmp_dbt_pro

if sys.platform == "darwin":
# The directory hash has been inconsistent across versions of macOS/Linux, so the assertion below checks against a tuple of accepted values.
assert hash_dir in ("67c608d42ca8bdeef6c3eb8fa888471b",)
assert hash_dir in ("71afaf84962c855b0b67caf59c808521",)
else:
assert hash_dir == "fbe70f1477c038da4607f9efb7a8a4d8"
assert hash_dir == "85cba4ef17dd7c161938da6980a6ff85"


@pytest.mark.integration
Expand Down
10 changes: 9 additions & 1 deletion tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
KUBERNETES_DAGS = ["jaffle_shop_kubernetes", "jaffle_shop_watcher_kubernetes"]

MIN_VER_DAG_FILE: dict[str, list[str]] = {
"2.8": ["cosmos_manifest_example.py", "simple_dag_async.py", "cosmos_callback_dag.py"],
"2.8": [
"cosmos_manifest_example.py",
"simple_dag_async.py",
"cosmos_callback_dag.py",
"cosmos_manifest_selectors_example.py",
],
}

IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py", "jaffle_shop_watcher_kubernetes.py"]
Expand Down Expand Up @@ -71,6 +76,9 @@ def get_dag_bag() -> DagBag: # noqa: C901
if AIRFLOW_VERSION < Version("2.8.0"):
file.writelines("example_cosmos_dbt_build.py\n")

if AIRFLOW_VERSION >= Version("3.0.0"):
file.writelines("example_cosmos_cleanup_dag.py\n")

# Temporarily disabling these DAGs due to an Airflow 3 bug when processing a DatasetAlias that contains non-ASCII characters:
# https://github.com/apache/airflow/issues/51566
# https://github.com/astronomer/astronomer-cosmos/issues/1802
Expand Down
13 changes: 12 additions & 1 deletion tests/test_example_dags_no_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from dbt.version import get_installed_version as get_dbt_version
from packaging.version import Version

from cosmos.constants import AIRFLOW_VERSION

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
DBT_VERSION = Version(get_dbt_version().to_version_string()[1:])
Expand All @@ -17,7 +19,12 @@
"2.8": ["cosmos_manifest_example.py"],
}

IGNORED_DAG_FILES = ["performance_dag.py", "jaffle_shop_kubernetes.py", "jaffle_shop_watcher_kubernetes.py"]
IGNORED_DAG_FILES = [
"performance_dag.py",
"jaffle_shop_kubernetes.py",
"jaffle_shop_watcher_kubernetes.py",
"cosmos_manifest_selectors_example.py",
]

# Sort descending based on Versions and convert string to an actual version
MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = {
Expand Down Expand Up @@ -53,6 +60,10 @@ def get_dag_bag() -> DagBag:
for file_name in ["cosmos_profile_mapping.py"]:
print(f"Adding {file_name} to .airflowignore")
file.write(f"{file_name}\n")

if AIRFLOW_VERSION >= Version("3.0.0"):
file.writelines("example_cosmos_cleanup_dag.py\n")

print(".airflowignore contents: ")
print(AIRFLOW_IGNORE_FILE.read_text())
db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False)
Expand Down