Skip to content
2 changes: 2 additions & 0 deletions cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"ExecutionMode": "cosmos.constants",
"InvocationMode": "cosmos.constants",
"LoadMode": "cosmos.constants",
"SeedRenderingBehavior": "cosmos.constants",
"SourceRenderingBehavior": "cosmos.constants",
"TestBehavior": "cosmos.constants",
"TestIndirectSelection": "cosmos.constants",
Expand Down Expand Up @@ -123,6 +124,7 @@
from cosmos.constants import ExecutionMode as ExecutionMode
from cosmos.constants import InvocationMode as InvocationMode
from cosmos.constants import LoadMode as LoadMode
from cosmos.constants import SeedRenderingBehavior as SeedRenderingBehavior
from cosmos.constants import SourceRenderingBehavior as SourceRenderingBehavior
from cosmos.constants import TestBehavior as TestBehavior
from cosmos.constants import TestIndirectSelection as TestIndirectSelection
Expand Down
27 changes: 27 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
TESTABLE_DBT_RESOURCES,
DbtResourceType,
ExecutionMode,
SeedRenderingBehavior,
SourceRenderingBehavior,
TestBehavior,
TestIndirectSelection,
Expand Down Expand Up @@ -366,6 +367,32 @@ def create_task_metadata( # noqa: C901
# `AIRFLOW__COSMOS__PRE_DBT_FUSION=1`.
models_select_key = "models" if settings.pre_dbt_fusion else "select"

# Apply seed rendering behavior before the TestBehavior.BUILD branch so it is honored
# regardless of the test behavior (under BUILD, seeds would otherwise be rendered as DbtBuild tasks).
if node.resource_type == DbtResourceType.SEED:
if render_config.seed_rendering_behavior == SeedRenderingBehavior.NONE:
# Do not render the seed in the DAG/TaskGroup at all.
return None
if render_config.seed_rendering_behavior == SeedRenderingBehavior.RENDER_ONLY:
# Render the seed as a no-op placeholder so it stays visible in the DAG
# topology/lineage, but never run `dbt seed`.
task_id, args = _get_task_id_and_args(
node=node,
args=args,
use_task_group=use_task_group,
normalize_task_id=render_config.normalize_task_id,
normalize_task_display_name=render_config.normalize_task_display_name,
resource_suffix=node.resource_type.value,
execution_mode=execution_mode,
)
# EmptyOperator does not accept custom dbt parameters (e.g. profile_args); recreate the args.
args = {"task_display_name": args["task_display_name"]} if "task_display_name" in args else {}
return TaskMetadata(id=task_id, operator_class=EMPTY_OPERATOR_CLASS_PATH, arguments=args)
Comment thread
pankajkoti marked this conversation as resolved.
if render_config.seed_rendering_behavior == SeedRenderingBehavior.WHEN_SEED_CHANGES:
# Render the seed as usual, but flag the task so the operator skips running
# `dbt seed` when the CSV content is unchanged since the last successful run.
extra_context["should_run_if_seed_changed"] = True

if node.has_ephemeral_materialization and render_config.ephemeral_models_as_empty_operator:
# Ephemeral models are inlined as CTEs into downstream models and never written to the
# warehouse, so running them via a dbt operator (whether `dbt run` or `dbt build`) is a
Expand Down
45 changes: 45 additions & 0 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import msgpack
import yaml
from airflow.exceptions import AirflowException
from airflow.models import DagRun, Variable

try:
Expand All @@ -20,6 +21,7 @@
from airflow.models.dag import DAG # type: ignore[assignment]
from airflow.utils.session import provide_session
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from cosmos import settings
Expand Down Expand Up @@ -56,6 +58,10 @@

logger = get_logger(__name__)
# Prefix that ``create_cache_key`` prepends to a cache identifier to build the cache key.
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"
# Seeds rendered with ``SeedRenderingBehavior.WHEN_SEED_CHANGES`` persist the last-run CSV checksum as an
# Airflow Variable, scoped per ``DbtDag``/``DbtTaskGroup`` and seed, so the same seed rendered in different
# DAGs tracks its state independently (a DAG never wrongly skips a seed because a different DAG ran it).
# The Variable key is this prefix followed by a hash of the scope and seed id (see
# ``_create_seed_checksum_key``).
VAR_KEY_SEED_CHECKSUM_PREFIX = "cosmos_seed_checksum__"


def _configure_remote_cache_dir() -> Path | ObjectStoragePath | None:
Expand Down Expand Up @@ -140,6 +146,45 @@ def create_cache_key(cache_identifier: str) -> str:
return f"{VAR_KEY_CACHE_PREFIX}{cache_identifier}"


def _create_seed_checksum_key(dag_task_group_identifier: str, unique_id: str) -> str:
    """Build the Airflow Variable key under which a seed's checksum is stored.

    The key is ``VAR_KEY_SEED_CHECKSUM_PREFIX`` followed by the SHA-1 hex digest of
    ``"<dag_task_group_identifier>:<unique_id>"``. Hashing the scope keeps the key at a fixed length no
    matter how long the DAG id, nested task-group path or package/resource name is, so it always fits the
    length-bounded ``variable.key`` column (commonly 250 chars) while staying stable for a given
    DbtDag/DbtTaskGroup and seed.

    :param dag_task_group_identifier: Identifier of the DbtDag/DbtTaskGroup the seed is rendered in.
    :param unique_id: Unique id of the seed node.
    :returns: The prefixed, fixed-length Variable key.
    """
    scoped_identifier = f"{dag_task_group_identifier}:{unique_id}"
    identifier_hash = hashlib.sha1(scoped_identifier.encode())
    return VAR_KEY_SEED_CHECKSUM_PREFIX + identifier_hash.hexdigest()
Comment thread
pankajkoti marked this conversation as resolved.


def get_seed_checksum(dag_task_group_identifier: str, unique_id: str) -> str | None:
    """Look up the checksum persisted for a seed, scoped to a DbtDag/DbtTaskGroup.

    The lookup is best-effort because change detection must never fail a seed task: when no value is
    stored, or when reading the Variable raises an ``AirflowException``/``SQLAlchemyError`` (for example a
    transient metadatabase or Variable backend error), the failure is logged as a warning and ``None`` is
    returned, which callers treat as "no stored checksum" (the seed runs).

    :param dag_task_group_identifier: Identifier of the DbtDag/DbtTaskGroup the seed is rendered in.
    :param unique_id: Unique id of the seed node.
    :returns: The stored checksum, or ``None`` when it is missing or could not be read.
    """
    variable_key = _create_seed_checksum_key(dag_task_group_identifier, unique_id)
    try:
        stored_checksum: str | None = Variable.get(variable_key, default_var=None)
    except (AirflowException, SQLAlchemyError) as error:
        logger.warning("Failed to read seed checksum from Variable `%s`: %s", variable_key, error)
        stored_checksum = None
    return stored_checksum


def store_seed_checksum(dag_task_group_identifier: str, unique_id: str, checksum: str) -> None:
    """Record a seed's checksum as an Airflow Variable once the seed has run successfully.

    Storing is best-effort: the seed has already been loaded by the time this is called, so an
    ``AirflowException``/``SQLAlchemyError`` raised while writing the Variable is logged as a warning and
    swallowed instead of failing the task.

    :param dag_task_group_identifier: Identifier of the DbtDag/DbtTaskGroup the seed is rendered in.
    :param unique_id: Unique id of the seed node.
    :param checksum: Checksum of the seed content that was just loaded.
    """
    variable_key = _create_seed_checksum_key(dag_task_group_identifier, unique_id)
    try:
        Variable.set(variable_key, checksum)
    except (AirflowException, SQLAlchemyError) as error:
        logger.warning("Failed to persist seed checksum under Variable `%s`: %s", variable_key, error)


def _obtain_cache_dir_path(cache_identifier: str, base_dir: Path = settings.cache_dir) -> Path:
"""
Return a directory used to cache a specific Cosmos DbtDag or DbtTaskGroup. If the directory
Expand Down
13 changes: 13 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
ExecutionMode,
InvocationMode,
LoadMode,
SeedRenderingBehavior,
SourceRenderingBehavior,
TestBehavior,
TestIndirectSelection,
Expand Down Expand Up @@ -73,6 +74,7 @@ class RenderConfig:
:param group_nodes_by_folder: When enabled, groups nodes by folder structure, creating a ``TaskGroup`` per resource type and folder. Disabled by default.
:param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6).
:param source_pruning: Determines if source nodes without a corresponding downstream task should be removed or not. Default is False
:param seed_rendering_behavior: Determines how seed nodes are rendered and run (ALWAYS, WHEN_SEED_CHANGES, RENDER_ONLY, NONE). Defaults to "ALWAYS", preserving the original Cosmos behaviour of always rendering and running seeds. WHEN_SEED_CHANGES is only supported for ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV and ExecutionMode.AIRFLOW_ASYNC, and is incompatible with TestBehavior.BUILD. Under ExecutionMode.WATCHER, only ALWAYS is meaningful: a single dbt build runs seeds regardless of this setting.
:param airflow_vars_to_purge_dbt_ls_cache: Specify Airflow variables that will affect the LoadMode.DBT_LS cache.
:param airflow_vars_to_purge_dbt_yaml_selectors_cache: Specify Airflow variables that will affect the parsed manifest YamlSelectors cache.
:param normalize_task_id: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name.
Expand Down Expand Up @@ -101,6 +103,7 @@ class RenderConfig:
group_nodes_by_folder: bool = False
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
source_pruning: bool = False
seed_rendering_behavior: SeedRenderingBehavior = SeedRenderingBehavior.ALWAYS
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)
airflow_vars_to_purge_dbt_yaml_selectors_cache: list[str] = field(default_factory=list)
normalize_task_id: Callable[..., Any] | None = None
Expand Down Expand Up @@ -128,6 +131,16 @@ def __post_init__(self, dbt_project_path: str | Path | None) -> None:
UserWarning,
)
self.source_rendering_behavior = SourceRenderingBehavior.NONE
if (
self.seed_rendering_behavior == SeedRenderingBehavior.WHEN_SEED_CHANGES
and self.test_behavior == TestBehavior.BUILD
):
raise CosmosValueError(
"SeedRenderingBehavior.WHEN_SEED_CHANGES is incompatible with TestBehavior.BUILD: under "
"TestBehavior.BUILD seeds run via `dbt build` as a single task and cannot be selectively "
"skipped based on CSV changes. Use TestBehavior.AFTER_EACH/AFTER_ALL/NONE, or "
"SeedRenderingBehavior.ALWAYS."
)
self.project_path = Path(dbt_project_path) if dbt_project_path else None
# allows us to initiate this attribute from Path objects and str
self.dbt_ls_path = Path(self.dbt_ls_path) if self.dbt_ls_path else None
Expand Down
19 changes: 19 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,25 @@ class SourceRenderingBehavior(Enum):
WITH_TESTS_OR_FRESHNESS = "with_tests_or_freshness"


class SeedRenderingBehavior(Enum):
    """
    Strategies controlling whether, and how, dbt seed nodes are rendered and run.

    Each member is described in the comment directly above it.
    """

    # Default, and the original Cosmos behaviour: render the seed and run ``dbt seed`` on every execution.
    ALWAYS = "always"

    # Render the seed, but only run ``dbt seed`` when the seed's CSV content has changed since the last
    # successful run. Supported only for execution modes where the Airflow worker can access the seed
    # files directly (LOCAL, VIRTUALENV, AIRFLOW_ASYNC).
    WHEN_SEED_CHANGES = "when_seed_changes"

    # Render the seed as a no-op ``EmptyOperator`` placeholder so it remains visible in the DAG
    # topology/lineage, but never run ``dbt seed``.
    RENDER_ONLY = "render_only"

    # Do not render the seed in the DAG/TaskGroup at all.
    NONE = "none"


class DbtResourceType(aenum.Enum): # type: ignore[misc]
"""
Type of dbt node.
Expand Down
14 changes: 14 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
ExecutionMode,
InvocationMode,
LoadMode,
SeedRenderingBehavior,
)
from cosmos.dbt.executable import get_system_dbt, is_dbt_installed_in_same_environment
from cosmos.dbt.graph import DbtGraph
Expand Down Expand Up @@ -130,6 +131,19 @@ def validate_arguments(
if profile_config.profile_mapping:
profile_config.profile_mapping.profile_args["schema"] = task_args["schema"]

# SeedRenderingBehavior.WHEN_SEED_CHANGES decides at task runtime whether to run `dbt seed`, which
# requires Cosmos to run dbt directly on the Airflow worker with filesystem access to the seed files.
if render_config.seed_rendering_behavior == SeedRenderingBehavior.WHEN_SEED_CHANGES and (
execution_config.execution_mode
not in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV, ExecutionMode.AIRFLOW_ASYNC)
):
raise CosmosValueError(
"SeedRenderingBehavior.WHEN_SEED_CHANGES is only supported with ExecutionMode.LOCAL, "
"ExecutionMode.VIRTUALENV or ExecutionMode.AIRFLOW_ASYNC, which run dbt directly on the "
f"Airflow worker. The configured execution_mode is {execution_config.execution_mode}. Use "
"SeedRenderingBehavior.ALWAYS, or switch to a supported execution mode."
)
Comment thread
pankajkoti marked this conversation as resolved.

