diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b06f1b2bf9..fdbcd03b8f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,8 +1,22 @@ Changelog ========= +1.13.0a2 (2026-01-22) +--------------------- + +(Includes more changes) + +Features + +- Introduce ``ExecutionMode.WATCHER_KUBERNETES`` to use watcher with ``KubernetesPodOperator`` by @tatiana in #2207 + +Bug Fixes + +- Fix running empty models or ephemeral nodes in ``ExecutionMode.WATCHER`` by @tatiana in #2279 + + 1.12.1 (2026-01-14) ----------------------- +------------------- Bug Fixes diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 7503750955..ba0da02221 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -10,7 +10,7 @@ from cosmos import settings -__version__ = "1.13.0a1" +__version__ = "1.13.0a2" if not settings.enable_memory_optimised_imports: diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 389717a41d..8fbf936d88 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -146,7 +146,10 @@ def _get_status_from_run_results(self, ti: Any, context: Context) -> Any: node_result = next((r for r in results if r.get("unique_id") == self.model_unique_id), None) if not node_result: # pragma: no cover - logger.warning("No matching result found for unique_id '%s'", self.model_unique_id) + logger.warning( + "The dbt node with unique_id '%s' was not executed by the dbt command run in the producer task. This may happen if it is an ephemeral model or if the model sql file is empty.", + self.model_unique_id, + ) return None logger.info("Node Info: %s", run_results_json) @@ -198,10 +201,17 @@ def execute(self, context: Context, **kwargs: Any) -> None: def execute_complete(self, context: Context, event: dict[str, str]) -> None: status = event.get("status") + reason = event.get("reason") + + if status == "success" and reason == "model_not_run": + logger.info( + "Model '%s' was skipped by the dbt command. 
This may happen if it is an ephemeral model or if the model sql file is empty.", + self.model_unique_id, + ) + if status != "failed": return - reason = event.get("reason") if reason == "model_failed": raise AirflowException( f"dbt model '{self.model_unique_id}' failed. Review the producer task '{self.producer_task_id}' logs for details." diff --git a/cosmos/operators/_watcher/triggerer.py b/cosmos/operators/_watcher/triggerer.py index 8de3bfb6e7..d232c08322 100644 --- a/cosmos/operators/_watcher/triggerer.py +++ b/cosmos/operators/_watcher/triggerer.py @@ -12,8 +12,11 @@ from packaging.version import Version from cosmos.constants import AIRFLOW_VERSION +from cosmos.log import get_logger from cosmos.operators._watcher.state import build_producer_state_fetcher +logger = get_logger(__name__) + class WatcherTrigger(BaseTrigger): @@ -82,7 +85,7 @@ def _get_xcom_val() -> Any | None: return await sync_to_async(_get_xcom_val)() async def get_xcom_val(self, key: str) -> Any | None: - self.log.info( + logger.info( "Trying to retrieve value using XCom key <%s> by task_id <%s>, dag_id <%s>, run_id <%s> and map_index <%s>", key, self.producer_task_id, @@ -120,7 +123,7 @@ async def _get_producer_task_status(self) -> str | None: dag_id=self.dag_id, run_id=self.run_id, producer_task_id=self.producer_task_id, - logger=self.log, + logger=logger, ) if fetch_state is None: return None @@ -128,30 +131,39 @@ async def _get_producer_task_status(self) -> str | None: return await sync_to_async(fetch_state)() async def run(self) -> AsyncIterator[TriggerEvent]: - self.log.info("Starting WatcherTrigger for model: %s", self.model_unique_id) + logger.info("Starting WatcherTrigger for model: %s", self.model_unique_id) while True: producer_task_state = await self._get_producer_task_status() node_status = await self._parse_node_status() if node_status == "success": - self.log.info("Model '%s' succeeded", self.model_unique_id) + logger.info("Model '%s' succeeded", self.model_unique_id) yield 
TriggerEvent({"status": "success"}) # type: ignore[no-untyped-call] return elif node_status == "failed": - self.log.warning("Model '%s' failed", self.model_unique_id) + logger.warning("Model '%s' failed", self.model_unique_id) yield TriggerEvent({"status": "failed", "reason": "model_failed"}) # type: ignore[no-untyped-call] return elif producer_task_state == "failed": - self.log.error( + logger.error( "Watcher producer task '%s' failed before delivering results for model '%s'", self.producer_task_id, self.model_unique_id, ) yield TriggerEvent({"status": "failed", "reason": "producer_failed"}) # type: ignore[no-untyped-call] return + elif producer_task_state == "success" and node_status is None: + logger.info( + "The producer task '%s' succeeded. There is no information about the model '%s' execution.", + self.producer_task_id, + self.model_unique_id, + ) + yield TriggerEvent({"status": "success", "reason": "model_not_run"}) # type: ignore[no-untyped-call] + return + # Sleep briefly before re-polling await asyncio.sleep(self.poke_interval) - self.log.debug("Polling again for model '%s' status...", self.model_unique_id) + logger.debug("Polling again for model '%s' status...", self.model_unique_id) def _parse_compressed_xcom(compressed_b64_event_msg: str) -> Any: diff --git a/tests/operators/_watcher/test_triggerer.py b/tests/operators/_watcher/test_triggerer.py index c1b3a9ad4c..d620509a1d 100644 --- a/tests/operators/_watcher/test_triggerer.py +++ b/tests/operators/_watcher/test_triggerer.py @@ -121,6 +121,7 @@ async def test_get_xcom_val_branches(self, airflow_version, expected_val): ("success", "running", {"status": "success"}), ("failed", "running", {"status": "failed", "reason": "model_failed"}), (None, "failed", {"status": "failed", "reason": "producer_failed"}), + (None, "success", {"status": "success", "reason": "model_not_run"}), ], ) async def test_run_various_outcomes(self, node_status, producer_state, expected): @@ -205,6 +206,27 @@ def 
_import_side_effect(name: str, *args, **kwargs): assert state is None + @pytest.mark.asyncio + async def test_run_producer_success_model_not_run(self, caplog): + """Test that when producer succeeds but model has no status, trigger yields success with model_not_run reason.""" + get_producer_status_mock = AsyncMock(return_value="success") + parse_node_status_mock = AsyncMock(return_value=None) + + caplog.set_level("INFO") + + with ( + patch.object(self.trigger, "_get_producer_task_status", get_producer_status_mock), + patch.object(self.trigger, "_parse_node_status", parse_node_status_mock), + ): + events = [] + async for event in self.trigger.run(): + events.append(event) + + assert len(events) == 1 + assert events[0].payload == {"status": "success", "reason": "model_not_run"} + assert "The producer task 'task_1' succeeded" in caplog.text + assert "There is no information about the model 'model.test' execution" in caplog.text + @pytest.mark.asyncio async def test_run_poke_interval_and_debug_log(self, caplog): get_xcom_val_mock = AsyncMock(side_effect=["compressed_data"]) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 282ec8126f..e882efe6e3 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -33,7 +33,7 @@ DBT_PROJECT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop" DBT_PROFILES_YAML_FILEPATH = DBT_PROJECT_PATH / "profiles.yml" - +DBT_PROJECT_WITH_EMPTY_MODEL_PATH = Path(__file__).parent.parent / "sample/dbt_project_with_empty_model" project_config = ProjectConfig( dbt_project_path=DBT_PROJECT_PATH, @@ -184,6 +184,31 @@ def test_handle_startup_event(): assert lst and lst[0]["name"] == "MainReportVersion" +def test_dbt_consumer_watcher_sensor_execute_complete_model_not_run_logs_message(caplog): + """Test that execute_complete logs an info message when model was skipped (model_not_run).""" + sensor = DbtConsumerWatcherSensor( + project_dir=".", + profiles_dir=".", + 
profile_config=profile_config,
+        model_unique_id="model.pkg.skipped_model",
+        poke_interval=1,
+        producer_task_id="dbt_producer_watcher_operator",
+        task_id="consumer_sensor",
+    )
+    sensor.model_unique_id = "model.pkg.skipped_model"
+
+    context = {"dag_run": MagicMock()}
+    event = {"status": "success", "reason": "model_not_run"}
+
+    with caplog.at_level(logging.INFO):
+        sensor.execute_complete(context, event)
+
+    assert any(
+        "Model 'model.pkg.skipped_model' was skipped by the dbt command" in message for message in caplog.messages
+    )
+    assert any("ephemeral model or if the model sql file is empty" in message for message in caplog.messages)
+
+
 def test_dbt_producer_watcher_operator_logs_retry_message(caplog):
     op = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
     ti = _MockTI()
@@ -218,6 +243,7 @@ def test_dbt_producer_watcher_operator_skips_retry_attempt(caplog):
     "event, expected_message",
     [
         ({"status": "success"}, None),
+        ({"status": "success", "reason": "model_not_run"}, None),
         (
             {"status": "failed", "reason": "model_failed"},
             "dbt model 'model.pkg.m' failed. Review the producer task 'dbt_producer_watcher' logs for details.",
@@ -1041,6 +1067,95 @@ def test_dbt_dag_with_watcher():
     }
 
 
+# Airflow 3.0.0 hangs indefinitely, while Airflow 3.0.6 fails due to this Airflow bug:
+# https://github.com/apache/airflow/issues/51816
+# NOTE: a non-empty tuple is always truthy, so reduce it with any() before handing it to skipif.
+conditions_to_skip = (AIRFLOW_VERSION < Version("2.8"), AIRFLOW_VERSION == Version("3.0"))
+
+
+@pytest.mark.skipif(
+    any(conditions_to_skip),
+    reason="Airflow hangs in these versions when trying to fetch XCom from the triggerer when using dags.test()",
+)
+@pytest.mark.integration
+def test_dbt_dag_with_watcher_and_empty_model(caplog):
+    """
+    Run a DbtDag using `ExecutionMode.WATCHER` and a dbt project with an empty model. This was a situation observed by an Astronomer customer.
+    Confirm the right number of tasks is created and that tasks are in the expected topological order.
+    Confirm that the producer watcher task is created and that it is the parent of the root dbt nodes.
+    """
+    project_config = ProjectConfig(
+        dbt_project_path=DBT_PROJECT_WITH_EMPTY_MODEL_PATH,
+    )
+    # There are two dbt models defined in this project.
+    # When we run `dbt ls`, we can see this:
+    #
+    # 10:32:30 Found 2 models, 464 macros
+    # micro_dbt_project.add_row
+    # micro_dbt_project.empty_model
+    #
+    # However, during `dbt build`, dbt skips running the empty model, and only runs the add_row model:
+    #
+    # 10:29:03 Running with dbt=1.11.2
+    # 10:29:03 Registered adapter: postgres=1.10.0
+    # 10:29:03 Found 2 models, 464 macros
+    # 10:29:03
+    # 10:29:03 Concurrency: 4 threads (target='dev')
+    # 10:29:03
+    # 10:29:03 1 of 1 START sql view model public.add_row ..................................... [RUN]
+    # 10:29:03 1 of 1 OK created sql view model public.add_row ................................ [CREATE VIEW in 0.06s]
+    # 10:29:03
+    # 10:29:03 Finished running 1 view model in 0 hours 0 minutes and 0.19 seconds (0.19s).
+    # 10:29:03
+    # 10:29:03 Completed successfully
+    # 10:29:03
+    # 10:29:03 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=1
+
+    watcher_dag = DbtDag(
+        project_config=project_config,
+        profile_config=profile_config,
+        start_date=datetime(2023, 1, 1),
+        dag_id="watcher_dag",
+        execution_config=ExecutionConfig(
+            execution_mode=ExecutionMode.WATCHER,
+            invocation_mode=InvocationMode.DBT_RUNNER,
+        ),
+        render_config=RenderConfig(emit_datasets=False, test_behavior=TestBehavior.NONE),
+        operator_args={
+            "trigger_rule": "all_success",
+            "execution_timeout": timedelta(seconds=10),
+        },
+        dagrun_timeout=timedelta(seconds=30),
+    )
+    outcome = new_test_dag(watcher_dag)
+    assert outcome.state == DagRunState.SUCCESS
+
+    assert len(watcher_dag.dbt_graph.filtered_nodes) == 2
+
+    assert len(watcher_dag.task_dict) == 3
+    tasks_names = [task.task_id for task in watcher_dag.topological_sort()]
+    expected_task_names = [
+        "dbt_producer_watcher",
+        "add_row_run",
+        "empty_model_run",
+    ]
+    assert tasks_names == expected_task_names
+
+    assert isinstance(watcher_dag.task_dict["dbt_producer_watcher"], DbtProducerWatcherOperator)
+    assert isinstance(watcher_dag.task_dict["add_row_run"], DbtRunWatcherOperator)
+    assert isinstance(watcher_dag.task_dict["empty_model_run"], DbtRunWatcherOperator)
+
+    assert watcher_dag.task_dict["dbt_producer_watcher"].downstream_task_ids == {
+        "add_row_run",
+        "empty_model_run",
+    }
+
+    assert "Total filtered nodes: 2" in caplog.text
+    assert "Finished running node model.micro_dbt_project.add_row" in caplog.text
+    assert "Finished running node model.micro_dbt_project.empty_model" not in caplog.text
+    assert "Model 'model.micro_dbt_project.empty_model' was skipped by the dbt command" in caplog.text
+
+
 @pytest.mark.skipif(AIRFLOW_VERSION < Version("2.7"), reason="Airflow did not have dag.test() until the 2.6 release")
 @pytest.mark.integration
 def test_dbt_task_group_with_watcher():
diff --git a/tests/sample/dbt_project_with_empty_model/dbt_project.yml b/tests/sample/dbt_project_with_empty_model/dbt_project.yml
new file mode 100644
index 0000000000..8cc2839b08 --- /dev/null +++ b/tests/sample/dbt_project_with_empty_model/dbt_project.yml @@ -0,0 +1,7 @@ +name: micro_dbt_project +version: '1.0' +config-version: 2 + +profile: default + +model-paths: ["models"] diff --git a/tests/sample/dbt_project_with_empty_model/models/add_row.sql b/tests/sample/dbt_project_with_empty_model/models/add_row.sql new file mode 100644 index 0000000000..a445cd8009 --- /dev/null +++ b/tests/sample/dbt_project_with_empty_model/models/add_row.sql @@ -0,0 +1,12 @@ +with base as ( + select + 1 as id, + 'original_row' as description + +) + +select * from base +union all +select + 2 as id, + 'added_row' as description diff --git a/tests/sample/dbt_project_with_empty_model/models/empty_model.sql b/tests/sample/dbt_project_with_empty_model/models/empty_model.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/sample/dbt_project_with_empty_model/models/schema.yml b/tests/sample/dbt_project_with_empty_model/models/schema.yml new file mode 100644 index 0000000000..e53e2a22c1 --- /dev/null +++ b/tests/sample/dbt_project_with_empty_model/models/schema.yml @@ -0,0 +1,8 @@ +version: 2 + +models: + - name: add_row + description: "Model that selects data and adds an extra row" + + - name: empty_model + description: "Empty model definition" diff --git a/tests/sample/dbt_project_with_empty_model/profiles.yml b/tests/sample/dbt_project_with_empty_model/profiles.yml new file mode 100644 index 0000000000..224f565f4a --- /dev/null +++ b/tests/sample/dbt_project_with_empty_model/profiles.yml @@ -0,0 +1,12 @@ +default: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('POSTGRES_HOST') }}" + user: "{{ env_var('POSTGRES_USER') }}" + password: "{{ env_var('POSTGRES_PASSWORD') }}" + port: "{{ env_var('POSTGRES_PORT') | int }}" + dbname: "{{ env_var('POSTGRES_DB') }}" + schema: "{{ env_var('POSTGRES_SCHEMA') }}" + threads: 4