diff --git a/cosmos/dbt/runner.py b/cosmos/dbt/runner.py index 6104ab47d0..5324e41d76 100644 --- a/cosmos/dbt/runner.py +++ b/cosmos/dbt/runner.py @@ -2,7 +2,7 @@ import sys from functools import lru_cache -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Callable from cosmos.dbt.project import change_working_directory, environ from cosmos.exceptions import CosmosDbtRunError @@ -39,7 +39,7 @@ def is_available() -> bool: @cache -def get_runner() -> dbtRunner: +def _get_cached_dbt_runner() -> dbtRunner: """ Retrieves a dbtRunner instance. """ @@ -48,7 +48,21 @@ def get_runner() -> dbtRunner: return dbtRunner() -def run_command(command: list[str], env: dict[str, str], cwd: str) -> dbtRunnerResult: +def get_runner(callbacks: list[Callable] | None = None) -> dbtRunner: # type: ignore[type-arg] + """ + Retrieves a dbtRunner instance. + """ + if callbacks and isinstance(callbacks, list): + from dbt.cli.main import dbtRunner + + return dbtRunner(callbacks=callbacks) + + return _get_cached_dbt_runner() + + +def run_command( + command: list[str], env: dict[str, str], cwd: str, callbacks: list[Callable] | None = None # type: ignore[type-arg] +) -> dbtRunnerResult: """ Invokes the dbt command programmatically. """ @@ -58,7 +72,7 @@ def run_command(command: list[str], env: dict[str, str], cwd: str) -> dbtRunnerR cli_args = command[1:] with change_working_directory(cwd), environ(env): logger.info("Trying to run dbtRunner with:\n %s\n in %s", cli_args, cwd) - runner = get_runner() + runner = get_runner(callbacks=callbacks) result = runner.invoke(cli_args) return result diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 34efad55ff..a68cea8ab8 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -186,6 +186,7 @@ def __init__( should_store_compiled_sql: bool = True, should_upload_compiled_sql: bool = False, append_env: bool = True, + dbt_runner_callbacks: list[Callable] | None = None, # type: ignore[type-arg] **kwargs: Any, ) -> None: self.task_id = task_id @@ -199,6 +200,7 @@ def __init__( self.openlineage_events_completes: list[RunEvent] = [] self.invocation_mode = invocation_mode self._dbt_runner: dbtRunner | None = None + self._dbt_runner_callbacks = dbt_runner_callbacks super().__init__(task_id=task_id, **kwargs) @@ -470,7 +472,7 @@ def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str) -> d "Could not import dbt core. Ensure that dbt-core >= v1.5 is installed and available in the environment where the operator is running." ) - return dbt_runner.run_command(command, env, cwd) + return dbt_runner.run_command(command, env, cwd, callbacks=self._dbt_runner_callbacks) def _cache_package_lockfile(self, tmp_project_dir: Path) -> None: project_dir = Path(self.project_dir) @@ -577,13 +579,36 @@ def _update_partial_parse_cache(self, tmp_dir_path: Path) -> None: if partial_parse_file.exists(): cache._update_partial_parse_cache(partial_parse_file, self.cache_dir) - def _handle_post_execution(self, tmp_project_dir: str, context: Context) -> None: + def _push_run_results_to_xcom(self, tmp_project_dir: str, context: Context) -> None: + run_results_path = Path(tmp_project_dir) / "target" / "run_results.json" + if not run_results_path.is_file(): + raise AirflowException(f"run_results.json not found at {run_results_path}") + + try: + with run_results_path.open() as fp: + raw = json.load(fp) + except json.JSONDecodeError as exc: + raise AirflowException("Invalid JSON in run_results.json") from exc + self.log.debug("Loaded run results from %s", run_results_path) + + compressed = base64.b64encode(zlib.compress(json.dumps(raw).encode())).decode() + context["ti"].xcom_push(key="run_results", value=compressed) + + self.log.info("Pushed run results to XCom") + + def _handle_post_execution( + self, tmp_project_dir: str, context: Context, push_run_results_to_xcom: bool = False + ) -> None: self.store_freshness_json(tmp_project_dir, context) self.store_compiled_sql(tmp_project_dir, context) self._override_rtif(context) if self.should_upload_compiled_sql: self._upload_sql_files(tmp_project_dir, "compiled") + + if push_run_results_to_xcom: + self._push_run_results_to_xcom(tmp_project_dir, context) + if self.callback: self.callback_args.update({"context": context}) if isinstance(self.callback, list): @@ -614,6 +639,7 @@ def run_command( # noqa: C901 context: Context, run_as_async: bool = False, async_context: dict[str, Any] | None = None, + push_run_results_to_xcom: bool = False, ) -> FullOutputSubprocessResult | dbtRunnerResult | str: """ Copies the dbt project to a temporary directory and runs the command. @@ -667,7 +693,7 @@ def run_command( # noqa: C901 if self.partial_parse: self._update_partial_parse_cache(tmp_dir_path) - self._handle_post_execution(tmp_project_dir, context) + self._handle_post_execution(tmp_project_dir, context, push_run_results_to_xcom) self.handle_exception(result) if run_as_async and async_context: @@ -861,6 +887,7 @@ def build_and_run_cmd( cmd_flags: list[str] | None = None, run_as_async: bool = False, async_context: dict[str, Any] | None = None, + **kwargs: Any, ) -> FullOutputSubprocessResult | dbtRunnerResult: # If this is an async run and we're using the setup task, make sure to include the full_refresh flag if set if run_as_async and settings.enable_setup_async_task and getattr(self, "full_refresh", False): @@ -872,7 +899,7 @@ def build_and_run_cmd( dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) dbt_cmd = dbt_cmd or [] result = self.run_command( - cmd=dbt_cmd, env=env, context=context, run_as_async=run_as_async, async_context=async_context + cmd=dbt_cmd, env=env, context=context, run_as_async=run_as_async, async_context=async_context, **kwargs ) return result @@ -974,7 +1001,7 @@ def _handle_warnings(self, result: FullOutputSubprocessResult | dbtRunnerResult, self.on_warning_callback and self.on_warning_callback(warning_context) def execute(self, context: Context, **kwargs: Any) -> None: - result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) + result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags(), **kwargs) if self.on_warning_callback: self._handle_warnings(result, context) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index f33bd4185f..ec5c78acb3 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -107,6 +107,7 @@ def run_command( context: Context, run_as_async: bool = False, async_context: dict[str, Any] | None = None, + push_run_results_to_xcom: bool = False, ) -> FullOutputSubprocessResult | dbtRunnerResult: # No virtualenv_dir set, so create a temporary virtualenv if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary: @@ -114,7 +115,14 @@ def run_command( with TemporaryDirectory(prefix="cosmos-venv") as tempdir: self.virtualenv_dir = Path(tempdir) self._py_bin = self._prepare_virtualenv() - return super().run_command(cmd, env, context, run_as_async=run_as_async, async_context=async_context) + return super().run_command( + cmd, + env, + context, + run_as_async=run_as_async, + async_context=async_context, + push_run_results_to_xcom=push_run_results_to_xcom, + ) try: self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists") @@ -126,7 +134,14 @@ def run_command( self.log.info("Acquiring the virtualenv lock") self._acquire_venv_lock() self._py_bin = self._prepare_virtualenv() - return super().run_command(cmd, env, context, run_as_async=run_as_async, async_context=async_context) + return super().run_command( + cmd, + env, + context, + run_as_async=run_as_async, + async_context=async_context, + push_run_results_to_xcom=push_run_results_to_xcom, + ) finally: self.log.info("Releasing virtualenv lock") self._release_venv_lock() diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index cffd7e6ee3..1a6e0443a4 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -1,4 +1,30 @@ -status_model_fhir_dbt_analytics_active_encounters_daily_nodefinished = { +from __future__ import annotations + +import base64 +import json +import logging +import zlib +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: # pragma: no cover + try: + from airflow.sdk.definitions.context import Context + except ImportError: + from airflow.utils.context import Context # type: ignore[attr-defined] + +from cosmos.constants import InvocationMode +from cosmos.operators.local import DbtLocalBaseOperator + +try: + from dbt_common.events.base_types import EventMsg +except ImportError: # pragma: no cover + EventMsg = None + +logger = logging.getLogger(__name__) + + +# Example dbt event JSON dictionaries (kept for reference) +nodefinished_model__fhir_dbt_utils__fhir_table_list = { "info": { "name": "NodeFinished", "code": "Q025", @@ -90,3 +116,94 @@ "data": {"adapter_name": "bigquery", "adapter_version": "=1.9.0"}, } } + + +class DbtProducerWatcherOperator(DbtLocalBaseOperator): + """Run dbt build and update XCom with the progress of each model, as part of the *WATCHER* execution mode. + + Executes **one** ``dbt build`` covering the whole selection. + + - **When ``InvocationMode.DBT_RUNNER`` is set** we patch + ``dbtRunner`` so we receive structured events *while* dbt is running. In + this real-time mode the operator: + – pushes startup metadata events (``MainReportVersion``, + ``AdapterRegistered``) together under XCom key + ``dbt_startup_events``; + – pushes each ``NodeFinished`` event immediately to XCom under + ``nodefinished_`` (zlib zipped+base64 JSON) so downstream + sensors can react with near-zero latency. + + - **When ``dbtRunner`` is *not* available** (older dbt or + ``InvocationMode=SUBPROCESS``) we fallback to delayed strategy: after + dbt exits we read ``target/run_results.json`` and push the whole mapping + once under key ``run_results`` to XCom. Sensors can poll this key but will not + get per-model updates until the build completes - by the end of the execution of all dbt nodes. + + This keeps the heavy dbt work centralised while providing near real-time + feedback and granular task-level observability downstream. + """ + + base_cmd = ["build"] + + def __init__(self, *args: Any, **kwargs: Any) -> None: + task_id = kwargs.pop("task_id", "dbt_producer_watcher_operator") + super().__init__(task_id=task_id, *args, **kwargs) + + @staticmethod + def _serialize_event(ev: EventMsg) -> dict[str, Any]: + """Convert structured dbt EventMsg to plain dict.""" + from google.protobuf.json_format import MessageToDict + + return MessageToDict(ev, preserving_proto_field_name=True) # type: ignore[no-any-return] + + def _handle_startup_event(self, ev: EventMsg, startup_events: list[dict[str, Any]]) -> None: + info = ev.info # type: ignore[attr-defined] + raw_ts = getattr(info, "ts", None) + ts_val = raw_ts.ToJsonString() if hasattr(raw_ts, "ToJsonString") else str(raw_ts) # type: ignore[union-attr] + startup_events.append({"name": info.name, "msg": info.msg, "ts": ts_val}) + + def _handle_node_finished( + self, + ev: EventMsg, + context: Context, + ) -> None: + self.log.debug("DbtProducerWatcherOperator: handling node finished event: %s", ev) + ti = context["ti"] + uid = ev.data.node_info.unique_id + ev_dict = self._serialize_event(ev) + payload = base64.b64encode(zlib.compress(json.dumps(ev_dict).encode())).decode() + ti.xcom_push(key=f"nodefinished_{uid.replace('.', '__')}", value=payload) + + def _finalize(self, context: Context, startup_events: list[dict[str, Any]]) -> None: + ti = context["ti"] + # Only push startup events; per-model statuses are available via individual nodefinished_ entries. + if startup_events: + ti.xcom_push(key="dbt_startup_events", value=startup_events) + + def execute(self, context: Context, **kwargs: Any) -> Any: + if not self.invocation_mode: + self._discover_invocation_mode() + + use_events = self.invocation_mode == InvocationMode.DBT_RUNNER and EventMsg is not None + self.log.debug("DbtProducerWatcherOperator: use_events=%s", use_events) + + startup_events: list[dict[str, Any]] = [] + + if use_events: + + def _callback(ev: EventMsg) -> None: + name = ev.info.name + if name in {"MainReportVersion", "AdapterRegistered"}: + self._handle_startup_event(ev, startup_events) + elif name == "NodeFinished": + self._handle_node_finished(ev, context) + + self._dbt_runner_callbacks = [_callback] + result = super().execute(context=context, **kwargs) + + self._finalize(context, startup_events) + return result + + # Fallback – push run_results.json via base class helper + kwargs["push_run_results_to_xcom"] = True + return super().execute(context=context, **kwargs) diff --git a/tests/dbt/test_runner.py b/tests/dbt/test_runner.py index 18ef5a2603..8739d844b5 100644 --- a/tests/dbt/test_runner.py +++ b/tests/dbt/test_runner.py @@ -5,6 +5,13 @@ from pathlib import Path from unittest.mock import patch +from airflow import DAG +from pendulum import datetime + +from cosmos.config import InvocationMode, ProfileConfig +from cosmos.operators.local import DbtRunLocalOperator +from cosmos.operators.watcher import DbtProducerWatcherOperator + sys.modules.pop("dbt.cli.main", None) import pytest @@ -108,3 +115,104 @@ def test_handle_exception_if_needed_after_error(invalid_dbt_project_dir): err_msg = str(exc_info.value) expected1 = "dbt invocation completed with errors:" assert expected1 in err_msg + + +@pytest.mark.integration +def test_dbt_runner_caching_and_callbacks(valid_dbt_project_dir): + """Test that: + 1. DbtRunLocalOperator uses cached runner (no callbacks) + 2. DbtProducerWatcherOperator creates new runner with callbacks + """ + # Track dbtRunner instances + instances = [] + + class _MockTI: + """Mock TaskInstance with required attributes.""" + + def __init__(self): + self.openlineage_events_completes = [] + self.store = {} + + def xcom_push(self, key, value, **_): + self.store[key] = value + + class _FakeResult: + """Mock dbtRunnerResult.""" + + def __init__(self): + self.success = True + self.result = None + + class _FakeRunner: + """Mock dbtRunner that tracks instances.""" + + def __init__(self, callbacks=None): + self.callbacks = callbacks or [] + instances.append(self) + + def invoke(self, *args): + return _FakeResult() + + # Create mock context with task_instance + mock_ti = _MockTI() + mock_context = { + "ti": mock_ti, + "task_instance": mock_ti, + "run_id": "test_run", + } + + mock_profile = ProfileConfig( + profile_name="test", target_name="test", profiles_yml_filepath=str(valid_dbt_project_dir / "profiles.yml") + ) + + with DAG( + "test_dag", + start_date=datetime(2025, 1, 1), + schedule=None, + ) as dag: + with patch.dict( + sys.modules, + { + "dbt": type("dbt", (), {}), + "dbt.cli": type("dbt.cli", (), {}), + "dbt.cli.main": type("dbt.cli.main", (), {"dbtRunner": _FakeRunner}), + "dbt.version": type("dbt.version", (), {"__version__": "1.9.0"}), + }, + ), patch( + "cosmos.operators.local.DbtLocalBaseOperator.build_cmd", + return_value=(["dbt", "run"], {}), + ), patch( + "cosmos.operators.local.AbstractDbtLocalBase._handle_post_execution" + ): + # First operator - DbtRunLocalOperator should use cached runner + op1 = DbtRunLocalOperator( + task_id="dbt_run", + project_dir=str(valid_dbt_project_dir), + profile_config=mock_profile, + install_deps=False, + ) + mock_context["dag"] = dag + op1.execute(context=mock_context) + + # Second operator - DbtProducerWatcherOperator should create new runner with callback + op2 = DbtProducerWatcherOperator( + task_id="dbt_watch", + project_dir=str(valid_dbt_project_dir), + profile_config=mock_profile, + install_deps=False, + ) + op2.invocation_mode = InvocationMode.DBT_RUNNER + + class _DummyEv: + pass + + with patch("cosmos.operators.watcher.EventMsg", _DummyEv): + op2.execute(context=mock_context) + + # Verify: + # 1. We have two dbt Runner instances (cached + new with callbacks) + assert len(instances) == 2 + # 2. First instance (cached) has no callbacks + assert not instances[0].callbacks + # 3. Second instance has one callback + assert len(instances[1].callbacks) == 1 diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 22a7adbcf0..506551bb17 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -2108,6 +2108,71 @@ def test_dbt_cmd_flags_mixed_static_and_templated(): assert operator.dbt_cmd_flags == expected_flags +def test_push_run_results_to_xcom_missing_file(): + """Test that _push_run_results_to_xcom raises AirflowException when run_results.json doesn't exist.""" + from airflow.exceptions import AirflowException + + operator = DbtRunLocalOperator( + task_id="test", + project_dir="/tmp", + profile_config=profile_config, + ) + mock_ti = MagicMock() + mock_context = {"ti": mock_ti} + + with pytest.raises(AirflowException) as exc_info: + operator._push_run_results_to_xcom("/tmp", mock_context) + assert "run_results.json not found" in str(exc_info.value) + + +def test_push_run_results_to_xcom_invalid_json(tmp_path): + """Test that _push_run_results_to_xcom raises AirflowException when run_results.json is invalid JSON.""" + from airflow.exceptions import AirflowException + + target_dir = tmp_path / "target" + target_dir.mkdir() + run_results_path = target_dir / "run_results.json" + run_results_path.write_text("invalid json{") + + operator = DbtRunLocalOperator( + task_id="test", + project_dir=str(tmp_path), + profile_config=profile_config, + ) + mock_ti = MagicMock() + mock_context = {"ti": mock_ti} + + with pytest.raises(AirflowException) as exc_info: + operator._push_run_results_to_xcom(str(tmp_path), mock_context) + assert "Invalid JSON in run_results.json" in str(exc_info.value) + + +def test_push_run_results_to_xcom_success(tmp_path): + """Test that _push_run_results_to_xcom successfully pushes valid JSON to XCom.""" + target_dir = tmp_path / "target" + target_dir.mkdir() + run_results_path = target_dir / "run_results.json" + test_data = {"results": [{"status": "success"}]} + run_results_path.write_text(json.dumps(test_data)) + + operator = DbtRunLocalOperator( + task_id="test", + project_dir=str(tmp_path), + profile_config=profile_config, + ) + mock_ti = MagicMock() + mock_context = {"ti": mock_ti} + + operator._push_run_results_to_xcom(str(tmp_path), mock_context) + + mock_ti.xcom_push.assert_called_once() + key, value = mock_ti.xcom_push.call_args[1]["key"], mock_ti.xcom_push.call_args[1]["value"] + assert key == "run_results" + + decompressed = json.loads(zlib.decompress(base64.b64decode(value)).decode()) + assert decompressed == test_data + + def test_dbt_cmd_flags_all_templated(): """Test that dbt_cmd_flags works when all values are templated.""" from datetime import datetime diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py new file mode 100644 index 0000000000..93d05e35e1 --- /dev/null +++ b/tests/operators/test_watcher.py @@ -0,0 +1,168 @@ +import base64 +import json +import zlib +from types import SimpleNamespace +from unittest.mock import patch + +from cosmos.config import InvocationMode +from cosmos.operators.watcher import DbtProducerWatcherOperator + + +class _MockTI: + def __init__(self) -> None: + self.store: dict[str, str] = {} + + def xcom_push(self, key: str, value: str, **_): + self.store[key] = value + + +class _MockContext(dict): + pass + + +def _fake_event(name: str = "NodeFinished", uid: str = "model.pkg.m"): + """Create a minimal fake EventMsg-like object suitable for helper tests.""" + + class _Info(SimpleNamespace): + pass + + class _NodeInfo(SimpleNamespace): + pass + + class _RunResult(SimpleNamespace): + pass + + node_info = _NodeInfo(unique_id=uid) + run_result = _RunResult(status="success", message="ok") + + data = SimpleNamespace(node_info=node_info, run_result=run_result) + info = _Info(name=name, code="X", msg="msg") + return SimpleNamespace(info=info, data=data) + + +@patch("google.protobuf.json_format.MessageToDict") +def test_serialize_event(mock_mtd): + op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) + + mock_mtd.side_effect = lambda ev, **kwargs: {"dummy": True} + + out = op._serialize_event(_fake_event()) + assert out == {"dummy": True} + mock_mtd.assert_called() + + +def test_handle_startup_event(): + op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) + lst: list[dict] = [] + ev = _fake_event("MainReportVersion") + op._handle_startup_event(ev, lst) + assert lst and lst[0]["name"] == "MainReportVersion" + + +def test_handle_node_finished_pushes_xcom(): + op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) + ti = _MockTI() + ctx = _MockContext(ti=ti) + + with patch.object(op, "_serialize_event", return_value={"foo": "bar"}): + ev = _fake_event() + op._handle_node_finished(ev, ctx) + + stored = list(ti.store.values())[0] + raw = zlib.decompress(base64.b64decode(stored)).decode() + assert json.loads(raw) == {"foo": "bar"} + + +def test_execute_streaming_mode(): + """Streaming path should push startup + per-model XComs.""" + from contextlib import nullcontext + + op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) + op.invocation_mode = InvocationMode.DBT_RUNNER + + import cosmos.operators.watcher as _watch_mod + + # Ensure EventMsg symbol exists without permanently altering the module + if _watch_mod.EventMsg is None: + + class _DummyEv: + pass + + eventmsg_patch = patch("cosmos.operators.watcher.EventMsg", _DummyEv, create=True) + else: + eventmsg_patch = nullcontext() # type: ignore + + ti = _MockTI() + ctx = {"ti": ti, "run_id": "dummy"} + + main_rep = _fake_event("MainReportVersion") + node_evt = _fake_event("NodeFinished", uid="model.pkg.x") + + def fake_base_execute(self, context=None, **_): # type: ignore[override] + for cb in getattr(self, "_dbt_runner_callbacks", []): + cb(main_rep) + cb(node_evt) + return None + + with eventmsg_patch, patch.object( + DbtProducerWatcherOperator, + "_serialize_event", + lambda self, ev: {"dummy": True}, + ), patch( + "cosmos.operators.watcher.DbtLocalBaseOperator.execute", + fake_base_execute, + ): + op.execute(context=ctx) + + assert "dbt_startup_events" in ti.store + + node_key = "nodefinished_model__pkg__x" + assert node_key in ti.store + + +def test_execute_fallback_mode(tmp_path): + """Fallback path pushes compressed run_results once.""" + + tgt = tmp_path / "target" + tgt.mkdir() + with (tgt / "run_results.json").open("w") as fp: + json.dump({"results": [{"unique_id": "a", "status": "success"}]}, fp) + + op = DbtProducerWatcherOperator(project_dir=str(tmp_path), profile_config=None) + op.invocation_mode = InvocationMode.SUBPROCESS # force fallback + + ti = _MockTI() + ctx = {"ti": ti, "run_id": "x"} + + def fake_build_run(self, context, **kw): + from cosmos.operators.local import AbstractDbtLocalBase + + AbstractDbtLocalBase._handle_post_execution(self, self.project_dir, context, True) + return None + + with patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd", fake_build_run): + op.execute(context=ctx) + + compressed = ti.store.get("run_results") + assert compressed + data = json.loads(zlib.decompress(base64.b64decode(compressed)).decode()) + assert data["results"][0]["status"] == "success" + + +@patch("cosmos.dbt.runner.is_available", return_value=False) +@patch("cosmos.operators.watcher.DbtLocalBaseOperator.execute", return_value="done") +def test_execute_discovers_invocation_mode(_mock_execute, _mock_is_available): + """If invocation_mode is unset, execute() should discover and set it.""" + + from cosmos.config import InvocationMode + + op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) + assert op.invocation_mode is None # precondition + + ti = _MockTI() + ctx = {"ti": ti, "run_id": "xyz"} + + result = op.execute(context=ctx) + + assert result == "done" + assert op.invocation_mode == InvocationMode.SUBPROCESS