Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cosmos/operators/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,21 @@ def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str, **kw
When a callback is registered, dbt's stdout is redirected to a null buffer so that the
raw ``--log-format json`` lines do not appear in Airflow task logs alongside the
human-readable messages already emitted by ``_log_dbt_msg`` inside the callback.

The callback is intentionally **not** registered during the source freshness pre-check
(``context["_check_source_freshness"] is True``). Registering it there would leave a
stale entry in ``_dbt_runner_callbacks`` that fires again for every event during the
subsequent ``dbt build``, producing duplicate log lines. Freshness results are read from
``target/sources.json`` after the run and do not need per-event XCom pushes.
"""
context = kwargs.get("context")
if context is not None:
# During the source freshness pre-check suppress raw JSON stdout, but do not register
# the XCom-pushing callback so it cannot accumulate and duplicate build logs later.
if context.get("_check_source_freshness"):
with contextlib.redirect_stdout(_NullWriter()):
return super().run_dbt_runner(command, env, cwd, **kwargs)

extra_kwargs: dict[str, Any] = {"project_dir": cwd, "context": context}
parse = self._make_parse_callable()
# Collect callback errors rather than raising inside the callback: dbt catches
Expand Down
24 changes: 24 additions & 0 deletions tests/operators/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2133,3 +2133,27 @@ def test_skipped_node_token_noop_when_empty(self):
producer._skipped_node_token(context, [])
ti.xcom_push.assert_not_called()
assert producer.exclude is None

def test_run_dbt_runner_skips_callback_during_source_freshness(self):
    """The source freshness pre-check must not register the XCom-pushing callback.

    If ``run_dbt_runner`` registered the callback while the context carries
    ``_check_source_freshness``, a stale entry would remain in
    ``_dbt_runner_callbacks`` and fire again for every event of the subsequent
    ``dbt build``, producing duplicate log lines. This test pins that the
    callback list is left exactly as it was and that the parent implementation
    is invoked exactly once.
    """
    from cosmos.operators.local import DbtLocalBaseOperator

    def _context_get(key, default=None):
        # Only the freshness flag is set; every other lookup yields its default.
        if key == "_check_source_freshness":
            return True
        return default

    producer = self._make_producer(_check_source_freshness=True)
    producer._dbt_runner_callbacks = None

    context = MagicMock()
    context.get.side_effect = _context_get

    # Stub the parent implementation so no real dbt invocation happens.
    parent_runner_patch = patch.object(
        DbtLocalBaseOperator,
        "run_dbt_runner",
        return_value=MagicMock(),
    )
    with parent_runner_patch as mock_super:
        producer.run_dbt_runner(
            command=["dbt", "source", "freshness"],
            env={},
            cwd="/tmp",
            context=context,
        )

    # The callback list must remain untouched — no watcher callback appended
    assert producer._dbt_runner_callbacks is None
    mock_super.assert_called_once()
Loading