Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
424a9a1
Fix failing integration tests due to AF Object Storage version support
tatiana Jan 29, 2026
0fbf7e1
Change manifest with selector path to use manifest_with_selector.json…
tatiana Jan 29, 2026
9b2c4f2
Restore manifest.json before selector support
tatiana Jan 29, 2026
087da1c
Move selectors and manifest to the altered_jaffle_shop folder
tatiana Jan 29, 2026
15c927a
Fix unittests related to the hash of the jaffle_shop project in MacOS
tatiana Jan 29, 2026
134bf50
Change test_save_yaml_selectors_cache to use project that defines sel…
tatiana Jan 29, 2026
0e3a28e
Remove default selector - previously it would not display any nodes b…
tatiana Jan 29, 2026
e4fa35d
Fix hash for test_save_yaml_selectors_cache in linux
tatiana Jan 29, 2026
0ce9954
Support cross-referencing models across dbt projects using dbt-loom (…
pankajkoti Jan 29, 2026
dec5b68
Fix issue in running example DAG dev/dags/cosmos_manifest_selectors_e…
tatiana Jan 29, 2026
fd017e4
Fix last broken tests
tatiana Jan 29, 2026
cab688f
Comment DAGs that should not be run in AF 3
tatiana Jan 29, 2026
576d4ff
Add integration tests using `InvocationMode.SUBPROCESS` and validate …
tatiana Jan 29, 2026
6fabe60
Update watcher docs (#2298)
tatiana Jan 29, 2026
08cbc7f
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
tatiana Jan 29, 2026
485e0a4
Update docs/configuration/caching.rst
tatiana Jan 29, 2026
f0d87a3
Support use of YAML selectors when using LoadMode.DBT_MANIFEST (#2261)
YourRoyalLinus Jan 29, 2026
66f1b32
Merge branch 'main' into 2257-yaml-selector-support-with-manifest-loa…
tatiana Jan 29, 2026
c8b8a93
Re-apply the changes before the rebase
tatiana Jan 29, 2026
2dfd1d4
Fix cache keys after rebase
tatiana Jan 29, 2026
274e8ea
Fix cache keys after rebase
tatiana Jan 29, 2026
e30828a
Remove default selector - as I had done before the rebase
tatiana Jan 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ repos:
rev: v6.0.0
hooks:
- id: check-added-large-files
exclude: ^dev/dags/dbt/cross_project/downstream/target/manifest.json$
- id: check-merge-conflict
- id: check-toml
- id: check-yaml
Expand Down
30 changes: 26 additions & 4 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,26 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
project_path.parent / node_dict["package_name"] if node_dict.get("package_name") else project_path # type: ignore
)

# dbt-core defined the node path via "original_file_path", dbt fusion identifies it via "path"
# External nodes (e.g., from dbt-loom) may not have a file path - skip them
# dbt-loom injects upstream models with resource_type="model" and empty file path
# Check for both None and empty string since dbt-loom may set either
node_file_path = node_dict.get("original_file_path") or node_dict.get("path")
resource_type = node_dict.get("resource_type")
if not node_file_path and resource_type == "model" and node_dict.get("unique_id"):
logger.debug(
"Skipping model `%s` because it has no file path (likely an external reference from dbt-loom or similar)",
node_dict.get("unique_id"),
)
continue

try:
node = DbtNode(
unique_id=node_dict["unique_id"],
package_name=node_dict.get("package_name"),
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
# dbt-core defined the node path via "original_file_path", dbt fusion identifies it via "path"
file_path=base_path / (node_dict["original_file_path"] or node_dict.get("path")),
file_path=base_path / node_file_path, # type: ignore[arg-type]
tags=node_dict.get("tags") or [],
config=node_dict.get("config") or {},
has_freshness=(
Expand All @@ -331,7 +343,7 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
else False
),
)
except KeyError:
except (KeyError, TypeError):
logger.info("Could not parse following the dbt ls line even though it was a valid JSON `%s`", line)
else:
nodes[node.unique_id] = node
Expand Down Expand Up @@ -1119,12 +1131,22 @@ def load_from_dbt_manifest(self) -> None:

resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
for unique_id, node_dict in resources.items():
# External nodes (e.g., from dbt-loom) may not have a file path - skip them
# Check for both None and empty string since dbt-loom may set either
original_file_path = node_dict.get("original_file_path")
if not original_file_path:
logger.debug(
"Skipping node `%s` because it has no file path (likely an external reference from dbt-loom or similar)",
unique_id,
)
continue

node = DbtNode(
unique_id=unique_id,
package_name=node_dict.get("package_name"),
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=self.execution_config.project_path / _normalize_path(node_dict["original_file_path"]),
file_path=self.execution_config.project_path / _normalize_path(original_file_path),
tags=node_dict.get("tags") or [],
config=node_dict.get("config") or {},
has_freshness=(
Expand Down
18 changes: 11 additions & 7 deletions cosmos/hooks/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
except ImportError:
from airflow.hooks.base import BaseHook

from cosmos.log import get_logger

logger = get_logger(__name__)


class FullOutputSubprocessResult(NamedTuple):
exit_code: int
Expand Down Expand Up @@ -60,7 +64,7 @@ def run_command(
``output``: the last line from stderr or stdout
``full_output``: all lines from stderr or stdout.
"""
self.log.info("Tmp dir root location: \n %s", gettempdir())
logger.info("Tmp dir root location: \n %s", gettempdir())
log_lines = []
with contextlib.ExitStack() as stack:
if cwd is None:
Expand All @@ -73,7 +77,7 @@ def pre_exec() -> None:
signal.signal(getattr(signal, sig), signal.SIG_DFL)
os.setsid()

self.log.info("Running command: %s", command)
logger.info("Running command: %s", command)

self.sub_process = Popen(
command,
Expand All @@ -91,7 +95,7 @@ def pre_exec() -> None:
if self.sub_process is None:
raise RuntimeError("The subprocess should be created here and is None!")

self.log.info("Command output:")
logger.info("Command output:")

last_line: str = ""
assert self.sub_process.stdout is not None
Expand All @@ -102,23 +106,23 @@ def pre_exec() -> None:
if process_log_line:
process_log_line(line, kwargs)
else:
self.log.info("%s", line)
logger.info("%s", line)

# Wait until process completes
return_code = self.sub_process.wait()

self.log.info("Command exited with return code %s", return_code)
logger.info("Command exited with return code %s", return_code)

return FullOutputSubprocessResult(exit_code=return_code, output=last_line, full_output=log_lines)

def send_sigterm(self) -> None:
"""Sends SIGTERM signal to ``self.sub_process`` if one exists."""
self.log.info("Sending SIGTERM signal to process group")
logger.info("Sending SIGTERM signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)

def send_sigint(self) -> None:
"""Sends SIGINT signal to ``self.sub_process`` if one exists."""
self.log.info("Sending SIGINT signal to process group")
logger.info("Sending SIGINT signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGINT)
26 changes: 15 additions & 11 deletions cosmos/operators/_watcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,21 @@ def store_dbt_resource_status_from_log(line: str, extra_kwargs: Any) -> None:
except json.JSONDecodeError:
logger.debug("Failed to parse log: %s", line)
log_line = {}
node_info = log_line.get("data", {}).get("node_info", {})
node_status = node_info.get("node_status")
unique_id = node_info.get("unique_id")

logger.debug("Model: %s is in %s state", unique_id, node_status)

# TODO: Handle and store all possible node statuses, not just the current success and failed
if node_status in ["success", "failed"]:
context = extra_kwargs.get("context")
assert context is not None # Make MyPy happy
safe_xcom_push(task_instance=context["ti"], key=f"{unique_id.replace('.', '__')}_status", value=node_status)
else:
logger.debug("Log line: %s", log_line)
if "info" in log_line and "msg" in log_line["info"]:
logger.info(log_line["info"]["msg"])
node_info = log_line.get("data", {}).get("node_info", {})
node_status = node_info.get("node_status")
unique_id = node_info.get("unique_id")

logger.debug("Model: %s is in %s state", unique_id, node_status)

# TODO: Handle and store all possible node statuses, not just the current success and failed
if node_status in ["success", "failed"]:
context = extra_kwargs.get("context")
assert context is not None # Make MyPy happy
safe_xcom_push(task_instance=context["ti"], key=f"{unique_id.replace('.', '__')}_status", value=node_status)

# Additionally, log the message from dbt logs
log_info = log_line.get("info", {})
Expand Down
3 changes: 0 additions & 3 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,6 @@ def run_subprocess(
process_log_line=self._process_log_line_callable,
**kwargs,
)
# Logging changed in Airflow 3.1 and we needed to replace the output by the full output:
output = "".join(subprocess_result.full_output)
logger.info(output)
return subprocess_result

def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str, **kwargs: Any) -> dbtRunnerResult:
Expand Down
23 changes: 18 additions & 5 deletions cosmos/operators/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import base64
import json
import zlib
from collections.abc import Sequence
from collections.abc import Callable, Sequence
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -83,7 +83,7 @@ class DbtProducerWatcherOperator(DbtBuildMixin, DbtLocalBaseOperator):
template_fields = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator]
# Use staticmethod to prevent Python's descriptor protocol from binding the function to `self`
# when accessed via instance, which would incorrectly pass `self` as the first argument
_process_log_line_callable = staticmethod(store_dbt_resource_status_from_log)
_process_log_line_callable: Callable[[str, Any], None] | None = None

def __init__(self, *args: Any, **kwargs: Any) -> None:
task_id = kwargs.pop("task_id", PRODUCER_WATCHER_TASK_ID)
Expand Down Expand Up @@ -148,7 +148,23 @@ def _finalize(self, context: Context, startup_events: list[dict[str, Any]]) -> N
if startup_events:
safe_xcom_push(task_instance=context["ti"], key="dbt_startup_events", value=startup_events)

def _set_invocation_mode_if_not_set(self) -> None:
if not self.invocation_mode:
logger.info("No invocation mode provided, discovering it")
self._discover_invocation_mode()

def _set_process_log_line_callable_if_subprocess(self) -> None:
if self.invocation_mode == InvocationMode.SUBPROCESS:
logger.info(
"DbtProducerWatcherOperator: Setting log_format to json and process_log_line_callable to store_dbt_resource_status_from_log"
)
self.log_format = "json"
self._process_log_line_callable = store_dbt_resource_status_from_log

def execute(self, context: Context, **kwargs: Any) -> Any:
self._set_invocation_mode_if_not_set()
self._set_process_log_line_callable_if_subprocess()

task_instance = context.get("ti")
if task_instance is None:
raise AirflowException("DbtProducerWatcherOperator expects a task instance in the execution context")
Expand All @@ -169,9 +185,6 @@ def execute(self, context: Context, **kwargs: Any) -> Any:
)

try:
if not self.invocation_mode:
self._discover_invocation_mode()

use_events = self.invocation_mode == InvocationMode.DBT_RUNNER and EventMsg is not None
logger.debug("DbtProducerWatcherOperator: use_events=%s", use_events)

Expand Down
30 changes: 24 additions & 6 deletions dev/dags/cosmos_manifest_selectors_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "jaffle_shop")
execution_config = ExecutionConfig(dbt_project_path=DBT_ROOT_PATH / "altered_jaffle_shop")

profile_config = ProfileConfig(
profile_name="default",
Expand Down Expand Up @@ -47,11 +47,29 @@
):
pre_dbt = EmptyOperator(task_id="pre_dbt")

# The selector `critical_path` only selects the `customers` model, which means we were lacking its upstream tasks.
# Without `stg_customers`, dbt will not be able to run the `customers` model
pre_condition = DbtTaskGroup(
group_id="pre_condition",
project_config=ProjectConfig(
manifest_path=DBT_ROOT_PATH / "altered_jaffle_shop" / "target" / "manifest.json",
project_name="jaffle_shop",
),
profile_config=profile_config,
render_config=RenderConfig(
load_method=LoadMode.DBT_MANIFEST,
select=["+customers"],
airflow_vars_to_purge_dbt_yaml_selectors_cache=["purge"],
),
execution_config=execution_config,
operator_args={"install_deps": True},
)

# [START local_example]
local_example = DbtTaskGroup(
group_id="local_example",
project_config=ProjectConfig(
manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
manifest_path=DBT_ROOT_PATH / "altered_jaffle_shop" / "target" / "manifest.json",
project_name="jaffle_shop",
),
profile_config=profile_config,
Expand All @@ -65,7 +83,7 @@
aws_s3_example = DbtTaskGroup(
group_id="aws_s3_example",
project_config=ProjectConfig(
manifest_path="s3://cosmos-manifest-test/manifest.json",
manifest_path="s3://cosmos-manifest-test/manifest_with_selector.json",
manifest_conn_id="aws_s3_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `aws_default` is used.
project_name="jaffle_shop",
Expand All @@ -81,7 +99,7 @@
gcp_gs_example = DbtTaskGroup(
group_id="gcp_gs_example",
project_config=ProjectConfig(
manifest_path="gs://cosmos_remote_target/manifest.json",
manifest_path="gs://cosmos_remote_target/manifest_with_selector.json",
manifest_conn_id="gcp_gs_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used.
project_name="jaffle_shop",
Expand All @@ -97,7 +115,7 @@
azure_abfs_example = DbtTaskGroup(
group_id="azure_abfs_example",
project_config=ProjectConfig(
manifest_path="abfs://cosmos-manifest-test/manifest.json",
manifest_path="abfs://cosmos-manifest-test/manifest_with_selector.json",
manifest_conn_id="azure_abfs_conn",
# `manifest_conn_id` is optional. If not provided, the default connection ID `wasb_default` is used.
project_name="jaffle_shop",
Expand All @@ -111,4 +129,4 @@

post_dbt = EmptyOperator(task_id="post_dbt")

(pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)
(pre_dbt >> pre_condition >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)
Loading