From 992075bd1a04119da325e09f4c0db0794b8c7ac0 Mon Sep 17 00:00:00 2001 From: chesterxgchen Date: Tue, 2 Jan 2024 18:37:53 -0800 Subject: [PATCH] add ask all clients to end run when server in exception --- .../common_workflows/base_wf_controller.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/nvflare/app_common/common_workflows/base_wf_controller.py b/nvflare/app_common/common_workflows/base_wf_controller.py index b939856e70..646923eaf0 100644 --- a/nvflare/app_common/common_workflows/base_wf_controller.py +++ b/nvflare/app_common/common_workflows/base_wf_controller.py @@ -22,7 +22,7 @@ from nvflare.apis.controller_spec import ClientTask, ControllerSpec, OperatorMethod, Task, TaskOperatorKey from nvflare.apis.dxo import DXO, DataKind from nvflare.apis.fl_component import FLComponent -from nvflare.apis.fl_constant import ReturnCode +from nvflare.apis.fl_constant import ReturnCode, ReservedTopic from nvflare.apis.fl_context import FLContext from nvflare.apis.shareable import Shareable from nvflare.apis.signal import Signal @@ -105,6 +105,16 @@ def start_workflow(self, abort_signal, fl_ctx): self.log_error(fl_ctx, error_msg) self.wf_queue.ask_abort(error_msg) self.system_panic(error_msg, fl_ctx=fl_ctx) + # ask all clients to end run! + self.engine.send_aux_request( + targets=None, + topic=ReservedTopic.END_RUN, + request=Shareable(), + timeout=0.0, + fl_ctx=fl_ctx, + optional=True, + secure=False, + ) finally: wait_time = self.comm_msg_pull_interval + 0.05 self.stop_msg_queue("job finished", fl_ctx, wait_time)