From bebe3f43d6d9bee1783efc6c44d5648851a60c5e Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 14 May 2026 18:34:19 +0100 Subject: [PATCH 1/5] Forward extra args to test-integration hatch script Allow `hatch run tests.:test-integration -k ` (and other pytest flags) to filter or otherwise customize the integration run without editing scripts/test/integration.sh. `hatch run` does not forward extra args to a script unless the script definition contains the `{args}` placeholder. The script itself must also accept the forwarded args via `"$@"`. Without both pieces in place, attempts to filter with `-k` or `PYTEST_ADDOPTS` are silently ignored -- the env-var approach in particular fails because integration.sh already passes its own `-k` exclusion list and pytest only honors the last `-k`. This change adds `{args}` to the pyproject script definition, `"$@"` to the pytest invocation in integration.sh, and documents the new filtering workflow in the contributor guide. CI behavior is unchanged because CI invokes the script without extra args. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/policy/contributing.rst | 9 +++++++++ pyproject.toml | 2 +- scripts/test/integration.sh | 3 ++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/docs/policy/contributing.rst b/docs/policy/contributing.rst index ebcd8985fe..c3da85ae10 100644 --- a/docs/policy/contributing.rst +++ b/docs/policy/contributing.rst @@ -153,6 +153,15 @@ If testing for the same Airflow and Python version, next runs of the integration hatch run tests.py3.11-2.10-1.9:test-integration +Any extra arguments passed after ``test-integration`` are forwarded to ``pytest``. This is convenient for iterating on a single test: + +.. code-block:: bash + + hatch run tests.py3.11-2.10-1.9:test-integration -k test_name + hatch run tests.py3.11-2.10-1.9:test-integration tests/operators/test_watcher.py::test_name + +The same applies to other pytest flags (``-x``, ``-vv``, ``--lf``, ``--pdb``). Note: ``scripts/test/integration.sh`` passes its own ``-k`` exclusion list for k8s and python-models DAGs, so a ``-k`` passed here overrides that exclusion. Setting ``PYTEST_ADDOPTS="-k ..."`` does not work — pytest only honors the last ``-k`` it sees, and the script's ``-k`` wins. + Writing Docs ^^^^^^^^^^^^ diff --git a/pyproject.toml b/pyproject.toml index f3d10321d5..31339fa517 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -191,7 +191,7 @@ freeze = "pip freeze" test = 'sh scripts/test/unit.sh' test-cov = 'sh scripts/test/unit-cov.sh' test-integration-setup = 'sh scripts/test/integration-setup.sh {matrix:dbt}' -test-integration = 'sh scripts/test/integration.sh' +test-integration = 'sh scripts/test/integration.sh {args}' test-kubernetes = "sh scripts/test/integration-kubernetes.sh" test-kubernetes-setup = "sh scripts/test/kubernetes-setup.sh {matrix:dbt}" test-integration-dbtf-setup = 'sh scripts/test/integration-dbtf-setup.sh' diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index b9166823c9..defd1678ee 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -33,4 +33,5 @@ pytest -vv \ --ignore=dev/dags/cross_project_dbt_ls_dag.py \ --ignore=tests/test_telemetry.py \ -k 'not (simple_dag_async or example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes or jaffle_shop_watcher_kubernetes)' \ - $SPLIT_ARGS + $SPLIT_ARGS \ + "$@" From 79c973fe977504cfc2249cf70d85e77be47b4946 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 15 May 2026 00:31:06 +0100 Subject: [PATCH 2/5] Retry watcher downstream models on upstream-failure recovery When a watcher producer's first dbt build attempt fails, dbt marks the failed model and every transitive downstream node with ``node_status="skipped"`` and emits ``SkippingDetails`` events (``LogSkipBecauseError`` when the upstream is ephemeral). The producer log parser pushed those "skipped" statuses straight to XCom, so each downstream consumer sensor raised ``AirflowSkipException`` and ended in SKIPPED. Airflow does not retry SKIPPED tasks, so even after the failing upstream model recovered via its own consumer-retry fallback, the downstream model was never re-run. The DAG completed "green" with the downstream tables silently un-materialized. See Linear BOSS-401 for the customer reproduction. This rewrites the affected status from "skipped" to "failed" so the downstream consumer fails on attempt 1, Airflow retries it, and the existing ``_fallback_to_non_watcher_run`` path runs the model locally once the upstream has recovered. ``SkippingDetails`` / ``LogSkipBecauseError`` are the right discriminator because they are fired only from dbt's ``on_skip()``, which is reached only when ``do_skip(cause=...)`` was called -- i.e. exclusively on upstream-node failure. Empty, ephemeral, and selector-excluded skips do not fire these events. dbt also emits a later ``NodeFinished`` with ``node_status="skipped"`` for the same node from the runner's ``finally`` block, which would otherwise overwrite the rewritten XCom; tracking affected ``unique_id``\\ s in a per-execution set lets us rewrite both events consistently. The fix is in the shared log parser, so it applies uniformly to both ``InvocationMode.DBT_RUNNER`` (protobuf events serialised via ``MessageToJson``) and ``InvocationMode.SUBPROCESS`` (``--log-format json`` stdout lines). The producer operator owns the accumulator set (``DbtProducerWatcherOperator._upstream_failure_skipped_ids``), bound to the parser via ``functools.partial`` and cleared at the start of each ``execute()``. A new dbt project ``dev/dags/dbt/watcher_upstream_failure_recovery`` (three models: ``model_a`` succeeds, ``model_flaky`` fails-once via a Postgres sequence, ``model_downstream`` depends on ``model_flaky``) and an integration test ``test_dbt_task_group_watcher_retry_recovers_skipped_downstream`` exercise the end-to-end recovery. The test asserts that ``model_downstream`` lands in ``success`` (was ``skipped`` before the fix) and that its ``try_number > 1`` (proves the consumer fallback path actually fired). The fail-once sequence drop is extracted into a ``reset_fail_once_sequence`` pytest fixture so both this test and the existing ``test_dbt_task_group_watcher_gateway_prevents_downstream_skip`` reuse it. A standalone example DAG ``example_watcher_recovers_skipped_downstream`` is added under ``dev/failed_dags/`` for visual reproduction in Airflow standalone -- failing once on attempt 1, then recovering both the flaky upstream and the (previously) skipped downstream on retry. Co-Authored-By: Claude Opus 4.7 (1M context) --- cosmos/operators/_watcher/base.py | 41 +++++ cosmos/operators/_watcher/state.py | 11 ++ cosmos/operators/watcher.py | 8 + .../dbt_project.yml | 30 ++++ .../models/model_a.sql | 5 + .../models/model_downstream.sql | 4 + .../models/model_flaky.sql | 12 ++ ...ple_watcher_recovers_skipped_downstream.py | 113 +++++++++++++ tests/operators/test_watcher.py | 156 ++++++++++++++++-- 9 files changed, 365 insertions(+), 15 deletions(-) create mode 100644 dev/dags/dbt/watcher_upstream_failure_recovery/dbt_project.yml create mode 100644 dev/dags/dbt/watcher_upstream_failure_recovery/models/model_a.sql create mode 100644 dev/dags/dbt/watcher_upstream_failure_recovery/models/model_downstream.sql create mode 100644 dev/dags/dbt/watcher_upstream_failure_recovery/models/model_flaky.sql create mode 100644 dev/failed_dags/example_watcher_recovers_skipped_downstream.py diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 6b7f7a158d..5e97a700b4 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -32,6 +32,7 @@ is_dbt_node_status_skipped, is_dbt_node_status_success, is_dbt_node_status_terminal, + is_dbt_upstream_failure_skip_event, is_producer_task_terminated, safe_xcom_push, xcom_set_lock, @@ -253,6 +254,33 @@ def _ensure_subprocess_model_outlet_uris( model_outlet_uris[_MODEL_OUTLET_URIS_ATTEMPTED_KEY] = [] # type: ignore[assignment] +def _rewrite_upstream_failure_skip_status( + log_line: dict[str, Any], + unique_id: str | None, + dbt_node_status: Any, + upstream_failure_skipped_ids: set[str] | None, +) -> Any: + """Track upstream-failure-skipped nodes and rewrite their status from "skipped" to "failed". + + dbt emits two events for a node that it skipped because of an upstream + failure: ``SkippingDetails``/``LogSkipBecauseError`` during ``on_skip()``, + and a later ``NodeFinished`` from the runnable ``finally``-block -- both + carry ``node_status="skipped"``. The first identifies the upstream-failure + cause (those events are reached only via ``do_skip(cause=...)``, exclusively + on upstream-node failure); the second would otherwise overwrite the + rewritten XCom with the original ``"skipped"``. Tracking affected + ``unique_id``\\ s in a per-execution set lets the parser rewrite both. See + Linear BOSS-401. + """ + if not unique_id or upstream_failure_skipped_ids is None: + return dbt_node_status + if is_dbt_upstream_failure_skip_event(log_line.get("info", {}).get("name")): + upstream_failure_skipped_ids.add(unique_id) + if dbt_node_status == "skipped" and unique_id in upstream_failure_skipped_ids: + return "failed" + return dbt_node_status + + def store_dbt_resource_status_from_log( line: str, extra_kwargs: Any, @@ -261,6 +289,7 @@ def store_dbt_resource_status_from_log( test_results_per_model: dict[str, list[str]] | None = None, model_outlet_uris: dict[str, list[str]] | None = None, dataset_namespace: str | None = None, + upstream_failure_skipped_ids: set[str] | None = None, ) -> None: """ Parses a single line from dbt JSON logs and stores node status to Airflow XCom. @@ -281,6 +310,12 @@ def store_dbt_resource_status_from_log( :param model_outlet_uris: Mutable dict mapping unique_id to outlet URIs. Populated lazily from the manifest on first terminal status detection. :param dataset_namespace: The OL-compatible dataset namespace for URI construction. + :param upstream_failure_skipped_ids: Mutable accumulator set of node unique_ids + that dbt skipped because of an upstream-node failure. Populated when this + function sees a ``SkippingDetails`` or ``LogSkipBecauseError`` event; later + ``NodeFinished`` events with ``node_status="skipped"`` for these unique_ids + are rewritten to ``"failed"`` so the consumer sensor fails (and Airflow can + retry it) rather than going SKIPPED. See BOSS-401. """ try: log_line = json.loads(line) @@ -301,6 +336,12 @@ def store_dbt_resource_status_from_log( dbt_node_resource_type = node_info.get("resource_type") unique_id = node_info.get("unique_id") + # Rewrite "skipped" to "failed" when dbt skipped the node because an + # upstream node failed (BOSS-401). See _rewrite_upstream_failure_skip_status. + dbt_node_status = _rewrite_upstream_failure_skip_status( + log_line, unique_id, dbt_node_status, upstream_failure_skipped_ids + ) + logger.debug("Model: %s is in %s state", unique_id, dbt_node_status) # Handle terminal statuses for both models (success/failed) and tests (pass/fail) diff --git a/cosmos/operators/_watcher/state.py b/cosmos/operators/_watcher/state.py index b27e63fafe..58ff8095d3 100644 --- a/cosmos/operators/_watcher/state.py +++ b/cosmos/operators/_watcher/state.py @@ -25,6 +25,12 @@ DBT_FAILED_STATUSES = frozenset({"failed", "fail", "error", "runtime error"}) DBT_SKIPPED_STATUSES = frozenset({"skipped"}) +# dbt event names that signal a node was skipped because an upstream node failed. +# dbt fires SkippingDetails (non-ephemeral upstream) or LogSkipBecauseError +# (ephemeral upstream) only from on_skip(), which is reached only when +# do_skip(cause=...) was called -- i.e. exclusively on upstream node failure. +DBT_UPSTREAM_FAILURE_SKIP_EVENT_NAMES = frozenset({"SkippingDetails", "LogSkipBecauseError"}) + # dbt source freshness statuses that mark a source as stale and propagate skips downstream. DBT_SOURCE_FRESHNESS_STALE_STATUSES = frozenset({"error", "warn"}) @@ -66,6 +72,11 @@ def is_dbt_node_status_terminal(status: str | None) -> bool: return is_dbt_node_status_success(status) or is_dbt_node_status_failed(status) or is_dbt_node_status_skipped(status) +def is_dbt_upstream_failure_skip_event(event_name: str | None) -> bool: + """Check if the dbt event name indicates a node was skipped because an upstream node failed.""" + return event_name in DBT_UPSTREAM_FAILURE_SKIP_EVENT_NAMES + + def is_producer_task_terminated(state: str | None) -> bool: """Return True when the producer task is in a terminal state.""" return state in PRODUCER_TERMINAL_STATES diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index ab45454e74..c1dc3bfd74 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -197,6 +197,12 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # Mutable dict populated lazily from the manifest; shared with the log parser. self._dataset_namespace: str | None = None self._model_outlet_uris: dict[str, list[str]] = {} + # Mutable set populated by the log parser when dbt emits SkippingDetails + # or LogSkipBecauseError for a node; subsequent "skipped" terminal events + # for those unique_ids are rewritten to "failed" so the consumer sensor + # fails on attempt 1 (instead of SKIPPED, which Airflow will not retry). + # See BOSS-401. + self._upstream_failure_skipped_ids: set[str] = set() def _handle_datasets(self, context: Context) -> None: """No-op override: consumer tasks handle their own dataset emission in WATCHER mode.""" @@ -209,6 +215,7 @@ def _make_parse_callable(self) -> Callable[[str, Any], None]: test_results_per_model=self.test_results_per_model, model_outlet_uris=self._model_outlet_uris, dataset_namespace=self._dataset_namespace, + upstream_failure_skipped_ids=self._upstream_failure_skipped_ids, ) def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str, **kwargs: Any) -> Any: @@ -399,6 +406,7 @@ def execute(self, context: Context, **kwargs: Any) -> Any: # Pre-compute the dataset namespace for per-model outlet URI generation. self._dataset_namespace = get_dataset_namespace(self.profile_config) self._model_outlet_uris.clear() + self._upstream_failure_skipped_ids.clear() task_instance = context.get("ti") if task_instance is None: diff --git a/dev/dags/dbt/watcher_upstream_failure_recovery/dbt_project.yml b/dev/dags/dbt/watcher_upstream_failure_recovery/dbt_project.yml new file mode 100644 index 0000000000..bfe74f0425 --- /dev/null +++ b/dev/dags/dbt/watcher_upstream_failure_recovery/dbt_project.yml @@ -0,0 +1,30 @@ +name: 'watcher_upstream_failure_recovery' + +config-version: 2 +version: '0.1' + +profile: 'default' + +model-paths: ["models"] +seed-paths: ["seeds"] +test-paths: ["tests"] +macro-paths: ["macros"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_modules" + - "logs" + +require-dbt-version: [">=1.0.0", "<2.0.0"] + +# Sequence used by model_flaky.sql to fail on first run and succeed on subsequent +# runs (see BOSS-401 regression test). Using a project-specific sequence name +# avoids state leaking from / into watcher_downstream_not_skipped, which uses +# the same fail-once recipe but tests a different invariant. +on-run-start: + - "CREATE SEQUENCE IF NOT EXISTS {{ target.schema }}._cosmos_recovery_fail_once_seq" + +models: + watcher_upstream_failure_recovery: + +materialized: table diff --git a/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_a.sql b/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_a.sql new file mode 100644 index 0000000000..2031e20406 --- /dev/null +++ b/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_a.sql @@ -0,0 +1,5 @@ +select 1 as id, 'Alice' as first_name, 'Smith' as last_name, 'alice@example.com' as email +union all +select 2, 'Bob', 'Jones', 'bob@example.com' +union all +select 3, 'Charlie', 'Brown', 'charlie@example.com' diff --git a/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_downstream.sql b/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_downstream.sql new file mode 100644 index 0000000000..7b373bb260 --- /dev/null +++ b/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_downstream.sql @@ -0,0 +1,4 @@ +select + id, + first_name +from {{ ref('model_flaky') }} diff --git a/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_flaky.sql b/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_flaky.sql new file mode 100644 index 0000000000..e4821c1a49 --- /dev/null +++ b/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_flaky.sql @@ -0,0 +1,12 @@ +{{ + config( + pre_hook=[ + "DO $$ BEGIN IF nextval('{{ target.schema }}._cosmos_recovery_fail_once_seq') <= 1 THEN RAISE EXCEPTION 'fail_once: intentional first-run failure'; END IF; END $$" + ] + ) +}} + +select + id, + first_name +from {{ ref('model_a') }} diff --git a/dev/failed_dags/example_watcher_recovers_skipped_downstream.py b/dev/failed_dags/example_watcher_recovers_skipped_downstream.py new file mode 100644 index 0000000000..4e40cee71c --- /dev/null +++ b/dev/failed_dags/example_watcher_recovers_skipped_downstream.py @@ -0,0 +1,113 @@ +""" +Demonstrate watcher-mode recovery of a downstream model that dbt skipped +because its upstream failed on the producer's first attempt (BOSS-401). + +Without the fix in ``cosmos/operators/_watcher/base.py``: +- A dbt model fails on the first producer attempt. +- dbt marks every downstream node ``skipped`` with the upstream-failure cause. +- The producer log parser pushes that ``"skipped"`` status to XCom. +- The downstream consumer sensor raises ``AirflowSkipException`` -- SKIPPED. +- Airflow retries the producer task (the producer's retry is a no-op by + design: Cosmos restores XCom and raises ``AirflowSkipException`` to avoid + re-running the whole dbt build). +- The consumer sensor for the failing upstream retries on its own and falls + back to running ``dbt --select `` locally, which succeeds. +- The downstream consumer, however, was already SKIPPED. Airflow does not + retry skipped tasks, so the downstream model is never re-run even though + its upstream has now recovered. +- The DAG ends in ``success`` because Airflow treats SKIPPED as non-failure + -- a "false green" outcome with un-materialized downstream tables. + +With the fix, the producer parser rewrites the ``"skipped"`` status to +``"failed"`` for any node that dbt skipped via ``SkippingDetails`` / +``LogSkipBecauseError`` (the only paths reached when ``do_skip(cause=...)`` +fires -- i.e. exclusively on upstream-node failure). The downstream consumer +then fails on attempt 1, Airflow retries it, and the same consumer-fallback +path that recovers the failing upstream now runs the downstream locally. + +Models used (from ``dev/dags/dbt/watcher_upstream_failure_recovery``): +- ``model_a``: trivial source-style model, succeeds. +- ``model_flaky``: uses an ``on-run-start`` Postgres sequence to fail on + the first ``nextval`` call (``<= 1`` → ``RAISE EXCEPTION``) and succeed + on subsequent calls. +- ``model_downstream``: depends on ``model_flaky``; dbt skips it on + attempt 1 because its upstream failed. + +A ``post_dbt`` ``EmptyOperator`` downstream of the task group makes the +"green DAG" visible. A ``cleanup`` SQL task drops the sequence at the end +so the DAG is re-runnable from a clean state. +""" + +import os +from datetime import datetime, timedelta +from pathlib import Path + +from airflow.models import DAG +from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator + +try: + from airflow.providers.standard.operators.empty import EmptyOperator +except ImportError: + from airflow.operators.empty import EmptyOperator + +from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig +from cosmos.constants import ExecutionMode +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags/dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) +DBT_PROJECT_PATH = DBT_ROOT_PATH / "watcher_upstream_failure_recovery" + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + +execution_config = ExecutionConfig( + execution_mode=ExecutionMode.WATCHER, +) + +operator_args = { + "install_deps": True, + "execution_timeout": timedelta(seconds=120), +} + +if os.getenv("CI"): + operator_args["trigger_rule"] = "all_success" + +default_args = { + "retries": 2, + "retry_delay": timedelta(seconds=0), +} + +with DAG( + dag_id="example_watcher_recovers_skipped_downstream", + schedule="@daily", + start_date=datetime(2023, 1, 1), + catchup=False, + default_args=default_args, +): + dbt_group = DbtTaskGroup( + group_id="watcher_upstream_failure_recovery", + execution_config=execution_config, + project_config=ProjectConfig(DBT_PROJECT_PATH), + profile_config=profile_config, + operator_args=operator_args, + ) + + post_dbt = EmptyOperator(task_id="post_dbt") + + cleanup = SQLExecuteQueryOperator( + task_id="drop_fail_once_marker", + conn_id="example_conn", + sql="DROP SEQUENCE IF EXISTS public._cosmos_recovery_fail_once_seq;", + trigger_rule="all_done", + ) + + dbt_group >> post_dbt + dbt_group >> cleanup diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index a1c1fc5423..981f84b83b 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -48,6 +48,9 @@ DBT_WATCHER_DOWNSTREAM_NOT_SKIPPED_PATH = ( Path(__file__).parent.parent.parent / "dev/dags/dbt/watcher_downstream_not_skipped" ) +DBT_WATCHER_UPSTREAM_FAILURE_RECOVERY_PATH = ( + Path(__file__).parent.parent.parent / "dev/dags/dbt/watcher_upstream_failure_recovery" +) DBT_EXECUTABLE_PATH = Path(__file__).parent.parent.parent / "venv-subprocess/bin/dbt" DBT_PROJECT_WITH_EMPTY_MODEL_PATH = Path(__file__).parent.parent / "sample/dbt_project_with_empty_model" @@ -83,6 +86,38 @@ class _MockContext(dict): pass +@pytest.fixture +def reset_fail_once_sequence(): + """Drop a fail-once Postgres sequence on ``example_conn`` so a watcher + retry-recovery integration test starts from a clean state. + + The watcher integration tests use a per-project Postgres sequence + (created in the dbt project's ``on-run-start``) plus a model ``pre_hook`` + that ``nextval``s into it and raises if the value is <= 1. That makes the + first run of the flaky model fail and any subsequent run succeed -- the + minimal recipe for exercising the consumer-fallback retry path. Each test + drops its sequence before running so re-runs and CI parallelism don't + leak sequence state. + """ + import psycopg2 + from airflow.hooks.base import BaseHook + + def _drop(sequence_name: str) -> None: + airflow_conn = BaseHook.get_connection("example_conn") + with psycopg2.connect( + host=airflow_conn.host, + port=airflow_conn.port or 5432, + dbname=airflow_conn.schema or "postgres", + user=airflow_conn.login, + password=airflow_conn.password, + ) as conn: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute(f"DROP SEQUENCE IF EXISTS public.{sequence_name}") + + return _drop + + def test_dbt_producer_watcher_operator_priority_weight_default(): """Test that DbtProducerWatcherOperator uses default priority_weight of 9999.""" op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) @@ -2074,14 +2109,12 @@ def skip_orders(context, dag, task_group, nodes, sources_json): ), ) @pytest.mark.integration -def test_dbt_task_group_watcher_gateway_prevents_downstream_skip(caplog): +def test_dbt_task_group_watcher_gateway_prevents_downstream_skip(caplog, reset_fail_once_sequence): """ Verify that the dbt_producer_watcher_done gateway task prevents the producer's skip from propagating to tasks downstream of the DbtTaskGroup. """ - import psycopg2 from airflow import DAG - from airflow.hooks.base import BaseHook from cosmos import DbtTaskGroup @@ -2090,18 +2123,7 @@ def test_dbt_task_group_watcher_gateway_prevents_downstream_skip(caplog): except ImportError: from airflow.operators.empty import EmptyOperator - # Reset the fail_once sequence using credentials from the same Airflow connection - airflow_conn = BaseHook.get_connection("example_conn") - with psycopg2.connect( - host=airflow_conn.host, - port=airflow_conn.port or 5432, - dbname=airflow_conn.schema or "postgres", - user=airflow_conn.login, - password=airflow_conn.password, - ) as conn: - conn.autocommit = True - with conn.cursor() as cur: - cur.execute("DROP SEQUENCE IF EXISTS public._cosmos_fail_once_seq") + reset_fail_once_sequence("_cosmos_fail_once_seq") caplog.set_level(logging.DEBUG, logger="cosmos.operators._watcher.base") @@ -2142,6 +2164,110 @@ def test_dbt_task_group_watcher_gateway_prevents_downstream_skip(caplog): assert tis["watcher_downstream_not_skipped.dbt_producer_watcher_done"].state == "success" +@pytest.mark.skipif( + AIRFLOW_VERSION < Version("2.10") or (Version("3.0.0") <= AIRFLOW_VERSION < Version("3.2.0")), + reason=( + "dag.test() in Airflow 2.9 hangs when a task fails with retries configured. " + "Airflow 3.0 runs tasks inline via _run_raw_task without the task SDK supervisor, " + "so RuntimeTaskInstance.get_task_states raises NameError for SUPERVISOR_COMMS and " + "the watcher sensor cannot detect producer termination on retry. " + "Airflow 3.1.x crashes during task finalization (SetRenderedFields) when retrying " + "tasks inside a DbtTaskGroup via dag.test()." + ), +) +@pytest.mark.integration +def test_dbt_task_group_watcher_retry_recovers_skipped_downstream(caplog, reset_fail_once_sequence): + """Regression for BOSS-401 / Linear customer issue. + + When a dbt model fails on the producer's first attempt, dbt emits + ``SkippingDetails`` (and/or ``LogSkipBecauseError`` for ephemeral upstreams) + plus a later ``NodeFinished`` event with ``node_status="skipped"`` for each + downstream node. Before the fix that "skipped" status went straight to XCom, + the consumer sensor raised ``AirflowSkipException``, and the task ended + SKIPPED. Airflow does not retry SKIPPED tasks, so even after the failing + upstream recovered on its own consumer retry, the downstream model was never + re-run -- the DAG ended in a "false green" outcome with un-materialized + downstream tables. + + The fix (``store_dbt_resource_status_from_log`` in + ``cosmos/operators/_watcher/base.py``) tracks unique_ids that dbt skipped + due to upstream-node failure (the only path that fires + ``SkippingDetails``/``LogSkipBecauseError``, since both come from + ``on_skip()`` which is only reached via ``do_skip(cause=...)``). Subsequent + "skipped" terminal events for those unique_ids are rewritten to "failed" so + the consumer fails on attempt 1, Airflow retries it, and the existing + ``_fallback_to_non_watcher_run`` path runs the model locally once its + upstream has recovered. + """ + from airflow import DAG + + from cosmos import DbtTaskGroup + + try: + from airflow.providers.standard.operators.empty import EmptyOperator + except ImportError: + from airflow.operators.empty import EmptyOperator + + reset_fail_once_sequence("_cosmos_recovery_fail_once_seq") + + caplog.set_level(logging.DEBUG, logger="cosmos.operators._watcher.base") + + with DAG( + dag_id="watcher_upstream_failure_recovery_test", + start_date=datetime(2023, 1, 1), + default_args={"retries": 2, "retry_delay": timedelta(seconds=0)}, + dagrun_timeout=timedelta(seconds=180), + ) as dag: + dbt_group = DbtTaskGroup( + group_id="watcher_upstream_failure_recovery", + execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER), + project_config=ProjectConfig(dbt_project_path=DBT_WATCHER_UPSTREAM_FAILURE_RECOVERY_PATH), + profile_config=profile_config, + render_config=RenderConfig(emit_datasets=False, test_behavior=TestBehavior.NONE), + operator_args={"trigger_rule": "none_failed", "execution_timeout": timedelta(seconds=180)}, + ) + + # Force gateway >> root consumers so dag.test() respects execution order + # (see test_dbt_task_group_watcher_gateway_prevents_downstream_skip for the + # apache/airflow#56723 context). + gateway = dag.task_dict["watcher_upstream_failure_recovery.dbt_producer_watcher_done"] + for root_task in dbt_group.get_roots(): + if root_task.task_id.endswith("_done") or root_task.task_id.endswith("dbt_producer_watcher"): + continue + gateway >> root_task + + post_dbt = EmptyOperator(task_id="post_dbt") + dbt_group >> post_dbt + + outcome = new_test_dag(dag, expected_dag_state=DagRunState.SUCCESS) + + tis = {ti.task_id: ti for ti in outcome.get_task_instances()} + + # Core BOSS-401 assertion: model_downstream must end SUCCESS, not SKIPPED. + downstream_ti = tis["watcher_upstream_failure_recovery.model_downstream_run"] + assert downstream_ti.state == "success", ( + f"model_downstream_run ended in '{downstream_ti.state}' -- expected 'success'. " + "Before the BOSS-401 fix, dbt's 'skipped' status for upstream-failure skips " + "was pushed to XCom unchanged, the consumer sensor raised AirflowSkipException, " + "and Airflow did not retry the SKIPPED task even after the upstream model " + "recovered on its own consumer retry." + ) + # Proves the consumer fallback retry actually fired -- guards against a future + # refactor that happens to leave the task in 'success' for a different reason. + assert downstream_ti.try_number > 1, ( + f"model_downstream_run succeeded on its first try (try_number={downstream_ti.try_number}); " + "the test setup is no longer exercising the retry-after-failure path." + ) + + # The flaky upstream itself also recovers via the consumer-retry fallback. + assert tis["watcher_upstream_failure_recovery.model_flaky_run"].state == "success" + assert tis["watcher_upstream_failure_recovery.model_a_run"].state == "success" + + # Producer/gateway/post_dbt sanity (matches the gateway-prevents-skip test). + assert tis["watcher_upstream_failure_recovery.dbt_producer_watcher_done"].state == "success" + assert tis["post_dbt"].state == "success" + + def test_dbt_source_watcher_operator_template_fields(): """Test that DbtSourceWatcherOperator includes model_unique_id as a consumer sensor.""" from cosmos.operators._watcher.base import BaseConsumerSensor From 025ed17791792bccb6e31b0a5ca39556b88eb614 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 19 May 2026 00:11:45 +0100 Subject: [PATCH 3/5] Address #2684 review: link public issue and add unit tests Replaces every reference to the private Linear identifier ``BOSS-401`` with the public GitHub issue ``#2698`` across cosmos source, the watcher upstream-failure-recovery dbt project, the standalone repro DAG, and the integration test. Resolves Pankaj's review comment about avoiding private issue links in OSS source. Adds three unit-test cases in ``TestStoreDbtStatusFromLog`` exercising the new ``_rewrite_upstream_failure_skip_status`` helper without the integration env, so the BOSS-401 invariant is regression-caught on every CI run rather than only on the integration matrix (Copilot's review comment). Coverage: - A ``SkippingDetails`` event followed by a ``NodeFinished`` (and the reverse arrival order) for the same unique_id results in the per-node status XCom being pushed as ``"failed"`` rather than ``"skipped"``, parametrized for both ``SkippingDetails`` and ``LogSkipBecauseError``. - A plain ``NodeFinished`` ``"skipped"`` event for an unrelated unique_id is unchanged. - A ``SkippingDetails`` event with no accumulator argument is unchanged (verifies the helper's None-handling). Co-Authored-By: Claude Opus 4.7 (1M context) --- cosmos/operators/_watcher/base.py | 6 +- cosmos/operators/watcher.py | 2 +- .../dbt_project.yml | 2 +- ...ple_watcher_recovers_skipped_downstream.py | 2 +- tests/operators/test_watcher.py | 78 ++++++++++++++++++- 5 files changed, 81 insertions(+), 9 deletions(-) diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 5e97a700b4..65474af10d 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -270,7 +270,7 @@ def _rewrite_upstream_failure_skip_status( on upstream-node failure); the second would otherwise overwrite the rewritten XCom with the original ``"skipped"``. Tracking affected ``unique_id``\\ s in a per-execution set lets the parser rewrite both. See - Linear BOSS-401. + #2698. """ if not unique_id or upstream_failure_skipped_ids is None: return dbt_node_status @@ -315,7 +315,7 @@ def store_dbt_resource_status_from_log( function sees a ``SkippingDetails`` or ``LogSkipBecauseError`` event; later ``NodeFinished`` events with ``node_status="skipped"`` for these unique_ids are rewritten to ``"failed"`` so the consumer sensor fails (and Airflow can - retry it) rather than going SKIPPED. See BOSS-401. + retry it) rather than going SKIPPED. See #2698. """ try: log_line = json.loads(line) @@ -337,7 +337,7 @@ def store_dbt_resource_status_from_log( unique_id = node_info.get("unique_id") # Rewrite "skipped" to "failed" when dbt skipped the node because an - # upstream node failed (BOSS-401). See _rewrite_upstream_failure_skip_status. + # upstream node failed (#2698). See _rewrite_upstream_failure_skip_status. dbt_node_status = _rewrite_upstream_failure_skip_status( log_line, unique_id, dbt_node_status, upstream_failure_skipped_ids ) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index c1dc3bfd74..10d713a473 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -201,7 +201,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # or LogSkipBecauseError for a node; subsequent "skipped" terminal events # for those unique_ids are rewritten to "failed" so the consumer sensor # fails on attempt 1 (instead of SKIPPED, which Airflow will not retry). - # See BOSS-401. + # See #2698. self._upstream_failure_skipped_ids: set[str] = set() def _handle_datasets(self, context: Context) -> None: diff --git a/dev/dags/dbt/watcher_upstream_failure_recovery/dbt_project.yml b/dev/dags/dbt/watcher_upstream_failure_recovery/dbt_project.yml index bfe74f0425..b6a3459288 100644 --- a/dev/dags/dbt/watcher_upstream_failure_recovery/dbt_project.yml +++ b/dev/dags/dbt/watcher_upstream_failure_recovery/dbt_project.yml @@ -19,7 +19,7 @@ clean-targets: require-dbt-version: [">=1.0.0", "<2.0.0"] # Sequence used by model_flaky.sql to fail on first run and succeed on subsequent -# runs (see BOSS-401 regression test). Using a project-specific sequence name +# runs (see #2698 regression test). Using a project-specific sequence name # avoids state leaking from / into watcher_downstream_not_skipped, which uses # the same fail-once recipe but tests a different invariant. on-run-start: diff --git a/dev/failed_dags/example_watcher_recovers_skipped_downstream.py b/dev/failed_dags/example_watcher_recovers_skipped_downstream.py index 4e40cee71c..d728d5b4c5 100644 --- a/dev/failed_dags/example_watcher_recovers_skipped_downstream.py +++ b/dev/failed_dags/example_watcher_recovers_skipped_downstream.py @@ -1,6 +1,6 @@ """ Demonstrate watcher-mode recovery of a downstream model that dbt skipped -because its upstream failed on the producer's first attempt (BOSS-401). +because its upstream failed on the producer's first attempt (#2698). Without the fix in ``cosmos/operators/_watcher/base.py``: - A dbt model fails on the first producer attempt. diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 981f84b83b..c6117f5579 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -405,6 +405,20 @@ def test_emit_datasets_pushes_xcom_when_enabled(self, mock_register): class TestStoreDbtStatusFromLog: """Tests for store_dbt_resource_status_from_log and _process_log_line_callable.""" + @staticmethod + def _node_log_line(unique_id: str, node_status: str, event_name: str = "NodeFinished") -> str: + return json.dumps( + { + "info": {"name": event_name}, + "data": { + "node_info": { + "node_status": node_status, + "unique_id": unique_id, + } + }, + } + ) + def test_store_dbt_resource_status_from_log_success(self): """Test that success status is correctly parsed and stored in XCom.""" ti = _MockTI() @@ -427,6 +441,64 @@ def test_store_dbt_resource_status_from_log_failed(self): assert ti.store.get("model__pkg__failed_model_status") == {"status": "failed", "outlet_uris": []} + @pytest.mark.parametrize( + "first_event_name, second_event_name", + [ + ("SkippingDetails", "NodeFinished"), + ("NodeFinished", "SkippingDetails"), + ("LogSkipBecauseError", "NodeFinished"), + ("NodeFinished", "LogSkipBecauseError"), + ], + ) + def test_store_dbt_resource_status_from_log_rewrites_upstream_failure_skips( + self, first_event_name, second_event_name + ): + """Upstream-failure skip events are stored as failed so Airflow can retry the consumer.""" + ti = _MockTI() + ctx = {"ti": ti} + upstream_failure_skipped_ids: set[str] = set() + + store_dbt_resource_status_from_log( + self._node_log_line("model.pkg.downstream_model", "skipped", first_event_name), + {"context": ctx}, + tests_per_model={}, + test_results_per_model={}, + upstream_failure_skipped_ids=upstream_failure_skipped_ids, + ) + store_dbt_resource_status_from_log( + self._node_log_line("model.pkg.downstream_model", "skipped", second_event_name), + {"context": ctx}, + tests_per_model={}, + test_results_per_model={}, + upstream_failure_skipped_ids=upstream_failure_skipped_ids, + ) + + assert upstream_failure_skipped_ids == {"model.pkg.downstream_model"} + assert ti.store.get("model__pkg__downstream_model_status") == {"status": "failed", "outlet_uris": []} + + def test_store_dbt_resource_status_from_log_keeps_plain_skips_as_skipped(self): + """Plain skips are unchanged, including when no rewrite accumulator is passed.""" + ti = _MockTI() + ctx = {"ti": ti} + upstream_failure_skipped_ids: set[str] = set() + + store_dbt_resource_status_from_log( + self._node_log_line("model.pkg.with_accumulator", "skipped"), + {"context": ctx}, + tests_per_model={}, + test_results_per_model={}, + upstream_failure_skipped_ids=upstream_failure_skipped_ids, + ) + store_dbt_resource_status_from_log( + self._node_log_line("model.pkg.without_accumulator", "skipped", "SkippingDetails"), + {"context": ctx}, + tests_per_model={}, + test_results_per_model={}, + ) + + assert ti.store.get("model__pkg__with_accumulator_status") == {"status": "skipped", "outlet_uris": []} + assert ti.store.get("model__pkg__without_accumulator_status") == {"status": "skipped", "outlet_uris": []} + def test_store_dbt_resource_status_from_log_ignores_other_statuses(self): """Test that statuses other than success/failed are ignored.""" ti = _MockTI() @@ -2177,7 +2249,7 @@ def test_dbt_task_group_watcher_gateway_prevents_downstream_skip(caplog, reset_f ) @pytest.mark.integration def test_dbt_task_group_watcher_retry_recovers_skipped_downstream(caplog, reset_fail_once_sequence): - """Regression for BOSS-401 / Linear customer issue. + """Regression for upstream-failure skips that previously left downstream models skipped (#2698). When a dbt model fails on the producer's first attempt, dbt emits ``SkippingDetails`` (and/or ``LogSkipBecauseError`` for ephemeral upstreams) @@ -2243,11 +2315,11 @@ def test_dbt_task_group_watcher_retry_recovers_skipped_downstream(caplog, reset_ tis = {ti.task_id: ti for ti in outcome.get_task_instances()} - # Core BOSS-401 assertion: model_downstream must end SUCCESS, not SKIPPED. + # Core #2698 assertion: model_downstream must end SUCCESS, not SKIPPED. downstream_ti = tis["watcher_upstream_failure_recovery.model_downstream_run"] assert downstream_ti.state == "success", ( f"model_downstream_run ended in '{downstream_ti.state}' -- expected 'success'. " - "Before the BOSS-401 fix, dbt's 'skipped' status for upstream-failure skips " + "Before the #2698 fix, dbt's 'skipped' status for upstream-failure skips " "was pushed to XCom unchanged, the consumer sensor raised AirflowSkipException, " "and Airflow did not retry the SKIPPED task even after the upstream model " "recovered on its own consumer retry." From 4017c3ed237046a141b385e5e2a1c0a604cf1583 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 19 May 2026 08:36:27 +0100 Subject: [PATCH 4/5] Cover deeper downstream chain in retry-recovery integration test dbt skips every transitively-downstream node when an upstream fails, emitting a ``SkippingDetails`` event for each one. The fix in ``store_dbt_resource_status_from_log`` rewrites the per-node status keyed on ``unique_id``, so it handles arbitrary chain depth -- but the existing integration test only had one downstream level (``model_flaky -> model_downstream``) and therefore did not exercise that property. Add ``model_downstream_2`` to the dbt project, depending on ``model_downstream``, and parametrize the integration test over both downstream task ids so the regression is caught for every node along the chain. Pankaj asked for explicit nested-downstream coverage in the #2684 review. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../models/model_downstream_2.sql | 4 +++ tests/operators/test_watcher.py | 33 ++++++++++--------- 2 files changed, 21 insertions(+), 16 deletions(-) create mode 100644 dev/dags/dbt/watcher_upstream_failure_recovery/models/model_downstream_2.sql diff --git a/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_downstream_2.sql b/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_downstream_2.sql new file mode 100644 index 0000000000..62c07fefcf --- /dev/null +++ b/dev/dags/dbt/watcher_upstream_failure_recovery/models/model_downstream_2.sql @@ -0,0 +1,4 @@ +select + id, + first_name +from {{ ref('model_downstream') }} diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index c6117f5579..7872d9ff2b 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -2248,7 +2248,17 @@ def test_dbt_task_group_watcher_gateway_prevents_downstream_skip(caplog, reset_f ), ) @pytest.mark.integration -def test_dbt_task_group_watcher_retry_recovers_skipped_downstream(caplog, reset_fail_once_sequence): +@pytest.mark.parametrize( + "downstream_task_id", + [ + # Level-1 downstream of the flaky model. + "watcher_upstream_failure_recovery.model_downstream_run", + # Level-2 downstream -- confirms the fix handles arbitrary chain depth, + # since dbt emits SkippingDetails for each transitively-skipped node. + "watcher_upstream_failure_recovery.model_downstream_2_run", + ], +) +def test_dbt_task_group_watcher_retry_recovers_skipped_downstream(caplog, reset_fail_once_sequence, downstream_task_id): """Regression for upstream-failure skips that previously left downstream models skipped (#2698). When a dbt model fails on the producer's first attempt, dbt emits @@ -2315,21 +2325,12 @@ def test_dbt_task_group_watcher_retry_recovers_skipped_downstream(caplog, reset_ tis = {ti.task_id: ti for ti in outcome.get_task_instances()} - # Core #2698 assertion: model_downstream must end SUCCESS, not SKIPPED. - downstream_ti = tis["watcher_upstream_failure_recovery.model_downstream_run"] - assert downstream_ti.state == "success", ( - f"model_downstream_run ended in '{downstream_ti.state}' -- expected 'success'. " - "Before the #2698 fix, dbt's 'skipped' status for upstream-failure skips " - "was pushed to XCom unchanged, the consumer sensor raised AirflowSkipException, " - "and Airflow did not retry the SKIPPED task even after the upstream model " - "recovered on its own consumer retry." - ) - # Proves the consumer fallback retry actually fired -- guards against a future - # refactor that happens to leave the task in 'success' for a different reason. - assert downstream_ti.try_number > 1, ( - f"model_downstream_run succeeded on its first try (try_number={downstream_ti.try_number}); " - "the test setup is no longer exercising the retry-after-failure path." - ) + # Core #2698 assertion: the parametrized downstream model must end SUCCESS, + # not SKIPPED, and must have actually been retried (try_number > 1 proves + # the consumer-fallback path fired rather than the task happening to succeed + # on the first attempt). + assert tis[downstream_task_id].state == "success" + assert tis[downstream_task_id].try_number > 1 # The flaky upstream itself also recovers via the consumer-retry fallback. assert tis["watcher_upstream_failure_recovery.model_flaky_run"].state == "success" From 2fdffcf8befbf210462129ea10e0c3de5ea9ebc3 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 19 May 2026 12:48:08 +0100 Subject: [PATCH 5/5] Apply BOSS-401 skip-rewrite fix to WATCHER_KUBERNETES mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The K8s watcher path called ``store_dbt_resource_status_from_log`` from its static progress callback without an ``upstream_failure_skipped_ids`` accumulator, so the upstream-failure ``"skipped"`` → ``"failed"`` rewrite introduced for the local watcher in #2684 silently did nothing on ``ExecutionMode.WATCHER_KUBERNETES`` — the same false-green symptom from issue #2698 could still surface there on retry. Mirror the per-operator accumulator from ``DbtProducerWatcherOperator``: hold a ``set[str]`` on ``DbtProducerWatcherKubernetesOperator``, clear it in ``execute``, and expose it to the static ``WatcherKubernetesCallback.progress_callback`` via the same module-level-global pattern already used for the task context. The callback now forwards the accumulator to the shared log parser, so the fix applies uniformly to both watcher backends. Co-Authored-By: Claude Opus 4.7 (1M context) --- cosmos/operators/watcher_kubernetes.py | 31 +++++++++++++++++++------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/cosmos/operators/watcher_kubernetes.py b/cosmos/operators/watcher_kubernetes.py index 3cab4835e4..5b5811627d 100644 --- a/cosmos/operators/watcher_kubernetes.py +++ b/cosmos/operators/watcher_kubernetes.py @@ -44,9 +44,13 @@ logger = get_logger(__name__) -# This global variable is currently used to make the task context available to the K8s callback. -# While the callback is set during the operator initialization, the context is only created during the operator's execution. +# Module-level globals used to make per-execution state available to the static +# K8s callback (see WatcherKubernetesCallback). They are set in +# DbtProducerWatcherKubernetesOperator.execute, read in the callback, and +# mirror the per-operator state DbtProducerWatcherOperator threads through its +# functools.partial-wrapped parser. producer_task_context = None +producer_upstream_failure_skipped_ids: set[str] | None = None class WatcherKubernetesCallback(KubernetesPodOperatorCallback): # type: ignore[misc] @@ -73,10 +77,12 @@ def progress_callback( :param pod: the pod from which the log line was read. """ if "context" not in kwargs: - # This global variable is used to make the task context available to the K8s callback. - # While the callback is set during the operator initialization, the context is only created during the operator's execution. kwargs["context"] = producer_task_context - store_dbt_resource_status_from_log(line, kwargs) + store_dbt_resource_status_from_log( + line, + kwargs, + upstream_failure_skipped_ids=producer_upstream_failure_skipped_ids, + ) class DbtProducerWatcherKubernetesOperator(DbtBuildKubernetesOperator): @@ -98,6 +104,12 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: kwargs["callbacks"] = normalized_callbacks super().__init__(task_id=task_id, *args, **kwargs) self.dbt_cmd_flags += ["--log-format", "json"] + # Mutable set populated by the log parser when dbt emits SkippingDetails + # or LogSkipBecauseError for a node; subsequent "skipped" terminal events + # for those unique_ids are rewritten to "failed" so the consumer sensor + # fails on attempt 1 (instead of SKIPPED, which Airflow will not retry). + # Mirrors DbtProducerWatcherOperator._upstream_failure_skipped_ids; see #2698. + self._upstream_failure_skipped_ids: set[str] = set() @cached_property def pod_manager(self) -> CosmosKubernetesPodManager: @@ -121,10 +133,13 @@ def execute(self, context: Context, **kwargs: Any) -> Any: _init_xcom_backup(context) - # This global variable is used to make the task context available to the K8s callback. - # While the callback is set during the operator initialization, the context is only created during the operator's execution. - global producer_task_context + self._upstream_failure_skipped_ids.clear() + + # Make per-execution state available to the static K8s callback. See the + # module-level globals at the top of this file for details. + global producer_task_context, producer_upstream_failure_skipped_ids producer_task_context = context + producer_upstream_failure_skipped_ids = self._upstream_failure_skipped_ids try: return_value = super().execute(context, **kwargs)