diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 774e49f82..d32a5b9e2 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -27,6 +27,7 @@ "ExecutionMode": "cosmos.constants", "InvocationMode": "cosmos.constants", "LoadMode": "cosmos.constants", + "SeedRenderingBehavior": "cosmos.constants", "SourceRenderingBehavior": "cosmos.constants", "TestBehavior": "cosmos.constants", "TestIndirectSelection": "cosmos.constants", @@ -123,6 +124,7 @@ from cosmos.constants import ExecutionMode as ExecutionMode from cosmos.constants import InvocationMode as InvocationMode from cosmos.constants import LoadMode as LoadMode + from cosmos.constants import SeedRenderingBehavior as SeedRenderingBehavior from cosmos.constants import SourceRenderingBehavior as SourceRenderingBehavior from cosmos.constants import TestBehavior as TestBehavior from cosmos.constants import TestIndirectSelection as TestIndirectSelection diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 5287a6076..d663d2098 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -36,6 +36,7 @@ TESTABLE_DBT_RESOURCES, DbtResourceType, ExecutionMode, + SeedRenderingBehavior, SourceRenderingBehavior, TestBehavior, TestIndirectSelection, @@ -366,6 +367,32 @@ def create_task_metadata( # noqa: C901 # `AIRFLOW__COSMOS__PRE_DBT_FUSION=1`. models_select_key = "models" if settings.pre_dbt_fusion else "select" + # Apply seed rendering behavior before the TestBehavior.BUILD branch so it is honored + # regardless of the test behavior (under BUILD, seeds would otherwise be rendered as DbtBuild tasks). + if node.resource_type == DbtResourceType.SEED: + if render_config.seed_rendering_behavior == SeedRenderingBehavior.NONE: + # Do not render the seed in the DAG/TaskGroup at all. + return None + if render_config.seed_rendering_behavior == SeedRenderingBehavior.RENDER_ONLY: + # Render the seed as a no-op placeholder so it stays visible in the DAG + # topology/lineage, but never run `dbt seed`. + task_id, args = _get_task_id_and_args( + node=node, + args=args, + use_task_group=use_task_group, + normalize_task_id=render_config.normalize_task_id, + normalize_task_display_name=render_config.normalize_task_display_name, + resource_suffix=node.resource_type.value, + execution_mode=execution_mode, + ) + # EmptyOperator does not accept custom dbt parameters (e.g. profile_args); recreate the args. + args = {"task_display_name": args["task_display_name"]} if "task_display_name" in args else {} + return TaskMetadata(id=task_id, operator_class=EMPTY_OPERATOR_CLASS_PATH, arguments=args) + if render_config.seed_rendering_behavior == SeedRenderingBehavior.WHEN_SEED_CHANGES: + # Render the seed as usual, but flag the task so the operator skips running + # `dbt seed` when the CSV content is unchanged since the last successful run. + extra_context["should_run_if_seed_changed"] = True + if node.has_ephemeral_materialization and render_config.ephemeral_models_as_empty_operator: # Ephemeral models are inlined as CTEs into downstream models and never written to the # warehouse, so running them via a dbt operator (whether `dbt run` or `dbt build`) is a diff --git a/cosmos/cache.py b/cosmos/cache.py index 5700cecdb..031429a0a 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -12,6 +12,7 @@ import msgpack import yaml +from airflow.exceptions import AirflowException from airflow.models import DagRun, Variable try: @@ -20,6 +21,7 @@ from airflow.models.dag import DAG # type: ignore[assignment] from airflow.utils.session import provide_session from sqlalchemy import select +from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import Session from cosmos import settings @@ -56,6 +58,10 @@ logger = get_logger(__name__) VAR_KEY_CACHE_PREFIX = "cosmos_cache__" +# Seeds rendered with ``SeedRenderingBehavior.WHEN_SEED_CHANGES`` persist the last-run CSV checksum as an +# Airflow Variable, scoped per ``DbtDag``/``DbtTaskGroup`` and seed, so the same seed rendered in different +# DAGs tracks its state independently (a DAG never wrongly skips a seed because a different DAG ran it). +VAR_KEY_SEED_CHECKSUM_PREFIX = "cosmos_seed_checksum__" def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None: @@ -140,6 +146,45 @@ def create_cache_key(cache_identifier: str) -> str: return f"{VAR_KEY_CACHE_PREFIX}{cache_identifier}" +def _create_seed_checksum_key(dag_task_group_identifier: str, unique_id: str) -> str: + """Return a bounded, stable Airflow Variable key scoped to a DbtDag/DbtTaskGroup and seed. + + Airflow Variable keys are length-bounded (the ``variable.key`` column is commonly 250 chars), so we + store the checksum under a fixed-length key: a readable prefix plus a stable hash of the full + identifier, ensuring long DAG ids / nested task groups / package+resource names never overflow it. + """ + digest = hashlib.sha1(f"{dag_task_group_identifier}:{unique_id}".encode()).hexdigest() + return f"{VAR_KEY_SEED_CHECKSUM_PREFIX}{digest}" + + +def get_seed_checksum(dag_task_group_identifier: str, unique_id: str) -> str | None: + """Return the seed's last persisted checksum, or ``None`` when none is stored or it cannot be read. + + Best-effort: change detection must never fail a seed task, so a missing value or a transient + metadatabase/Variable backend error is logged and treated as "no stored checksum" (the seed runs). + """ + key = _create_seed_checksum_key(dag_task_group_identifier, unique_id) + try: + checksum: str | None = Variable.get(key, default_var=None) + except (AirflowException, SQLAlchemyError) as exc: + logger.warning("Failed to read seed checksum from Variable `%s`: %s", key, exc) + return None + return checksum + + +def store_seed_checksum(dag_task_group_identifier: str, unique_id: str, checksum: str) -> None: + """Persist a seed's checksum after a successful run. + + Best-effort: a failure to store the checksum must never fail a seed that already loaded successfully, + so storage errors are logged and swallowed rather than raised. + """ + key = _create_seed_checksum_key(dag_task_group_identifier, unique_id) + try: + Variable.set(key, checksum) + except (AirflowException, SQLAlchemyError) as exc: + logger.warning("Failed to persist seed checksum under Variable `%s`: %s", key, exc) + + def _obtain_cache_dir_path(cache_identifier: str, base_dir: Path = settings.cache_dir) -> Path: """ Return a directory used to cache a specific Cosmos DbtDag or DbtTaskGroup. If the directory diff --git a/cosmos/config.py b/cosmos/config.py index 18534bbd0..05b8ac1a7 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -27,6 +27,7 @@ ExecutionMode, InvocationMode, LoadMode, + SeedRenderingBehavior, SourceRenderingBehavior, TestBehavior, TestIndirectSelection, @@ -73,6 +74,7 @@ class RenderConfig: :param group_nodes_by_folder: When enabled, groups nodes by folder structure, creating a ``TaskGroup`` per resource type and folder. Disabled by default. :param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). :param source_pruning: Determines if source nodes without a corresponding downstream task should be removed or not. Default is False + :param seed_rendering_behavior: Determines how seed nodes are rendered and run (ALWAYS, WHEN_SEED_CHANGES, RENDER_ONLY, NONE). Defaults to "ALWAYS", preserving the original Cosmos behaviour of always rendering and running seeds. WHEN_SEED_CHANGES is only supported for ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV and ExecutionMode.AIRFLOW_ASYNC, and is incompatible with TestBehavior.BUILD. Under ExecutionMode.WATCHER, only ALWAYS is meaningful: a single dbt build runs seeds regardless of this setting. :param airflow_vars_to_purge_dbt_ls_cache: Specify Airflow variables that will affect the LoadMode.DBT_LS cache. :param airflow_vars_to_purge_dbt_yaml_selectors_cache: Specify Airflow variables that will affect the parsed manifest YamlSelectors cache. :param normalize_task_id: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name. @@ -101,6 +103,7 @@ class RenderConfig: group_nodes_by_folder: bool = False source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE source_pruning: bool = False + seed_rendering_behavior: SeedRenderingBehavior = SeedRenderingBehavior.ALWAYS airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) airflow_vars_to_purge_dbt_yaml_selectors_cache: list[str] = field(default_factory=list) normalize_task_id: Callable[..., Any] | None = None @@ -128,6 +131,16 @@ def __post_init__(self, dbt_project_path: str | Path | None) -> None: UserWarning, ) self.source_rendering_behavior = SourceRenderingBehavior.NONE + if ( + self.seed_rendering_behavior == SeedRenderingBehavior.WHEN_SEED_CHANGES + and self.test_behavior == TestBehavior.BUILD + ): + raise CosmosValueError( + "SeedRenderingBehavior.WHEN_SEED_CHANGES is incompatible with TestBehavior.BUILD: under " + "TestBehavior.BUILD seeds run via `dbt build` as a single task and cannot be selectively " + "skipped based on CSV changes. Use TestBehavior.AFTER_EACH/AFTER_ALL/NONE, or " + "SeedRenderingBehavior.ALWAYS." + ) self.project_path = Path(dbt_project_path) if dbt_project_path else None # allows us to initiate this attribute from Path objects and str self.dbt_ls_path = Path(self.dbt_ls_path) if self.dbt_ls_path else None diff --git a/cosmos/constants.py b/cosmos/constants.py index ea79291a9..9e728e074 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -148,6 +148,25 @@ class SourceRenderingBehavior(Enum): WITH_TESTS_OR_FRESHNESS = "with_tests_or_freshness" +class SeedRenderingBehavior(Enum): + """ + Modes to configure how dbt seed nodes are rendered and run. + + ALWAYS: Render the seed and run ``dbt seed`` on every execution (default, original Cosmos behaviour). + WHEN_SEED_CHANGES: Render the seed, but only run ``dbt seed`` when the seed's CSV content has changed + since the last successful run. Supported only for execution modes where the Airflow worker can + access the seed files directly (LOCAL, VIRTUALENV, AIRFLOW_ASYNC). + RENDER_ONLY: Render the seed as a no-op ``EmptyOperator`` placeholder so it remains visible in the + DAG topology/lineage, but never run ``dbt seed``. + NONE: Do not render the seed in the DAG/TaskGroup at all. + """ + + ALWAYS = "always" + WHEN_SEED_CHANGES = "when_seed_changes" + RENDER_ONLY = "render_only" + NONE = "none" + + class DbtResourceType(aenum.Enum): # type: ignore[misc] """ Type of dbt node. diff --git a/cosmos/converter.py b/cosmos/converter.py index 4c6a99054..dad3713e2 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -44,6 +44,7 @@ ExecutionMode, InvocationMode, LoadMode, + SeedRenderingBehavior, ) from cosmos.dbt.executable import get_system_dbt, is_dbt_installed_in_same_environment from cosmos.dbt.graph import DbtGraph @@ -130,6 +131,19 @@ def validate_arguments( if profile_config.profile_mapping: profile_config.profile_mapping.profile_args["schema"] = task_args["schema"] + # SeedRenderingBehavior.WHEN_SEED_CHANGES decides at task runtime whether to run `dbt seed`, which + # requires Cosmos to run dbt directly on the Airflow worker with filesystem access to the seed files. + if render_config.seed_rendering_behavior == SeedRenderingBehavior.WHEN_SEED_CHANGES and ( + execution_config.execution_mode + not in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV, ExecutionMode.AIRFLOW_ASYNC) + ): + raise CosmosValueError( + "SeedRenderingBehavior.WHEN_SEED_CHANGES is only supported with ExecutionMode.LOCAL, " + "ExecutionMode.VIRTUALENV or ExecutionMode.AIRFLOW_ASYNC, which run dbt directly on the " + f"Airflow worker. The configured execution_mode is {execution_config.execution_mode}. Use " + "SeedRenderingBehavior.ALWAYS, or switch to a supported execution mode." + ) + if execution_config.execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]: profile_config.validate_profiles_yml() has_non_empty_dependencies = execution_config.project_path and has_non_empty_dependencies_file( diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index eec94487f..5c77f452a 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -3,6 +3,7 @@ import base64 import datetime import functools +import hashlib import itertools import json import os @@ -69,6 +70,23 @@ logger = get_logger(__name__) +# Read the seed file in fixed-size chunks when checksumming so a large CSV does not have to be loaded +# into memory all at once. +_CHECKSUM_READ_CHUNK_SIZE = 1024 * 1024 + + +def _calculate_file_checksum(file_path: Path) -> str | None: + """Return the SHA256 checksum of a file, streaming it in chunks, or ``None`` if it cannot be read.""" + digest = hashlib.sha256() + try: + with open(file_path, "rb") as file: + for chunk in iter(lambda: file.read(_CHECKSUM_READ_CHUNK_SIZE), b""): + digest.update(chunk) + except OSError as exc: + logger.warning("Unable to read file `%s` to compute its checksum: %s", file_path, exc) + return None + return digest.hexdigest() + def _normalize_path(path: str | None) -> str: """ @@ -113,6 +131,18 @@ def file_path(self) -> Path: """Combined path to the node's file (path_base / original_file_path).""" return self.path_base / self.original_file_path + @property + def checksum(self) -> str | None: + """SHA256 checksum of a seed's CSV content, used by ``SeedRenderingBehavior.WHEN_SEED_CHANGES``. + + Although ``manifest.json`` records a checksum per node, we always recompute it from the seed file + here so the value is consistent regardless of whether the project was loaded via ``LoadMode.MANIFEST`` + or ``LoadMode.DBT_LS``. Returns ``None`` for non-seed nodes or when the file cannot be read. + """ + if self.resource_type != DbtResourceType.SEED: + return None + return _calculate_file_checksum(self.file_path) + @property def has_ephemeral_materialization(self) -> bool: """Whether the node is materialized as ephemeral (inlined as a CTE, never written to the warehouse).""" @@ -240,6 +270,7 @@ def context_dict(self) -> dict[str, Any]: "has_non_detached_test": self.has_non_detached_test, "resource_name": self.resource_name, "name": self.name, + "checksum": self.checksum, } diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index d9f842514..3f95556e6 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -41,7 +41,9 @@ from cosmos.cache import ( _copy_cached_package_lockfile_to_project, _get_latest_cached_package_lockfile, + get_seed_checksum, is_cache_package_lockfile_enabled, + store_seed_checksum, ) from cosmos.constants import ( _AIRFLOW3_MAJOR_VERSION, @@ -1050,6 +1052,46 @@ class DbtSeedLocalOperator(DbtSeedMixin, DbtLocalBaseOperator): def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + def execute(self, context: Context, **kwargs: Any) -> None: + # When SeedRenderingBehavior.WHEN_SEED_CHANGES is active (flagged at render time in + # create_task_metadata), skip running `dbt seed` if the seed's content is unchanged since the + # last successful run. We delegate the actual run to the base execute() so extra_context + # merging, debug-mode memory tracking and **kwargs forwarding are preserved. A `--full-refresh` + # request always runs the seed, bypassing change detection. + extra_context = self.extra_context or {} + if not extra_context.get("should_run_if_seed_changed") or self._is_full_refresh(): + super().execute(context, **kwargs) + return + + node_config = extra_context.get("dbt_node_config") or {} + dag_task_group_identifier = extra_context.get("dbt_dag_task_group_identifier") + unique_id = node_config.get("unique_id") + current_seed_checksum = node_config.get("checksum") + + # Change detection needs the seed's current checksum and a scope (DAG/task-group + seed) to store it + # under. When both are available, skip the run if the checksum matches the last successful run; + # otherwise fall back to always running the seed (change detection is best-effort). + if current_seed_checksum and dag_task_group_identifier and unique_id: + last_run_seed_checksum = get_seed_checksum(dag_task_group_identifier, unique_id) + should_run = last_run_seed_checksum != current_seed_checksum + if not should_run: + # Return successfully (do NOT raise AirflowSkipException) so downstream models that depend + # on the seed are not skip-propagated by the default trigger rule. + self.log.info("Seed `%s` is unchanged since its last successful run; skipping `dbt seed`.", unique_id) + return + super().execute(context, **kwargs) + store_seed_checksum(dag_task_group_identifier, unique_id, current_seed_checksum) + return + + super().execute(context, **kwargs) + + def _is_full_refresh(self) -> bool: + """Return whether --full-refresh is requested, mirroring DbtSeedMixin.add_cmd_flags handling.""" + if isinstance(self.full_refresh, str): + # `to_boolean` does not strip whitespace, so a rendered " true " would otherwise be False. + return bool(to_boolean(self.full_refresh.strip())) + return bool(self.full_refresh) + class DbtSnapshotLocalOperator(DbtSnapshotMixin, DbtLocalBaseOperator): """ diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 010dd6f7e..93d3412d2 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -38,6 +38,7 @@ SUPPORTED_BUILD_RESOURCES, DbtResourceType, ExecutionMode, + SeedRenderingBehavior, SourceRenderingBehavior, TestBehavior, TestIndirectSelection, @@ -560,6 +561,7 @@ def test_create_task_metadata_unsupported(caplog): "has_test": False, "resource_name": "my_model", "name": "my_model", + "checksum": None, }, "package_name": None, }, @@ -605,6 +607,7 @@ def test_create_task_metadata_unsupported(caplog): "has_test": False, "resource_name": "my_snapshot", "name": "my_snapshot", + "checksum": None, }, "package_name": None, }, @@ -2383,3 +2386,70 @@ def test_watcher_dependency_wiring(test_behavior, depends_on_past): "tg.dbt_producer_watcher", "tg.astro_shop_test", } + + +def _seed_node(): + return DbtNode( + unique_id=f"{DbtResourceType.SEED.value}.my_folder.my_seed", + resource_type=DbtResourceType.SEED, + depends_on=[], + path_base=Path("."), + original_file_path=Path("."), + tags=[], + config={}, + ) + + +@pytest.mark.parametrize("test_behavior", [TestBehavior.AFTER_EACH, TestBehavior.BUILD]) +def test_create_task_metadata_seed_rendering_none(test_behavior): + """SeedRenderingBehavior.NONE drops the seed regardless of test behavior (incl. BUILD).""" + metadata = create_task_metadata( + _seed_node(), + execution_mode=ExecutionMode.LOCAL, + args={}, + dbt_dag_task_group_identifier="", + render_config=RenderConfig(seed_rendering_behavior=SeedRenderingBehavior.NONE, test_behavior=test_behavior), + ) + assert metadata is None + + +@pytest.mark.parametrize("test_behavior", [TestBehavior.AFTER_EACH, TestBehavior.BUILD]) +def test_create_task_metadata_seed_rendering_render_only(test_behavior): + """RENDER_ONLY renders the seed as an EmptyOperator placeholder, never a dbt seed/build task.""" + metadata = create_task_metadata( + _seed_node(), + execution_mode=ExecutionMode.LOCAL, + args={}, + dbt_dag_task_group_identifier="", + render_config=RenderConfig( + seed_rendering_behavior=SeedRenderingBehavior.RENDER_ONLY, test_behavior=test_behavior + ), + ) + assert metadata.id == "my_seed_seed" + assert metadata.operator_class == EMPTY_OPERATOR_CLASS_PATH + + +def test_create_task_metadata_seed_rendering_when_seed_changes_sets_flag(): + """WHEN_SEED_CHANGES renders the normal seed operator and flags change detection in extra_context.""" + metadata = create_task_metadata( + _seed_node(), + execution_mode=ExecutionMode.LOCAL, + args={}, + dbt_dag_task_group_identifier="", + render_config=RenderConfig(seed_rendering_behavior=SeedRenderingBehavior.WHEN_SEED_CHANGES), + ) + assert metadata.operator_class == "cosmos.operators.local.DbtSeedLocalOperator" + assert metadata.extra_context["should_run_if_seed_changed"] is True + + +def test_create_task_metadata_seed_rendering_always_no_flag(): + """ALWAYS (default) renders the normal seed operator without the change-detection flag.""" + metadata = create_task_metadata( + _seed_node(), + execution_mode=ExecutionMode.LOCAL, + args={}, + dbt_dag_task_group_identifier="", + render_config=RenderConfig(seed_rendering_behavior=SeedRenderingBehavior.ALWAYS), + ) + assert metadata.operator_class == "cosmos.operators.local.DbtSeedLocalOperator" + assert "should_run_if_seed_changed" not in metadata.extra_context diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index c254d66c7..1d5055418 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -138,6 +138,44 @@ def test_dbt_node_name_and_select(unique_id, expected_name, expected_select): assert node.resource_name == expected_select +def test_dbt_node_checksum_computes_sha256_of_seed_file(tmp_path): + import hashlib + + content = b"id,name\n1,alice\n" + (tmp_path / "my_seed.csv").write_bytes(content) + node = DbtNode( + unique_id="seed.my_project.my_seed", + resource_type=DbtResourceType.SEED, + depends_on=[], + path_base=tmp_path, + original_file_path=Path("my_seed.csv"), + ) + assert node.checksum == hashlib.sha256(content).hexdigest() + + +def test_dbt_node_checksum_is_none_for_non_seed_nodes(tmp_path): + (tmp_path / "model.sql").write_text("select 1") + node = DbtNode( + unique_id="model.my_project.my_model", + resource_type=DbtResourceType.MODEL, + depends_on=[], + path_base=tmp_path, + original_file_path=Path("model.sql"), + ) + assert node.checksum is None + + +def test_dbt_node_checksum_is_none_when_seed_file_missing(tmp_path): + node = DbtNode( + unique_id="seed.my_project.my_seed", + resource_type=DbtResourceType.SEED, + depends_on=[], + path_base=tmp_path, + original_file_path=Path("does_not_exist.csv"), + ) + assert node.checksum is None + + class TestGetResourceNameFromUniqueId: def test_plain_model(self): assert DbtNode.get_resource_name_from_unique_id("model.my_pkg.my_model") == "my_model" @@ -249,6 +287,7 @@ def test_dbt_profile_config_to_override(): "has_non_detached_test": False, "resource_name": "customers", "name": "customers", + "checksum": None, }, ), ( @@ -265,6 +304,7 @@ def test_dbt_profile_config_to_override(): "has_non_detached_test": False, "resource_name": "customers.v1", "name": "customers_v1", + "checksum": None, }, ), ], diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 0e2822b7a..2f07115ad 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -2615,6 +2615,112 @@ def test_returns_none_on_invalid_json(self, tmp_path): assert result is None +def _seed_change_extra_context(*, checksum="current-checksum"): + return { + "should_run_if_seed_changed": True, + "dbt_dag_task_group_identifier": "my_dag", + "dbt_node_config": { + "unique_id": "seed.pkg.my_seed", + "checksum": checksum, + }, + } + + +@patch("cosmos.cache.Variable") +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_dbt_seed_local_operator_skips_when_unchanged(mock_build_and_run_cmd, mock_variable, caplog): + """When the seed checksum is unchanged, execute() must not run `dbt seed` and must not raise (success).""" + mock_variable.get.return_value = "current-checksum" # stored == current + operator = DbtSeedLocalOperator( + profile_config=profile_config, + task_id="my_seed", + project_dir="my/dir", + extra_context=_seed_change_extra_context(), + ) + caplog.set_level(logging.INFO) + operator.execute(context={}) + mock_build_and_run_cmd.assert_not_called() + mock_variable.set.assert_not_called() + assert "unchanged" in caplog.text + + +@patch("cosmos.cache.Variable") +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_dbt_seed_local_operator_runs_and_persists_when_changed(mock_build_and_run_cmd, mock_variable): + """When the seed checksum changed, execute() runs `dbt seed` and persists the new checksum.""" + mock_variable.get.return_value = "stale-checksum" # differs from current + operator = DbtSeedLocalOperator( + profile_config=profile_config, + task_id="my_seed", + project_dir="my/dir", + extra_context=_seed_change_extra_context(), + ) + operator.execute(context={}) + mock_build_and_run_cmd.assert_called_once() + mock_variable.set.assert_called_once() + + +@patch("cosmos.cache.Variable") +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_dbt_seed_local_operator_runs_when_checksum_unresolved(mock_build_and_run_cmd, mock_variable): + """When the current checksum could not be resolved, execute() runs the seed without touching Variables.""" + operator = DbtSeedLocalOperator( + profile_config=profile_config, + task_id="my_seed", + project_dir="my/dir", + extra_context=_seed_change_extra_context(checksum=None), + ) + operator.execute(context={}) + mock_build_and_run_cmd.assert_called_once() + mock_variable.get.assert_not_called() + mock_variable.set.assert_not_called() + + +@patch("cosmos.cache.Variable") +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_dbt_seed_local_operator_full_refresh_bypasses_detection(mock_build_and_run_cmd, mock_variable): + """--full-refresh always runs the seed, even when the checksum is unchanged.""" + mock_variable.get.return_value = "current-checksum" # unchanged + operator = DbtSeedLocalOperator( + profile_config=profile_config, + task_id="my_seed", + project_dir="my/dir", + full_refresh=True, + extra_context=_seed_change_extra_context(), + ) + operator.execute(context={}) + mock_build_and_run_cmd.assert_called_once() + mock_variable.get.assert_not_called() + + +@patch("cosmos.cache.Variable") +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_dbt_seed_local_operator_runs_normally_without_change_detection(mock_build_and_run_cmd, mock_variable): + """Without the change-detection flag (ALWAYS), execute() runs `dbt seed` and never touches Variables.""" + operator = DbtSeedLocalOperator(profile_config=profile_config, task_id="my_seed", project_dir="my/dir") + operator.execute(context={}) + mock_build_and_run_cmd.assert_called_once() + mock_variable.get.assert_not_called() + + +@pytest.mark.parametrize( + "full_refresh,expected", + [ + (True, True), + (False, False), + ("true", True), + (" true ", True), # rendered templates may carry surrounding whitespace + ("false", False), + ("", False), + ], +) +def test_dbt_seed_local_operator_is_full_refresh(full_refresh, expected): + operator = DbtSeedLocalOperator( + profile_config=profile_config, task_id="my_seed", project_dir="my/dir", full_refresh=full_refresh + ) + assert operator._is_full_refresh() is expected + + def test_dbt_local_operator_warns_on_output_only_template_fields(caplog): """Test that passing compiled_sql or freshness to local operators emits a warning.""" from cosmos.operators.local import DbtRunLocalOperator diff --git a/tests/test_cache.py b/tests/test_cache.py index 1c379fdf9..28c6b8b52 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -25,10 +25,12 @@ from airflow.utils.task_group import TaskGroup from cosmos.cache import ( + VAR_KEY_SEED_CHECKSUM_PREFIX, _calculate_yaml_selectors_cache_current_version, _configure_remote_cache_dir, _copy_partial_parse_to_project, _create_cache_identifier, + _create_seed_checksum_key, _get_latest_cached_package_lockfile, _get_latest_partial_parse, _get_or_create_profile_cache_dir, @@ -37,8 +39,10 @@ create_cache_profile, delete_unused_dbt_cache, get_cached_profile, + get_seed_checksum, is_cache_package_lockfile_enabled, is_profile_cache_enabled, + store_seed_checksum, were_yaml_selectors_modified, ) from cosmos.constants import ( @@ -476,3 +480,47 @@ def test_were_yaml_selectors_modified_false(): previous_version = "dbt_project_hash_v1, yamlselectors_hash_v1, impl_hash_v1" current_version = "dbt_project_hash_v1, yamlselectors_hash_v1, impl_hash_v1" assert were_yaml_selectors_modified(previous_version, current_version) is False + + +def test_create_seed_checksum_key_is_bounded_prefixed_and_scoped(): + key = _create_seed_checksum_key("a" * 500, "seed.pkg." + "b" * 500) + assert key.startswith(VAR_KEY_SEED_CHECKSUM_PREFIX) + assert len(key) <= 250 + # Different DAG/task-group identifier or seed -> different key. + assert key != _create_seed_checksum_key("other_dag", "seed.pkg.b" * 500) + assert _create_seed_checksum_key("dag", "seed.pkg.a") != _create_seed_checksum_key("dag", "seed.pkg.b") + # Stable across calls for the same inputs. + assert _create_seed_checksum_key("dag", "seed.pkg.a") == _create_seed_checksum_key("dag", "seed.pkg.a") + + +@patch("cosmos.cache.Variable") +def test_get_seed_checksum_returns_stored_value(mock_variable): + mock_variable.get.return_value = "stored-checksum" + assert get_seed_checksum("my_dag", "seed.pkg.my_seed") == "stored-checksum" + mock_variable.get.assert_called_once_with(_create_seed_checksum_key("my_dag", "seed.pkg.my_seed"), default_var=None) + + +@patch("cosmos.cache.Variable") +def test_get_seed_checksum_swallows_backend_errors(mock_variable, caplog): + from sqlalchemy.exc import SQLAlchemyError + + mock_variable.get.side_effect = SQLAlchemyError("db down") + caplog.set_level(logging.WARNING) + assert get_seed_checksum("my_dag", "seed.pkg.my_seed") is None + assert "Failed to read seed checksum" in caplog.text + + +@patch("cosmos.cache.Variable") +def test_store_seed_checksum_persists_value(mock_variable): + store_seed_checksum("my_dag", "seed.pkg.my_seed", "new-checksum") + mock_variable.set.assert_called_once_with(_create_seed_checksum_key("my_dag", "seed.pkg.my_seed"), "new-checksum") + + +@patch("cosmos.cache.Variable") +def test_store_seed_checksum_swallows_backend_errors(mock_variable, caplog): + from airflow.exceptions import AirflowException + + mock_variable.set.side_effect = AirflowException("cannot write") + caplog.set_level(logging.WARNING) + store_seed_checksum("my_dag", "seed.pkg.my_seed", "new-checksum") # must not raise + assert "Failed to persist seed checksum" in caplog.text diff --git a/tests/test_config.py b/tests/test_config.py index 109c18ce6..1640a7191 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -5,7 +5,7 @@ import pytest from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import ExecutionMode, InvocationMode, SourceRenderingBehavior +from cosmos.constants import ExecutionMode, InvocationMode, SeedRenderingBehavior, SourceRenderingBehavior, TestBehavior from cosmos.exceptions import CosmosValueError from cosmos.profiles.athena.access_key import AthenaAccessKeyProfileMapping from cosmos.profiles.postgres.user_pass import PostgresUserPasswordProfileMapping @@ -335,3 +335,22 @@ def test_remote_manifest_path(manifest_path, given_manifest_conn_id, used_manife dbt_project_path="/tmp/some-path", manifest_path=manifest_path, manifest_conn_id=given_manifest_conn_id ) assert project_config.manifest_path == ObjectStoragePath(manifest_path, conn_id=used_manifest_conn_id) + + +def test_render_config_seed_rendering_when_seed_changes_with_build_raises(): + """WHEN_SEED_CHANGES cannot be combined with TestBehavior.BUILD.""" + with pytest.raises(CosmosValueError, match="WHEN_SEED_CHANGES is incompatible with TestBehavior.BUILD"): + RenderConfig( + seed_rendering_behavior=SeedRenderingBehavior.WHEN_SEED_CHANGES, + test_behavior=TestBehavior.BUILD, + ) + + +@pytest.mark.parametrize("test_behavior", [TestBehavior.AFTER_EACH, TestBehavior.AFTER_ALL, TestBehavior.NONE]) +def test_render_config_seed_rendering_when_seed_changes_allowed_with_non_build(test_behavior): + config = RenderConfig(seed_rendering_behavior=SeedRenderingBehavior.WHEN_SEED_CHANGES, test_behavior=test_behavior) + assert config.seed_rendering_behavior == SeedRenderingBehavior.WHEN_SEED_CHANGES + + +def test_render_config_seed_rendering_behavior_defaults_to_always(): + assert RenderConfig().seed_rendering_behavior == SeedRenderingBehavior.ALWAYS diff --git a/tests/test_converter.py b/tests/test_converter.py index 0f21b1bc2..04cd64d1d 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1,5 +1,6 @@ import logging import tempfile +from contextlib import nullcontext as does_not_raise from datetime import datetime from pathlib import Path from unittest.mock import MagicMock, patch @@ -18,6 +19,7 @@ ExecutionMode, InvocationMode, LoadMode, + SeedRenderingBehavior, TestBehavior, ) from cosmos.converter import DbtToAirflowConverter, validate_arguments, validate_initial_user_config @@ -1525,6 +1527,37 @@ def test_telemetry_metadata_not_stored_when_disabled(mock_should_emit, mock_load assert "__cosmos_telemetry_metadata__" not in dag.params +@pytest.mark.parametrize( + "execution_mode, expectation", + [ + (ExecutionMode.LOCAL, does_not_raise()), + (ExecutionMode.VIRTUALENV, does_not_raise()), + (ExecutionMode.AIRFLOW_ASYNC, does_not_raise()), + (ExecutionMode.DOCKER, pytest.raises(CosmosValueError)), + (ExecutionMode.KUBERNETES, pytest.raises(CosmosValueError)), + (ExecutionMode.WATCHER, pytest.raises(CosmosValueError)), + ], +) +def test_validate_arguments_seed_when_seed_changes_execution_mode(execution_mode, expectation): + """WHEN_SEED_CHANGES is only allowed for worker-filesystem execution modes.""" + project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST, project_name="xubiru") + render_config = RenderConfig(seed_rendering_behavior=SeedRenderingBehavior.WHEN_SEED_CHANGES) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) + execution_config = ExecutionConfig(execution_mode=execution_mode) + with expectation: + validate_arguments( + execution_config=execution_config, + profile_config=profile_config, + project_config=project_config, + render_config=render_config, + task_args={}, + ) + + def _build_altered_jaffle_shop_dag(dag_id, ephemeral_models_as_empty_operator): """Render altered_jaffle_shop (which has a customers -> ephemeral_customers -> ephemeral_customers_downstream lineage) into a DbtDag by invoking ``dbt ls`` on the project, so tests can inspect the rendered tasks via