if execution_config.execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]:
profile_config.validate_profiles_yml()
has_non_empty_dependencies = execution_config.project_path and has_non_empty_dependencies_file(
Expand Down
31 changes: 31 additions & 0 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import base64
import datetime
import functools
import hashlib
import itertools
import json
import os
Expand Down Expand Up @@ -69,6 +70,23 @@

logger = get_logger(__name__)

# Number of bytes read per iteration when checksumming a seed file. Streaming the file in fixed-size
# pieces means a large CSV never has to be held in memory all at once.
_CHECKSUM_READ_CHUNK_SIZE = 1024 * 1024


def _calculate_file_checksum(file_path: Path) -> str | None:
    """Compute the SHA256 hex digest of the file at ``file_path``.

    The file is opened in binary mode and hashed ``_CHECKSUM_READ_CHUNK_SIZE`` bytes at a time. If opening
    or reading it raises ``OSError``, a warning is logged and ``None`` is returned instead of raising.
    """
    hasher = hashlib.sha256()
    try:
        with open(file_path, "rb") as stream:
            while True:
                block = stream.read(_CHECKSUM_READ_CHUNK_SIZE)
                if not block:
                    # An empty read signals end of file.
                    break
                hasher.update(block)
    except OSError as exc:
        logger.warning("Unable to read file `%s` to compute its checksum: %s", file_path, exc)
        return None
    return hasher.hexdigest()

Comment thread
pankajkoti marked this conversation as resolved.

def _normalize_path(path: str | None) -> str:
"""
Expand Down Expand Up @@ -113,6 +131,18 @@ def file_path(self) -> Path:
"""Combined path to the node's file (path_base / original_file_path)."""
return self.path_base / self.original_file_path

@property
def checksum(self) -> str | None:
    """SHA256 checksum of the seed's CSV file, consumed by ``SeedRenderingBehavior.WHEN_SEED_CHANGES``.

    ``manifest.json`` also records a per-node checksum, but the value is always recomputed from the seed
    file so it is the same whether the project was loaded via ``LoadMode.MANIFEST`` or
    ``LoadMode.DBT_LS``. The file is read on every access; nothing is cached here.

    :returns: The checksum, or ``None`` for non-seed nodes or when the file cannot be read.
    """
    return (
        None
        if self.resource_type != DbtResourceType.SEED
        else _calculate_file_checksum(self.file_path)
    )

@property
def has_ephemeral_materialization(self) -> bool:
"""Whether the node is materialized as ephemeral (inlined as a CTE, never written to the warehouse)."""
Expand Down Expand Up @@ -240,6 +270,7 @@ def context_dict(self) -> dict[str, Any]:
"has_non_detached_test": self.has_non_detached_test,
"resource_name": self.resource_name,
"name": self.name,
"checksum": self.checksum,
Comment thread
pankajkoti marked this conversation as resolved.
}


