diff --git a/src/wpsremote/processbot.py b/src/wpsremote/processbot.py index c782b84..3262bc3 100644 --- a/src/wpsremote/processbot.py +++ b/src/wpsremote/processbot.py @@ -132,8 +132,6 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess self._input_params_actions = computational_job_input_actions.ComputationalJobInputActions.create_from_config( action_sections) - # create the concrete bus object - self._lock_bus = thread.allocate_lock() self.bus = introspection.get_class_four_arg(bus_class_name, remote_config, self.service, @@ -146,6 +144,12 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess self.bus.RegisterMessageCallback(busIndipendentMessages.FinishMessage, self.handle_finish) self.bus.RegisterMessageCallback(busIndipendentMessages.AbortMessage, self.handle_abort) + def exit(self, return_code, exception=None): + # if exception: + # raise Exception(exception) + sys.stdout.write('') + sys.exit(return_code) + def get_resource_file_dir(self): return self._resource_file_dir @@ -200,6 +204,9 @@ def SpawnProcess(self): # todo: check if cmds contains ","!!! --> pickle? rc.set_from_processbot(os.getpid(), [invoked_process.pid]) rc.write() + + # go to process output synchronuosly + self.process_output_parser(invoked_process) except Exception as ex: logging.exception("Process "+str(self._uniqueExeId)+" Exception: " + str(traceback.format_exc(sys.exc_info()))) @@ -207,41 +214,29 @@ def SpawnProcess(self): self.send_error_message(error_message) # self.bus.disconnect() logger.info("after send job-error message to WPS") - thread.interrupt_main() - os._exit(-1) - - # go to process output synchronuosly - self.process_output_parser(invoked_process) + self._finished = True + self.exit(-1, exception=ex) def process_output_parser(self, invoked_process): logger = logging.getLogger("ProcessBot.process_output_parser") logger.info("start parsing stdout of created process " + self.service) - - with self._lock_bus: + try: + if self.bus.state() != 'connected': + self.bus.xmpp.reconnect() + self.bus.xmpp.send_presence() if self.bus.state() == 'connected': self.bus.SendMessage(busIndipendentMessages. LogMessage(self._remote_wps_endpoint, "INFO", "start parsing stdout of created process " + self.service)) else: - try: - self.bus.xmpp.reconnect() - self.bus.xmpp.send_presence() - # self.bus.xmpp.get_roster() - - if self.bus.state() == 'connected': - self.bus.SendMessage(busIndipendentMessages. - LogMessage(self._remote_wps_endpoint, - "INFO", - "start parsing stdout of created process " + self.service)) - else: - logger.info("[XMPP Disconnected]: Process " + - str(self._uniqueExeId) + - " Could not send info message to GeoServer Endpoint " + - str(self._remote_wps_endpoint)) - except BaseException: - logger.info("[XMPP Disconnected]: Process "+str(self._uniqueExeId) + - " Could not send info message to GeoServer Endpoint "+str(self._remote_wps_endpoint)) + logger.info("[XMPP Disconnected]: Process " + + str(self._uniqueExeId) + + " Could not send info message to GeoServer Endpoint " + + str(self._remote_wps_endpoint)) + except BaseException: + logger.info("[XMPP Disconnected]: Process "+str(self._uniqueExeId) + + " Could not send info message to GeoServer Endpoint "+str(self._remote_wps_endpoint)) # Listen to stdout stdout_parser_compiled = [re.compile(r) for r in self._stdout_parser] @@ -258,34 +253,33 @@ def process_output_parser(self, invoked_process): res = rgx.match(line) if (res): if (action == "progress"): - with self._lock_bus: - if self.bus.state() != 'connected': - try: - self.bus.xmpp.reconnect() - self.bus.xmpp.send_presence() - # self.bus.xmpp.get_roster() - except BaseException: - logger.info("[XMPP Disconnected]: Process " + - str(self._uniqueExeId) + - " Could not send info message to GeoServer Endpoint " + - str(self._remote_wps_endpoint)) + if self.bus.state() != 'connected': + try: + self.bus.xmpp.reconnect() + self.bus.xmpp.send_presence() + except BaseException: + logger.info("[XMPP Disconnected]: Process " + + str(self._uniqueExeId) + + " Could not send info message to GeoServer Endpoint " + + str(self._remote_wps_endpoint)) + if self.bus.state() == 'connected': self.bus.SendMessage(busIndipendentMessages. ProgressMessage(self._remote_wps_endpoint, float(res.group(1).strip()))) # match = True break elif (action == "log"): - with self._lock_bus: - if self.bus.state() != 'connected': - try: - self.bus.xmpp.reconnect() - self.bus.xmpp.send_presence() - # self.bus.xmpp.get_roster() - except BaseException: - logger.info("[XMPP Disconnected]: Process " + - str(self._uniqueExeId) + - " Could not send info message to GeoServer Endpoint " + - str(self._remote_wps_endpoint)) + if self.bus.state() != 'connected': + try: + self.bus.xmpp.reconnect() + self.bus.xmpp.send_presence() + # self.bus.xmpp.get_roster() + except BaseException: + logger.info("[XMPP Disconnected]: Process " + + str(self._uniqueExeId) + + " Could not send info message to GeoServer Endpoint " + + str(self._remote_wps_endpoint)) + if self.bus.state() == 'connected': self.bus.SendMessage(busIndipendentMessages. LogMessage(self._remote_wps_endpoint, res.group(1).strip(), @@ -293,17 +287,17 @@ def process_output_parser(self, invoked_process): # match = True break elif (action == "abort"): - with self._lock_bus: - if self.bus.state() != 'connected': - try: - self.bus.xmpp.reconnect() - self.bus.xmpp.send_presence() - # self.bus.xmpp.get_roster() - except BaseException: - logger.info("[XMPP Disconnected]: Process " + - str(self._uniqueExeId) + - " Could not send info message to GeoServer Endpoint " + - str(self._remote_wps_endpoint)) + if self.bus.state() != 'connected': + try: + self.bus.xmpp.reconnect() + self.bus.xmpp.send_presence() + # self.bus.xmpp.get_roster() + except BaseException: + logger.info("[XMPP Disconnected]: Process " + + str(self._uniqueExeId) + + " Could not send info message to GeoServer Endpoint " + + str(self._remote_wps_endpoint)) + if self.bus.state() == 'connected': self.bus.SendMessage(busIndipendentMessages. ErrorMessage(self._remote_wps_endpoint, res.group(2).strip())) @@ -338,33 +332,30 @@ def process_output_parser(self, invoked_process): p.get_publish_default_style(), p.get_publish_target_workspace(), p.get_metadata()] - except BaseException: + except BaseException as ex: logging.exception("Process "+str(self._uniqueExeId)+" Exception: " + str(traceback.format_exc(sys.exc_info()))) error_message = "process exit code is " + \ str(return_code) + ": failure\n" + "\n".join(str(e) for e in stack_trace) self.send_error_message(error_message) - # self.bus.disconnect() + self.bus.disconnect() logger.info("after send job-error message to WPS") - thread.interrupt_main() - os._exit(return_code) + self._finished = True + self.exit(return_code, exception=ex) logger.info("trying to acquire bus lock...") - with self._lock_bus: - logger.info("bus lock acquired...") + counter = 1 + while not self._finished: if self.bus.state() != 'connected': try: self.bus.xmpp.reconnect() self.bus.xmpp.send_presence() - # self.bus.xmpp.get_roster() except BaseException: logger.info("[XMPP Disconnected]: Process " + str(self._uniqueExeId) + " Could not send info message to GeoServer Endpoint " + str(self._remote_wps_endpoint)) - - counter = 1 - while not self._finished: + if self.bus.state() == 'connected': logger.info("sending 'completed' message tentative #" + str(counter)) self.bus.SendMessage(busIndipendentMessages. CompletedMessage(self._remote_wps_endpoint, @@ -374,76 +365,78 @@ def process_output_parser(self, invoked_process): sleep(10) else: logger.error("Could not contact Remote WPS with. Forcibly shutdown the process...") - thread.interrupt_main() - os._exit(-1) + self._finished = True + self.exit(-1) logger.info("after send job-completed message to WPS") else: error_message = "process exit code is " + \ str(return_code) + ": failure\n" + "\n".join(str(e) for e in stack_trace) logger.critical("process exit code is " + str(return_code) + ": failure") - # todo: should i wait for finish message here as well? No self.send_error_message(error_message) - - # self.bus.disconnect() + self.bus.disconnect() logger.info("after send job-error message to WPS") - thread.interrupt_main() - os._exit(return_code) + self._finished = True + self.exit(return_code) def handle_finish(self, finished_message): - logger = logging.getLogger("ProcessBot.handle_finish") - logger.info("received finish mesasge from WPS") + logger = logging.getLogger("ProcessBot.handle_finish to " + str(self._remote_wps_endpoint)) + logger.error("Received finish mesasge from GeoServer WPS...") self._finished = True - with self._lock_bus: - self.bus.disconnect() - logger.info("disconnected from communication bus") - - sys.exit(0) + self.bus.disconnect() + logger.error("disconnected from communication bus") + self._finished = True + self.exit(0) def handle_abort(self, aborted_message): - logger = logging.getLogger("ProcessBot.handle_abort") - logger.info("received abort mesasge from WPS") + logger = logging.getLogger("ProcessBot.handle_abort to " + str(self._remote_wps_endpoint)) + logger.error("Received abort mesasge from GeoServer WPS...") self._finished = True - with self._lock_bus: - self.bus.disconnect() - logger.info("disconnected from communication bus") - - sys.exit(-1) + self.bus.disconnect() + logger.error("disconnected from communication bus") + self._finished = True + self.exit(-1) def send_error_message(self, msg): logger = logging.getLogger("ProcessBot.send_error_message to " + str(self._remote_wps_endpoint)) logger.error(msg) - with self._lock_bus: - if self.bus.state() == 'connected': - self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg)) - else: - try: - self.bus.xmpp.reconnect() - self.bus.xmpp.send_presence() - # self.bus.xmpp.get_roster() + if self.bus.state() != 'connected': + try: + self.bus.xmpp.reconnect() + self.bus.xmpp.send_presence() + except BaseException: + logger.info("[XMPP Disconnected]: Process " + + str(self._uniqueExeId) + + " Could not send info message to GeoServer Endpoint " + + str(self._remote_wps_endpoint)) + if self.bus.state() == 'connected': + self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg)) + else: + try: + self.bus.xmpp.reconnect() + self.bus.xmpp.send_presence() + # self.bus.xmpp.get_roster() - if self.bus.state() == 'connected': - self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg)) - else: - sys.stdout.write("[XMPP Disconnected]: Process " + - str(self._uniqueExeId) + - " Could not send error message to GeoServer Endpoint " + - str(self._remote_wps_endpoint) + - " " + - msg.replace('\n', ' ').replace('\r', '') + "") - except BaseException: + if self.bus.state() == 'connected': + self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg)) + else: sys.stdout.write("[XMPP Disconnected]: Process " + str(self._uniqueExeId) + " Could not send error message to GeoServer Endpoint " + str(self._remote_wps_endpoint) + " " + msg.replace('\n', ' ').replace('\r', '') + "") - + except BaseException: + sys.stdout.write("[XMPP Disconnected]: Process " + + str(self._uniqueExeId) + + " Could not send error message to GeoServer Endpoint " + + str(self._remote_wps_endpoint) + + " " + + msg.replace('\n', ' ').replace('\r', '') + "") logger.debug("send error msg complete") - thread.interrupt_main() - os._exit(-1) + self._finished = True + self.exit(-1) def disconnect(self): - with self._lock_bus: - self.bus.disconnect() + self.bus.disconnect() diff --git a/src/wpsremote/servicebot.py b/src/wpsremote/servicebot.py index f7d0291..73afa11 100644 --- a/src/wpsremote/servicebot.py +++ b/src/wpsremote/servicebot.py @@ -137,25 +137,24 @@ def handle_invite(self, invite_message): """Handler for WPS invite message.""" logger = logging.getLogger("servicebot.handle_invite") logger.info("handle invite message from WPS " + str(invite_message.originator())) - if self.bus.state() != 'connected': - try: + try: + if self.bus.state() != 'connected': self.bus.xmpp.reconnect() self.bus.xmpp.send_presence() - # self.bus.xmpp.get_roster() - except BaseException: - logger.info("[XMPP Disconnected]: Service " + - str(self.service) + - " Could not send info message to GeoServer Endpoint " + - str(self._remote_wps_endpoint)) - self.bus.SendMessage( - busIndipendentMessages.RegisterMessage(invite_message.originator(), - self.service, - self.namespace, - self.description, - self._input_parameters_defs.as_DLR_protocol(), - self._output_parameters_defs.as_DLR_protocol() - ) - ) + self.bus.SendMessage( + busIndipendentMessages.RegisterMessage(invite_message.originator(), + self.service, + self.namespace, + self.description, + self._input_parameters_defs.as_DLR_protocol(), + self._output_parameters_defs.as_DLR_protocol() + ) + ) + except BaseException: + logger.info("[XMPP Disconnected]: Service " + + str(self.service) + + " Could not send info message to GeoServer Endpoint " + + str(self._remote_wps_endpoint)) def handle_execute(self, execute_message): """Handler for WPS execute message.""" @@ -218,20 +217,19 @@ def handle_getloadavg(self, getloadavg_message): outputs['vmem'] = [vmem, 'Percentage of Memory used by the server.'] # Send the message back to the WPS - if self.bus.state() != 'connected': - try: + try: + if self.bus.state() != 'connected': self.bus.xmpp.reconnect() self.bus.xmpp.send_presence() - # self.bus.xmpp.get_roster() - except BaseException: - logger.info("[XMPP Disconnected]: Service "+str(self.service) + - " Could not send info message to GeoServer Endpoint "+str(self._remote_wps_endpoint)) - self.bus.SendMessage( - busIndipendentMessages.LoadAverageMessage( - getloadavg_message.originator(), - outputs + self.bus.SendMessage( + busIndipendentMessages.LoadAverageMessage( + getloadavg_message.originator(), + outputs + ) ) - ) + except BaseException: + logger.info("[XMPP Disconnected]: Service "+str(self.service) + + " Could not send info message to GeoServer Endpoint "+str(self._remote_wps_endpoint)) except Exception as ex: logger.exception("Load Average initialization error", ex) @@ -244,58 +242,78 @@ def output_parser_verbose(self, invoked_process, param_filepath): gs_JID = None gs_MSG = None while True: - line = invoked_process.stdout.readline() - if line != '': - - # Look for GeoServer JID from Process - gs_UID_search = re.search('(.*)', line, re.IGNORECASE) - gs_JID_search = re.search('(.*)', line, re.IGNORECASE) - if gs_UID_search: - try: - gs_UID = gs_UID_search.group(1) - gs_JID = gs_JID_search.group(1) - gs_MSG = gs_JID_search = re.search('(.*)', line, re.IGNORECASE).group(1) - except BaseException: - pass - - if self._redirect_process_stdout_to_logger: - line = line.strip() - logger.debug("[SERVICE] " + line) - else: - logger.debug("created process " + self.service + ", PId " + - str(invoked_process.pid) + " stopped send data on stdout") - break # end of stream + try: + line = invoked_process.stdout.readline() + if line != '' and 'send error msg complete' not in line: + # Look for GeoServer JID from Process + gs_UID_search = re.search('(.*)', line, re.IGNORECASE) + gs_JID_search = re.search('(.*)', line, re.IGNORECASE) + if gs_UID_search: + try: + gs_UID = gs_UID_search.group(1) + gs_JID = gs_JID_search.group(1) + gs_MSG = gs_JID_search = re.search('(.*)', line, re.IGNORECASE).group(1) + except BaseException: + pass + + if self._redirect_process_stdout_to_logger: + line = line.strip() + logger.debug("[SERVICE] " + line) + else: + logger.debug("created process " + self.service + ", PId " + + str(invoked_process.pid) + " stopped send data on stdout") + break # end of stream + except SystemExit: + break # wait for process exit code - return_code = invoked_process.wait() + return_code = -1 + poll = invoked_process.poll() + if poll: + return_code = poll + else: + from threading import Timer + timer = Timer(10, invoked_process.kill) + try: + timer.start() + # stdout, stderr = invoked_process.communicate() + return_code = invoked_process.wait() + finally: + timer.cancel() + if return_code != 0: msg = "Process " + self.service + " PId " + \ str(invoked_process.pid) + " terminated with exit code " + str(return_code) logger.critical(msg) - logger.debug("gs_UID[%s] / gs_JID[%s]" % (gs_UID, gs_JID)) - if gs_UID and gs_JID: - self.bus.SendMessage(busIndipendentMessages.ErrorMessage( - gs_JID, msg + " Exception: " + str(gs_MSG), gs_UID)) - elif self._remote_wps_endpoint: - self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg)) - else: - exe_msg = None - try: - logger.debug("Trying to recover Originator from Process Params!") - exe_msg = busIndipendentMessages.ExecuteMessage.deserialize(param_filepath) - if exe_msg.originator(): - self.bus.SendMessage(busIndipendentMessages. - ErrorMessage(exe_msg.originator(), - msg + - " Exception: remote process exception. Please check outputs!", - exe_msg.UniqueId())) - except BaseException: - pass - if not exe_msg: - msg = "Process " + self.service + " PId " + \ - str(invoked_process.pid) + " STALLED! Don't know who to send ERROR Message..." - logger.error(msg) + try: + if gs_UID and gs_JID: + self.bus.SendMessage(busIndipendentMessages.ErrorMessage( + gs_JID, msg + " Exception: " + str(gs_MSG), gs_UID)) + elif self._remote_wps_endpoint: + self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg)) + else: + exe_msg = None + try: + logger.debug("Trying to recover Originator from Process Params!") + exe_msg = busIndipendentMessages.ExecuteMessage.deserialize(param_filepath) + if exe_msg.originator(): + self.bus.SendMessage(busIndipendentMessages. + ErrorMessage(exe_msg.originator(), + msg + + " Exception: remote process exception. Please check outputs!", + exe_msg.UniqueId())) + except BaseException: + pass + if not exe_msg: + msg = "Process " + self.service + " PId " + \ + str(invoked_process.pid) + " STALLED! Don't know who to send ERROR Message..." + logger.error(msg) + except BaseException: + logger.info("[XMPP Disconnected]: Service " + + str(self.service) + + " Could not send error message to GeoServer Endpoint " + + str(self._remote_wps_endpoint)) else: msg = "Process " + self.service + " PId " + str(invoked_process.pid) + " terminated successfully!" logger.debug(msg) @@ -303,21 +321,20 @@ def output_parser_verbose(self, invoked_process, param_filepath): def send_error_message(self, msg): logger = logging.getLogger("ServiceBot.send_error_message") logger.error(msg) - if self.bus.state() != 'connected': - try: + try: + if self.bus.state() != 'connected': self.bus.xmpp.reconnect() self.bus.xmpp.send_presence() - # self.bus.xmpp.get_roster() - except BaseException: - logger.info("[XMPP Disconnected]: Service " + - str(self.service) + - " Could not send error message to GeoServer Endpoint " + - str(self._remote_wps_endpoint)) - if self._remote_wps_endpoint: - self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg)) - else: - msg = "Process " + str(self.service) + " STALLED! Don't know who to send ERROR Message..." - logger.error(msg) + if self._remote_wps_endpoint: + self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg)) + else: + msg = "Process " + str(self.service) + " STALLED! Don't know who to send ERROR Message..." + logger.error(msg) + except BaseException: + logger.info("[XMPP Disconnected]: Service " + + str(self.service) + + " Could not send error message to GeoServer Endpoint " + + str(self._remote_wps_endpoint)) def disconnect(self): self.bus.disconnect() diff --git a/src/wpsremote/xmppMessages.py b/src/wpsremote/xmppMessages.py index edf1221..80b6278 100644 --- a/src/wpsremote/xmppMessages.py +++ b/src/wpsremote/xmppMessages.py @@ -143,10 +143,8 @@ def __init__(self, originator, xmppChannel, msg, id=None): self.id = self.xmppChannel.id def send(self): - error_json = json.dumps(self.msg) error_json_url_enc = urllib.quote(error_json) - body = ''.join(['topic=error', '&id=', self.id, "&message=", error_json_url_enc]) self.xmppChannel.xmpp.send_message(mto=self.originator, mbody=body, mtype='chat')