diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 29781dac80..0f941e754a 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -397,6 +397,15 @@ def store_dbt_resource_status_from_log( _log_dbt_msg(log_line) +# dbt flags set by the producer that must not propagate into the consumer's +# retry command. ``--select`` / ``--exclude`` are dropped because the consumer +# targets a single model and re-applies its own selector; ``--log-format`` is +# dropped because the consumer's retry is a user-facing dbt run and should +# default to text. Each entry consumes the following token as its value (e.g. +# ``--log-format json``). +_PRODUCER_ONLY_FLAGS: tuple[str, ...] = ("--select", "--exclude", "--log-format") + + class BaseConsumerSensor(BaseSensorOperator): # type: ignore[misc] template_fields: tuple[str, ...] = ("model_unique_id", "compiled_sql") # type: ignore[operator] poke_retry_number: int = 0 @@ -446,14 +455,10 @@ def _resource_label(self) -> str: @staticmethod def _filter_flags(flags: list[str]) -> list[str]: - """Filters out dbt flags that should not propagate from the producer to the consumer's retry: - - - ``--select`` / ``--exclude``: the consumer targets a single model and re-applies its own selector. - - ``--log-format``: the producer always sets ``--log-format json`` so it can parse dbt's structured - event stream; the consumer's retry is a user-facing dbt run and should default to text. Users who - want JSON on the retry can opt in via - ``operator_args={"dbt_cmd_flags": ["--log-format", "json"]}``, which is appended by ``build_cmd`` - outside of this flag pipeline. + """Filter out dbt flags that should not propagate from the producer to the consumer's retry. + + The set of stripped flags is defined by ``_PRODUCER_ONLY_FLAGS`` at module + scope; see that constant's docstring for the rationale per flag. """ filtered = [] skip_next = False @@ -463,7 +468,7 @@ def _filter_flags(flags: list[str]) -> list[str]: skip_next = False else: continue # skip value of previous flag - if token in ("--select", "--exclude", "--log-format"): + if token in _PRODUCER_ONLY_FLAGS: skip_next = True continue filtered.append(token)