From 5f9c76ae3f811479f0d64964a723d13783bcb513 Mon Sep 17 00:00:00 2001 From: Giovanni Corsetti <155465603+corsettigyg@users.noreply.github.com> Date: Thu, 27 Feb 2025 13:21:23 +0100 Subject: [PATCH 1/6] Update graph.py --- cosmos/airflow/graph.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index f96d3ba2a7..fc4ed66edc 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -270,6 +270,7 @@ def create_task_metadata( } if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: + raise Exception("testing") exclude_detached_tests_if_needed(node, args, detached_from_parent) task_id, args = _get_task_id_and_args( node, args, use_task_group, normalize_task_id, "build", include_resource_type=True From 49ccfefd712d52b37f56b51aa538ee49dd1574a0 Mon Sep 17 00:00:00 2001 From: Giovanni Corsetti <155465603+corsettigyg@users.noreply.github.com> Date: Thu, 27 Feb 2025 13:24:46 +0100 Subject: [PATCH 2/6] Update graph.py --- cosmos/airflow/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index fc4ed66edc..d3466ea169 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -270,7 +270,7 @@ def create_task_metadata( } if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: - raise Exception("testing") + args["on_warning_callback"] = on_warning_callback exclude_detached_tests_if_needed(node, args, detached_from_parent) task_id, args = _get_task_id_and_args( node, args, use_task_group, normalize_task_id, "build", include_resource_type=True From f63b8f2135f33e584f2e6bae2fe987840982dfdd Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Thu, 27 Feb 2025 13:44:42 +0100 Subject: [PATCH 3/6] remove args["on_warning_callback"] = on_warning_callback unused --- cosmos/airflow/graph.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index d3466ea169..f96d3ba2a7 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -270,7 +270,6 @@ def create_task_metadata( } if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: - args["on_warning_callback"] = on_warning_callback exclude_detached_tests_if_needed(node, args, detached_from_parent) task_id, args = _get_task_id_and_args( node, args, use_task_group, normalize_task_id, "build", include_resource_type=True From 8bca98c02b912c2bf4ce63b0f9728c8028042f17 Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Thu, 27 Feb 2025 13:47:22 +0100 Subject: [PATCH 4/6] add on_warning_callback metrhod to BUILD --- cosmos/airflow/graph.py | 1 + cosmos/operators/local.py | 30 +++++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index f96d3ba2a7..d3466ea169 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -270,6 +270,7 @@ def create_task_metadata( } if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: + args["on_warning_callback"] = on_warning_callback exclude_detached_tests_if_needed(node, args, detached_from_parent) task_id, args = _get_task_id_and_args( node, args, use_task_group, normalize_task_id, "build", include_resource_type=True diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 21fa6ae915..af64fa13a9 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -762,8 +762,36 @@ class DbtBuildLocalOperator(DbtBuildMixin, DbtLocalBaseOperator): template_fields: Sequence[str] = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] - def __init__(self, *args: Any, **kwargs: Any) -> None: + def __init__(self, *args: Any, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + self.on_warning_callback = on_warning_callback + self.extract_issues: Callable[..., tuple[list[str], list[str]]] + + def _handle_warnings(self, result: FullOutputSubprocessResult | dbtRunnerResult, context: Context) -> None: + """ + Handles warnings by extracting log issues, creating additional context, and calling the + on_warning_callback with the updated context. + + :param result: The result object from the build and run command. + :param context: The original airflow context in which the build and run command was executed. + """ + if self.invocation_mode == InvocationMode.SUBPROCESS: + self.extract_issues = extract_freshness_warn_msg + elif self.invocation_mode == InvocationMode.DBT_RUNNER: + self.extract_issues = dbt_runner.extract_message_by_status + + test_names, test_results = self.extract_issues(result) + + warning_context = dict(context) + warning_context["test_names"] = test_names + warning_context["test_results"] = test_results + + self.on_warning_callback and self.on_warning_callback(warning_context) + + def execute(self, context: Context, **kwargs: Any) -> None: + result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) + if self.on_warning_callback: + self._handle_warnings(result, context) class DbtLSLocalOperator(DbtLSMixin, DbtLocalBaseOperator): From 547786701bb3e7809a4b55b1ee0300f0022e8a9e Mon Sep 17 00:00:00 2001 From: Giovanni Corsetti <155465603+corsettigyg@users.noreply.github.com> Date: Thu, 27 Feb 2025 16:31:06 +0100 Subject: [PATCH 5/6] add DbtBuildLocalOperator test on_warning_callback --- tests/operators/test_local.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 3bcd78616d..f63a7f46c2 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -664,7 +664,17 @@ def test_run_test_operator_with_callback(invocation_mode, failing_test_dbt_proje on_warning_callback=on_warning_callback, invocation_mode=invocation_mode, ) - run_operator >> test_operator + + build_operator = DbtBuildLocalOperator( + profile_config=mini_profile_config, + project_dir=failing_test_dbt_project, + task_id="build", + append_env=True, + on_warning_callback=on_warning_callback, + invocation_mode=invocation_mode, + ) + + run_operator >> build_operator >> test_operator run_test_dag(dag) assert on_warning_callback.called From 7add6af795d10fe9ffd31328268b190e46981671 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 27 Feb 2025 15:40:04 +0000 Subject: [PATCH 6/6] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index f63a7f46c2..a721cd8587 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -673,7 +673,7 @@ def test_run_test_operator_with_callback(invocation_mode, failing_test_dbt_proje on_warning_callback=on_warning_callback, invocation_mode=invocation_mode, ) - + run_operator >> build_operator >> test_operator run_test_dag(dag) assert on_warning_callback.called