Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions cosmos/operators/_watcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,15 @@ def store_dbt_resource_status_from_log(
_log_dbt_msg(log_line)


# dbt flags set by the producer that must not propagate into the consumer's
# retry command. ``--select`` / ``--exclude`` are dropped because the consumer
# targets a single model and re-applies its own selector; ``--log-format`` is
# dropped because the consumer's retry is a user-facing dbt run and should
# default to text. Each entry consumes the following token as its value (e.g.
# ``--log-format json``).
_PRODUCER_ONLY_FLAGS: tuple[str, ...] = ("--select", "--exclude", "--log-format")


Comment on lines +404 to +408
class BaseConsumerSensor(BaseSensorOperator): # type: ignore[misc]
template_fields: tuple[str, ...] = ("model_unique_id", "compiled_sql") # type: ignore[operator]
poke_retry_number: int = 0
Expand Down Expand Up @@ -446,14 +455,10 @@ def _resource_label(self) -> str:

@staticmethod
def _filter_flags(flags: list[str]) -> list[str]:
"""Filters out dbt flags that should not propagate from the producer to the consumer's retry:

- ``--select`` / ``--exclude``: the consumer targets a single model and re-applies its own selector.
- ``--log-format``: the producer always sets ``--log-format json`` so it can parse dbt's structured
event stream; the consumer's retry is a user-facing dbt run and should default to text. Users who
want JSON on the retry can opt in via
``operator_args={"dbt_cmd_flags": ["--log-format", "json"]}``, which is appended by ``build_cmd``
outside of this flag pipeline.
"""Filter out dbt flags that should not propagate from the producer to the consumer's retry.

The set of stripped flags is defined by ``_PRODUCER_ONLY_FLAGS`` at module
scope; see that constant's docstring for the rationale per flag.
"""
filtered = []
skip_next = False
Expand All @@ -463,7 +468,7 @@ def _filter_flags(flags: list[str]) -> list[str]:
skip_next = False
else:
continue # skip value of previous flag
if token in ("--select", "--exclude", "--log-format"):
if token in _PRODUCER_ONLY_FLAGS:
skip_next = True
continue
filtered.append(token)
Expand Down