Expand Down
42 changes: 42 additions & 0 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
from cosmos.cache import (
_copy_cached_package_lockfile_to_project,
_get_latest_cached_package_lockfile,
get_seed_checksum,
is_cache_package_lockfile_enabled,
store_seed_checksum,
)
from cosmos.constants import (
_AIRFLOW3_MAJOR_VERSION,
Expand Down Expand Up @@ -1050,6 +1052,46 @@ class DbtSeedLocalOperator(DbtSeedMixin, DbtLocalBaseOperator):
def __init__(self, *args: Any, **kwargs: Any) -> None:
    # No seed-specific setup happens here: every positional and keyword argument is forwarded
    # unchanged to the next initialiser in the MRO.
    super().__init__(*args, **kwargs)

def execute(self, context: Context, **kwargs: Any) -> None:
    """Run ``dbt seed``, skipping it when change detection shows the seed is unchanged.

    Change detection is active only when ``extra_context`` carries a truthy ``should_run_if_seed_changed``
    flag (set at render time for ``SeedRenderingBehavior.WHEN_SEED_CHANGES``) and ``--full-refresh`` was
    not requested. Whenever the seed does run, the work is delegated to the parent ``execute()`` with the
    same ``context`` and ``**kwargs``.

    :param context: Airflow task context, forwarded to the parent ``execute()``.
    :param kwargs: Extra keyword arguments, forwarded to the parent ``execute()``.
    """
    seed_context = self.extra_context or {}
    change_detection_requested = seed_context.get("should_run_if_seed_changed")

    # Without the flag, or with an explicit `--full-refresh`, the seed always runs and change
    # detection is bypassed entirely (no checksum is read or stored).
    if not change_detection_requested or self._is_full_refresh():
        super().execute(context, **kwargs)
        return

    seed_node_config = seed_context.get("dbt_node_config") or {}
    scope_identifier = seed_context.get("dbt_dag_task_group_identifier")
    seed_unique_id = seed_node_config.get("unique_id")
    current_checksum = seed_node_config.get("checksum")

    # Change detection needs the seed's current checksum plus a scope (DAG/task-group + seed) to store
    # it under. If any piece is missing, fall back to always running the seed (best-effort).
    if not (current_checksum and scope_identifier and seed_unique_id):
        super().execute(context, **kwargs)
        return

    if get_seed_checksum(scope_identifier, seed_unique_id) == current_checksum:
        # Return successfully (do NOT raise AirflowSkipException) so downstream models that depend
        # on the seed are not skip-propagated by the default trigger rule.
        self.log.info("Seed `%s` is unchanged since its last successful run; skipping `dbt seed`.", seed_unique_id)
        return

    # The seed changed (or nothing was stored yet): run it and, only once the run has returned without
    # raising, record the checksum that was just loaded.
    super().execute(context, **kwargs)
    store_seed_checksum(scope_identifier, seed_unique_id, current_checksum)

def _is_full_refresh(self) -> bool:
    """Tell whether ``--full-refresh`` is requested for this seed.

    ``full_refresh`` may be a string (for example after templating) or any other value. Strings are
    stripped and parsed with ``to_boolean``; everything else is judged by its truthiness. This is meant
    to mirror how ``DbtSeedMixin.add_cmd_flags`` handles the same attribute.
    """
    requested = self.full_refresh
    if not isinstance(requested, str):
        return bool(requested)
    # `to_boolean` does not strip whitespace, so a rendered " true " would otherwise be False.
    return bool(to_boolean(requested.strip()))
Comment thread
pankajkoti marked this conversation as resolved.


class DbtSnapshotLocalOperator(DbtSnapshotMixin, DbtLocalBaseOperator):
"""
Expand Down
Loading
Loading