Skip to content

Commit

Permalink
- handle "abort" message from xmppBus
Browse files Browse the repository at this point in the history
  • Loading branch information
afabiani committed Sep 24, 2018
1 parent 2eba26f commit 8ca3698
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 60 deletions.
30 changes: 15 additions & 15 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@

"""
To deploy a new version of the package on PyPi
1. Update "version"
1. Update "version"
2. Update REAMDE.md Change Log and pip install version
3. Follow the instruction at http://peterdowns.com/posts/first-time-with-pypi.html
4. Deploy on pypitest:
python setup.py sdist upload -r pypitest
python setup.py bdist --format=gztar upload -r pypitest
python setup.py bdist_wheel upload -r pypitest
5. Deploy on pypi:
python setup.py sdist upload -r pypi
python setup.py bdist --format=gztar upload -r pypi
python setup.py bdist_wheel upload -r pypi
Expand All @@ -33,7 +33,7 @@

setup(
name = "wps-remote",
version = "2.14.1",
version = "2.14.2",
author = "GeoServer Developers",
author_email = "[email protected]",
description = "A library that allows users to publish their executables as GeoServer WPS Processes through the XMPP protocol",
Expand All @@ -55,16 +55,16 @@
packages = find_packages('src'),
package_data = {
'': [
'xmpp_data/*.*',
'xmpp_data/configs/*.*',
'xmpp_data/*.*',
'xmpp_data/configs/*.*',
'xmpp_data/configs/myservice/*.*',
'xmpp_data/configs/myservice/code/*.*',
'xmpp_data/output/*.*',
'xmpp_data/resource_dir/*.*',
'xmpp_data/resource_dir/srtm_39_04/*.*',
'xmpp_data/share/placemark',
'xmpp_data/ssl/*.*',
'xmpp_data/test/*.*',
'xmpp_data/output/*.*',
'xmpp_data/resource_dir/*.*',
'xmpp_data/resource_dir/srtm_39_04/*.*',
'xmpp_data/share/placemark',
'xmpp_data/ssl/*.*',
'xmpp_data/test/*.*',
]
},
include_package_data = True,
Expand Down
32 changes: 15 additions & 17 deletions src/wpsremote/processbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@

class ProcessBot(object):
"""
This script starts when the user call a new WPS execution.
His task is to call the proper external executable/scripts according to the service.config file (provided in the cmd line with -s option) and send back to the WPS logging/progress
This script starts when the user call a new WPS execution.
His task is to call the proper external executable/scripts according to the service.config file (provided in the cmd line with -s option) and send back to the WPS logging/progress
information and error information if something unexpected happens.
All the output including the log file is generated in a sand box directory created with joint information from service.config and external process start-up information.
"""
def __init__(self, remote_config_filepath, service_config_filepath, execute_message):
self._uniqueExeId = execute_message.UniqueId()
self._remote_wps_endpoint = execute_message.originator()
self._remote_wps_baseurl = execute_message.BaseURL()
self._input_values = execute_message.variables()
self._input_values = execute_message.variables()

#read remote config
remote_config = configInstance.create(remote_config_filepath)
Expand All @@ -64,9 +64,9 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess
self._wps_execution_shared_dir.mkdir()
else:
self._wps_execution_shared_dir = None

#the config file is read with raw=False because the unique_exe_id value will be used (interpolated) in the config
serviceConfig = configInstance.create(service_config_filepath, case_sensitive=True, variables = {'unique_exe_id' : self._uniqueExeId, 'wps_execution_shared_dir' : self._wps_execution_shared_dir}, raw=False)
serviceConfig = configInstance.create(service_config_filepath, case_sensitive=True, variables = {'unique_exe_id' : self._uniqueExeId, 'wps_execution_shared_dir' : self._wps_execution_shared_dir}, raw=False)

self.service = serviceConfig.get("DEFAULT", "service") #todo: what is?
self.namespace = serviceConfig.get("DEFAULT", "namespace")
Expand Down Expand Up @@ -99,7 +99,7 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess
privatekey = open(uploader_private_rsa_key, "r")
rsa_key = RSA.importKey(privatekey, passphrase=uploader_passphrase)
uploader_password = rsa_key.decrypt(base64.b64decode(uploader_password))

self._uploader = introspection.get_class_four_arg(uploader_class_name, uploader_host, uploader_username, uploader_password, self._uniqueExeId)
else:
self._uploader = None
Expand Down Expand Up @@ -166,12 +166,12 @@ def SpawnProcess(self):

#prepare cmd line
cmd = self._executable_cmd + " " + self._input_params_actions.get_cmd_line()

#spawn the computational job process
invoked_process = subprocess.Popen(args=cmd.split(), cwd=self._executable_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False)
invoked_process = subprocess.Popen(args=cmd.split(), cwd=self._executable_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False)
logger.info("process " + self.service + " created with PId " + str(invoked_process.pid) + " and command line: " + cmd)

#read the resource file
#read the resource file
rc = resource_cleaner.Resource.create_from_file(self._uniqueExeId, os.getpid())
#add the pid of the computational job to the resource file
rc.set_from_processbot( os.getpid(), [ invoked_process.pid ] ) #todo: check if cmds contains ","!!! --> pickle?
Expand All @@ -186,7 +186,7 @@ def SpawnProcess(self):
os._exit(return_code)

#go to process output synchronuosly
self.process_output_parser( invoked_process )
self.process_output_parser( invoked_process )

def process_output_parser(self, invoked_process):
logger = logging.getLogger("ProcessBot.process_output_parser")
Expand Down Expand Up @@ -258,7 +258,7 @@ def process_output_parser(self, invoked_process):
self.bus.SendMessage( busIndipendentMessages.ErrorMessage( self._remote_wps_endpoint, res.group(2).strip() ) )
match=True
break
elif (action=="ignore"):
elif (action=="ignore"):
match=True
break
else:
Expand All @@ -270,7 +270,7 @@ def process_output_parser(self, invoked_process):
#wait for process exit code
return_code = invoked_process.wait()
logger.info( "process exit code is " + str(return_code))

if return_code==0:
#success
logger.info("process exit code is " + str(return_code) + ": success")
Expand Down Expand Up @@ -320,7 +320,7 @@ def process_output_parser(self, invoked_process):
self.send_error_message( error_message )

#self.bus.disconnect()
logger.info( "after send job-error message to WPS")
logger.info( "after send job-error message to WPS")
thread.interrupt_main()
os._exit(return_code)

Expand All @@ -336,7 +336,7 @@ def handle_finish(self, finished_message):

def send_error_message(self, msg):
logger = logging.getLogger("ProcessBot.send_error_message to " + str(self._remote_wps_endpoint))
logger.error( msg )
logger.error( msg )
with self._lock_bus:
if self.bus.state() == 'connected':
self.bus.SendMessage( busIndipendentMessages.ErrorMessage( self._remote_wps_endpoint, msg ) )
Expand All @@ -353,7 +353,7 @@ def send_error_message(self, msg):
except:
sys.stdout.write( "[XMPP Disconnected]: Process <UID>"+str(self._uniqueExeId)+"</UID> Could not send error message to GeoServer Endpoint <JID>"+str(self._remote_wps_endpoint)+"</JID> <MSG>"+msg.replace('\n', ' ').replace('\r', '')+"</MSG>")

logger.debug( "send error msg complete" )
logger.debug( "send error msg complete" )
thread.interrupt_main()
os._exit(-1)

Expand All @@ -364,5 +364,3 @@ def disconnect(self):
def handle_abort(self):
#todo
pass


2 changes: 1 addition & 1 deletion src/wpsremote/run_threads.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
python wpsagent.py -r /share/xmpp_data/configs/remote.config -s /share/xmpp_data/configs/myservice/service.config service &
python wpsagent.py -r ./xmpp_data/configs/remote.config -s ./xmpp_data/configs/myservice/service.config service &

17 changes: 10 additions & 7 deletions src/wpsremote/xmppBus.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, config, service_name, service_name_namespace, id="master"):
self.username=config.get("DEFAULT", "user")
self.password=config.get("DEFAULT", "password")
self.nameSpacePassword = config.get("DEFAULT", "mucServicePassword")

self._service_name = service_name
self._service_name_namespace = service_name_namespace
self._fully_qualified_service_name = self._service_name_namespace + "." + self._service_name
Expand Down Expand Up @@ -65,7 +65,7 @@ def _get_MUC_JId(self):
return self._service_name_namespace + "@" + self.MUC_name

def _MUC_service_nickname(self):
return '%s@%s' % (self._service_name, str(self.xmpp.boundjid).split('@', 1).pop())
return '%s@%s' % (self._service_name, str(self.xmpp.boundjid).split('@', 1).pop())


def CheckServerIdentity(self, serverId):
Expand Down Expand Up @@ -108,7 +108,7 @@ def _handleXMPPSignal(self, msg):

def CreateIndipendentMessage(self, msg):
if msg['type'] in ('normal', 'chat'):
payload = msg['body']
payload = msg['body']
origin = msg['from']

logging.info('Received XMPP bus signal from %s: "%s"' % (origin, payload))
Expand All @@ -128,13 +128,16 @@ def CreateIndipendentMessage(self, msg):
variables = dict([tuple(each.strip().split('=')) for each in payload.split('&')])
requestParams=pickle.loads(urllib.unquote(variables['message'])) if variables['message'] is not None else None
#print str(variables)
return busIndipendentMessages.ExecuteMessage( msg['from'], variables['id'], variables['baseURL'], requestParams )
return busIndipendentMessages.ExecuteMessage( msg['from'], variables['id'], variables['baseURL'], requestParams )
elif ("topic=invite" in payload):
logging.info("handle invite message from WPS " + str(payload))
return busIndipendentMessages.InviteMessage(payload, msg['from'])
elif ("topic=finish" in payload):
logging.info("handle finish message from WPS " + str(payload))
return busIndipendentMessages.FinishMessage(payload, msg['from'])
elif ("topic=abort" in payload):
logging.info("handle abort message from WPS " + str(payload))
return busIndipendentMessages.AbortMessage(payload, msg['from'])
elif ("topic=getloadavg" in payload):
return busIndipendentMessages.GetLoadAverageMessage(payload, msg['from'])
else:
Expand All @@ -154,10 +157,10 @@ def _register(self, iq):
def Convert(self, busIndipendentMsg):
if (type(busIndipendentMsg) is busIndipendentMessages.RegisterMessage):
return xmppMessages.XMPPRegisterMessage(self, busIndipendentMsg.originator(), busIndipendentMsg.service, busIndipendentMsg.namespace, busIndipendentMsg.description, busIndipendentMsg.input_parameters(), busIndipendentMsg.output)

if (type(busIndipendentMsg) is busIndipendentMessages.ProgressMessage):
return xmppMessages.XMPPProgressMessage( busIndipendentMsg.originator, self, busIndipendentMsg.progress )

if (type(busIndipendentMsg) is busIndipendentMessages.LogMessage):
return xmppMessages.XMPPLogMessage(busIndipendentMsg.originator, self, busIndipendentMsg.level, busIndipendentMsg.msg)

Expand All @@ -176,6 +179,6 @@ def Convert(self, busIndipendentMsg):

def disconnect(self):
self.xmpp.disconnect(wait=True)

def state(self):
return self.xmpp.state.current_state()
40 changes: 20 additions & 20 deletions src/wpsremote/xmpp_data/configs/myservice/service.config
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,17 @@ publish_layer_name = contour
# . Optionally it is possible to specify a root folder if the uploader class supports it.
# upload_data_root = /remote-wps/default

[Output2]
name = result2
type = application/x-netcdf
output_mime_type = application/x-netcdf
description = NetCDF Binary File
title = flexpart
filepath = %(output_dir)s/flexpart.nc
publish_as_layer = true
publish_default_style = raster
publish_target_workspace = it.geosolutions
publish_layer_name = flexpart
#[Output2]
#name = result2
#type = application/x-netcdf
#output_mime_type = application/x-netcdf
#description = NetCDF Binary File
#title = flexpart
#filepath = %(output_dir)s/flexpart.nc
#publish_as_layer = true
#publish_default_style = raster
#publish_target_workspace = it.geosolutions
#publish_layer_name = flexpart

# . Enable this option in order to perform a backup of this output
# . before sending it to GeoServer.
Expand All @@ -122,15 +122,15 @@ publish_layer_name = flexpart
# . Optionally it is possible to specify a root folder if the uploader class supports it.
# upload_data_root = /remote-wps/default

[Output3]
name = result3
type = application/owc
output_mime_type = application/xml
description = WPS OWC Json MapContext
layers_to_publish = result2
publish_as_layer = true
publish_layer_name = owc_json_ctx
publish_metadata = ./xmpp_data/resource_dir/owc_json_ctx.json
#[Output3]
#name = result3
#type = application/owc
#output_mime_type = application/xml
#description = WPS OWC Json MapContext
#layers_to_publish = result2
#publish_as_layer = true
#publish_layer_name = owc_json_ctx
#publish_metadata = ./xmpp_data/resource_dir/owc_json_ctx.json

# ########################################### #
# Logging Options Declaration #
Expand Down

0 comments on commit 8ca3698

Please sign in to comment.