diff --git a/nvflare/private/fed/server/fed_server.py b/nvflare/private/fed/server/fed_server.py index 2f6334b470..f123559557 100644 --- a/nvflare/private/fed/server/fed_server.py +++ b/nvflare/private/fed/server/fed_server.py @@ -32,10 +32,12 @@ ServerCommandKey, ServerCommandNames, SnapshotKey, + SystemComponents, WorkspaceConstants, ) from nvflare.apis.fl_context import FLContext from nvflare.apis.fl_exception import NotAuthenticated +from nvflare.apis.job_def import JobMetaKey, RunStatus from nvflare.apis.workspace import Workspace from nvflare.fuel.common.exit_codes import ProcessExitCode from nvflare.fuel.f3.cellnet.cell import Cell @@ -374,10 +376,23 @@ def _listen_command(self, request: Message) -> Message: reply = make_cellnet_reply(F3ReturnCode.OK, "", None) return reply elif command == ServerCommandNames.HEARTBEAT: + if job_id not in self.engine.run_processes: + self.engine.abort_app_on_server(job_id) + self._set_job_aborted(job_id) + self.logger.info( + f"Job: {job_id} should not be running, but still sending the heartbeat calls. Abort the job." + ) return make_cellnet_reply(F3ReturnCode.OK, "", None) else: return make_cellnet_reply(F3ReturnCode.INVALID_REQUEST, "", None) + def _set_job_aborted(self, job_id): + job_manager = self.engine.get_component(SystemComponents.JOB_MANAGER) + with self.engine.new_context() as fl_ctx: + job = job_manager.get_job(job_id, fl_ctx) + if job.meta.get(JobMetaKey.STATUS) == RunStatus.RUNNING: + job_manager.set_status(job_id, RunStatus.FINISHED_ABORTED, fl_ctx) + def _create_server_engine(self, args, snapshot_persistor): return ServerEngine( server=self, args=args, client_manager=self.client_manager, snapshot_persistor=snapshot_persistor @@ -687,12 +702,23 @@ def start_run(self, job_id, run_root, conf, args, snapshot): if self.engine.asked_to_stop: self.engine.engine_info.status = MachineStatus.STOPPED + self._send_parent_heartbeat(job_id) time.sleep(self.check_engine_frequency) finally: self.engine.engine_info.status = MachineStatus.STOPPED self.run_manager = None + def _send_parent_heartbeat(self, job_id): + if self.cell: + request = new_cell_message({CellMessageHeaderKeys.JOB_ID: job_id}, {}) + self.cell.fire_and_forget( + targets=FQCN.ROOT_SERVER, + channel=CellChannel.SERVER_PARENT_LISTENER, + topic=ServerCommandNames.HEARTBEAT, + message=request, + ) + def create_run_manager(self, workspace, job_id): return RunManager( server_name=self.project_name, diff --git a/nvflare/private/fed/server/server_engine.py b/nvflare/private/fed/server/server_engine.py index 60f28bb758..77224cd9dd 100644 --- a/nvflare/private/fed/server/server_engine.py +++ b/nvflare/private/fed/server/server_engine.py @@ -317,8 +317,6 @@ def abort_app_on_clients(self, clients: List[str]) -> str: return "" def abort_app_on_server(self, job_id: str, turn_to_cold: bool = False) -> str: - if job_id not in self.run_processes.keys(): - return "Server app has not started." self.logger.info("Abort the server app run.") command_data = Shareable()