Skip to content

Commit

Permalink
- handle "abort" message from xmppBus
Browse files Browse the repository at this point in the history
  • Loading branch information
afabiani committed Sep 24, 2018
1 parent 8ca3698 commit a5a7a4c
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 42 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

setup(
name = "wps-remote",
version = "2.14.2",
version = "2.14.3",
author = "GeoServer Developers",
author_email = "[email protected]",
description = "A library that allows users to publish their executables as GeoServer WPS Processes through the XMPP protocol",
Expand Down
26 changes: 13 additions & 13 deletions src/wpsremote/busIndipendentMessages.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class InviteMessage(BusInipendentMessage):
def __init__(self, payload, originator):
self._originator=originator
self._payload=payload

def originator(self):
return self._originator
return self._originator

class RegisterMessage(BusInipendentMessage):

Expand All @@ -37,9 +37,9 @@ def __init__(self, originator, service, namespace, descritpion, par, output):

def input_parameters(self):
return self._input_parameter

def originator(self):
return self._originator
return self._originator

class ExecuteMessage(BusInipendentMessage):

Expand All @@ -58,11 +58,11 @@ def __init__(self, originator, uniqueExeId, baseURL, variables):
self._variables = variables

def variables(self):
return self._variables
return self._variables

def originator(self):
return self._originator
return self._originator

def UniqueId(self):
return self._uniqueExeId

Expand Down Expand Up @@ -111,18 +111,18 @@ def __init__(self, originator, msg, id=None):

class AbortMessage(BusInipendentMessage):

def __init__(self, originator, msg):
self.msg = msg
self.originator = originator
def __init__(self, payload, originator):
self.originator=originator
self.payload=payload

class GetLoadAverageMessage(BusInipendentMessage):

def __init__(self, payload, originator):
self._originator=originator
self._payload=payload

def originator(self):
return self._originator
return self._originator

class LoadAverageMessage(BusInipendentMessage):

Expand All @@ -131,4 +131,4 @@ def __init__(self, originator, outputs):
self._outputs = outputs

def outputs(self):
return self._outputs
return self._outputs
14 changes: 10 additions & 4 deletions src/wpsremote/processbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,16 @@ def handle_finish(self, finished_message):

sys.exit(0)

def handle_abort(self, aborted_message):
logger = logging.getLogger("ProcessBot.handle_abort")
logger.info("received abort mesasge from WPS")
self._finished = True
with self._lock_bus:
self.bus.disconnect()
logger.info("disconnected from communication bus")

sys.exit(-1)

def send_error_message(self, msg):
logger = logging.getLogger("ProcessBot.send_error_message to " + str(self._remote_wps_endpoint))
logger.error( msg )
Expand All @@ -360,7 +370,3 @@ def send_error_message(self, msg):
def disconnect(self):
with self._lock_bus:
self.bus.disconnect()

def handle_abort(self):
#todo
pass
33 changes: 16 additions & 17 deletions src/wpsremote/servicebot.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(self, remote_config_filepath, service_config_filepath):
self.namespace = serviceConfig.get("DEFAULT", "namespace")
self.description = serviceConfig.get("DEFAULT", "description") #WPS service description
self._active = serviceConfig.get("DEFAULT", "active").lower() == "true" #True
self._output_dir = serviceConfig.get_path("DEFAULT", "output_dir")
self._output_dir = serviceConfig.get_path("DEFAULT", "output_dir")
self._max_running_time = datetime.timedelta( seconds = serviceConfig.getint("DEFAULT", "max_running_time_seconds") )

try:
Expand All @@ -67,24 +67,24 @@ def __init__(self, remote_config_filepath, service_config_filepath):

input_sections = OrderedDict()
for input_section in [s for s in serviceConfig.sections() if 'input' in s.lower() or 'const' in s.lower()]:
#service bot doesn't have yet the execution unique id, thus the serviceConfig is read with raw=True to avoid config file variables interpolation
#service bot doesn't have yet the execution unique id, thus the serviceConfig is read with raw=True to avoid config file variables interpolation
input_sections[input_section] = serviceConfig.items_without_defaults(input_section, raw=True)
self._input_parameters_defs = computation_job_inputs.ComputationJobInputs.create_from_config( input_sections )

output_sections=OrderedDict()
for output_section in [s for s in serviceConfig.sections() if 'output' in s.lower()]:
output_sections[output_section] = serviceConfig.items_without_defaults(output_section, raw=True)
self._output_parameters_defs = output_parameters.OutputParameters.create_from_config( output_sections, self._wps_execution_shared_dir )

#create the concrete bus object
self.bus = introspection.get_class_three_arg(bus_class_name, remote_config, self.service, self.namespace)

self.bus.RegisterMessageCallback(busIndipendentMessages.InviteMessage, self.handle_invite)
self.bus.RegisterMessageCallback(busIndipendentMessages.InviteMessage, self.handle_invite)
self.bus.RegisterMessageCallback(busIndipendentMessages.ExecuteMessage, self.handle_execute)

# -- Register here the callback to the "getloadavg" message
self.bus.RegisterMessageCallback(busIndipendentMessages.GetLoadAverageMessage, self.handle_getloadavg)

#self._lock_running_process = thread.allocate_lock() #critical section to access running_process from separate threads
self.running_process={}

Expand Down Expand Up @@ -115,7 +115,7 @@ def run(self):
self.bus.Listen()
else:
logger.error("This service is disabled, exit process")
return
return

def handle_invite(self, invite_message):
"""Handler for WPS invite message."""
Expand All @@ -129,11 +129,11 @@ def handle_invite(self, invite_message):
except:
logger.info( "[XMPP Disconnected]: Service "+str(self.service)+" Could not send info message to GeoServer Endpoint "+str(self._remote_wps_endpoint))
self.bus.SendMessage(
busIndipendentMessages.RegisterMessage(invite_message.originator(),
self.service,
self.namespace,
self.description,
self._input_parameters_defs.as_DLR_protocol(),
busIndipendentMessages.RegisterMessage(invite_message.originator(),
self.service,
self.namespace,
self.description,
self._input_parameters_defs.as_DLR_protocol(),
self._output_parameters_defs.as_DLR_protocol()
)
)
Expand All @@ -152,7 +152,7 @@ def handle_execute(self, execute_message):
#create the Resource Cleaner file containing the process info. The "invoked_process.pid" will be set by the spawned process itself
try:
r = resource_cleaner.Resource()
#create a resource...
#create a resource...
r.set_from_servicebot(execute_message.UniqueId(), self._output_dir / execute_message.UniqueId())
#... and save to file
logger.info("Start the resource cleaner!")
Expand All @@ -164,8 +164,8 @@ def handle_execute(self, execute_message):
cmd = 'python wpsagent.py -r ' + self._remote_config_filepath + ' -s ' + self._service_config_file + ' -p ' + param_filepath + ' process'
invoked_process = subprocess.Popen(args=cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
logger.info("created process " + self.service + " with PId " + str(invoked_process.pid) + " and cmd: " + cmd )
#use a parallel thread to wait the end of the request handler process and get the exit code of the just created asynchronous process computation

#use a parallel thread to wait the end of the request handler process and get the exit code of the just created asynchronous process computation
thread.start_new_thread(self.output_parser_verbose, (invoked_process, param_filepath,))

logger.info("end of execute message handler, going back in listening mode")
Expand Down Expand Up @@ -198,7 +198,7 @@ def handle_getloadavg(self, getloadavg_message):
logger.info( "[XMPP Disconnected]: Service "+str(self.service)+" Could not send info message to GeoServer Endpoint "+str(self._remote_wps_endpoint))
self.bus.SendMessage(
busIndipendentMessages.LoadAverageMessage(
getloadavg_message.originator(),
getloadavg_message.originator(),
outputs
)
)
Expand Down Expand Up @@ -272,7 +272,7 @@ def output_parser_verbose(self, invoked_process, param_filepath):

def send_error_message(self, msg):
logger = logging.getLogger("ServiceBot.send_error_message")
logger.error( msg )
logger.error( msg )
if self.bus.state() != 'connected':
try:
self.bus.xmpp.reconnect()
Expand All @@ -288,4 +288,3 @@ def send_error_message(self, msg):

def disconnect(self):
self.bus.disconnect()

3 changes: 3 additions & 0 deletions src/wpsremote/xmppBus.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ def Convert(self, busIndipendentMsg):
if (type(busIndipendentMsg) is busIndipendentMessages.ErrorMessage):
return xmppMessages.XMPPErrorMessage(busIndipendentMsg.originator, self, busIndipendentMsg.msg, busIndipendentMsg.id)

if (type(busIndipendentMsg) is busIndipendentMessages.AbortMessage):
return xmppMessages.XMPPErrorMessage(busIndipendentMsg.originator, self, busIndipendentMsg.msg, busIndipendentMsg.id)

if (type(busIndipendentMsg) is busIndipendentMessages.LoadAverageMessage):
return xmppMessages.XMPPLoadAverageMessage(busIndipendentMsg.originator, self, busIndipendentMsg.outputs() )

Expand Down
15 changes: 8 additions & 7 deletions src/wpsremote/xmpp_data/configs/myservice/code/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import logging.config
import logging
import argparse
import sys
import sys
import thread
import traceback
import logging
Expand Down Expand Up @@ -41,23 +41,23 @@ def run(self):
proc=subprocess.Popen(fullCmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False)
for line in proc.stdout:
self.logger.info(line)

#call communicate to retrieve return code of subprocess
proc.communicate()
ret = proc.returncode

if (ret == 0):
# zipf = zipfile.ZipFile(self.args.workdir+'/contour.zip', 'w')
# self.zipdir(self.args.workdir+'/', zipf)
output_dir = '%s/../../../output/' % os.path.dirname(os.path.abspath(__file__))
output_dir = '%s/../../../output/%s' % (os.path.dirname(os.path.abspath(__file__)), self.args.execution_id)
zipf = zipfile.ZipFile(output_dir+'/contour.zip', 'w')
self.zipdir(output_dir+'/', zipf)
zipf.close()

self.logger.info("ProgressInfo:100%")
else:
self.logger.critical("Error occurred during processing.")

return ret
# see note below
def youCanQuoteMe(self, item):
Expand All @@ -79,10 +79,11 @@ def create_logger(self, logger_config_file):
self.logger.debug("Logger initialized with file " + str(logger_config_file))

if __name__ == '__main__':

parser = argparse.ArgumentParser()
parser.add_argument("-i", "--interval", nargs='?', default="10", help="Elevation interval between contours.")
parser.add_argument("-w", "--workdir", nargs='?', default="", help="Remote process sandbox working directory.")
parser.add_argument("-e", "--execution_id", nargs='?', default="", help="Remote process Unique Execution Id.")
cmdargs = parser.parse_args()

gdalTest = GDALTest(cmdargs)
Expand Down
13 changes: 13 additions & 0 deletions src/wpsremote/xmpp_data/configs/myservice/service.config
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ input_ref = workdir
alias = w
template = -name value

[Const2]
class = const
name = execution_id
type = string
description = Remote process unique execution id
value = %(unique_execution_id)s

[Action2]
class = cmdline
input_ref = execution_id
alias = e
template = -name value

# ########################################### #
# Output Parameters Declaration #
# ########################################### #
Expand Down

0 comments on commit a5a7a4c

Please sign in to comment.