diff --git a/src/wpsremote/busIndependentMessages.py b/src/wpsremote/busIndependentMessages.py index 779f72c..f6a486f 100644 --- a/src/wpsremote/busIndependentMessages.py +++ b/src/wpsremote/busIndependentMessages.py @@ -143,3 +143,13 @@ def __init__(self, originator, outputs): def outputs(self): return self._outputs + + +class CannotExecuteMessage(BusIndependentMessage): + + def __init__(self, originator, outputs): + self.originator = originator + self._outputs = outputs + + def outputs(self): + return self._outputs diff --git a/src/wpsremote/configInstance.py b/src/wpsremote/config_instance.py similarity index 100% rename from src/wpsremote/configInstance.py rename to src/wpsremote/config_instance.py diff --git a/src/wpsremote/processbot.py b/src/wpsremote/processbot.py index 73b43af..ac050a9 100644 --- a/src/wpsremote/processbot.py +++ b/src/wpsremote/processbot.py @@ -18,7 +18,7 @@ import busIndependentMessages import computation_job_inputs import computational_job_input_actions -import configInstance +import config_instance import output_parameters import resource_cleaner from time import sleep @@ -47,7 +47,7 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess self._input_values = execute_message.variables() # read remote config - remote_config = configInstance.create(remote_config_filepath) + remote_config = config_instance.create(remote_config_filepath) bus_class_name = remote_config.get("DEFAULT", "bus_class_name") uploader_class_name = None try: @@ -66,32 +66,33 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess # the config file is read with raw=False because the unique_exe_id value # will be used (interpolated) in the config - serviceConfig = configInstance.create(service_config_filepath, - case_sensitive=True, - variables={ - 'unique_exe_id': self._uniqueExeId, - 'wps_execution_shared_dir': self._wps_execution_shared_dir - }, - raw=False) - - self.service = serviceConfig.get("DEFAULT", "service") # todo: what is? - self.namespace = serviceConfig.get("DEFAULT", "namespace") - self.description = serviceConfig.get("DEFAULT", "description") - self._active = serviceConfig.get("DEFAULT", "active").lower() == "true" # True - - self._executable_path = serviceConfig.get("DEFAULT", "executable_path") - self._executable_cmd = serviceConfig.get("DEFAULT", "executable_cmd") + service_config = config_instance.create(service_config_filepath, + case_sensitive=True, + variables={ + 'unique_exe_id': self._uniqueExeId, + 'wps_execution_shared_dir': self._wps_execution_shared_dir + }, + raw=False) + + self.service = service_config.get("DEFAULT", "service") # todo: what is? + self.namespace = service_config.get("DEFAULT", "namespace") + self.description = service_config.get("DEFAULT", "description") + self._active = service_config.get("DEFAULT", "active").lower() == "true" # True + + self._executable_path = service_config.get("DEFAULT", "executable_path") + self._executable_cmd = service_config.get("DEFAULT", "executable_cmd") if not os.path.isabs(self._executable_path): full_executable_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), self._executable_path) self._executable_cmd = self._executable_cmd.replace(self._executable_path, full_executable_path) self._executable_path = full_executable_path - self._stdout_parser = serviceConfig.get_list("Logging", "stdout_parser") - self._stdout_action = serviceConfig.get_list("Logging", "stdout_action") - self._output_dir = serviceConfig.get_path("DEFAULT", "output_dir") + self._stdout_parser = service_config.get_list("Logging", "stdout_parser") + self._stdout_action = service_config.get_list("Logging", "stdout_action") + self._output_dir = service_config.get_path("DEFAULT", "output_dir") if not os.path.isabs(self._output_dir): self._output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), self._output_dir) - self._max_running_time = datetime.timedelta(seconds=serviceConfig.getint("DEFAULT", "max_running_time_seconds")) + self._max_running_time = datetime.timedelta( + seconds=service_config.getint("DEFAULT", "max_running_time_seconds")) # create the concrete uploader object if uploader_class_name: @@ -116,19 +117,19 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess self._uploader = None input_sections = OrderedDict() - for input_section in [s for s in serviceConfig.sections() if 'input' in s.lower() or 'const' in s.lower()]: - input_sections[input_section] = serviceConfig.items_without_defaults(input_section, raw=False) + for input_section in [s for s in service_config.sections() if 'input' in s.lower() or 'const' in s.lower()]: + input_sections[input_section] = service_config.items_without_defaults(input_section, raw=False) self._input_parameters_defs = computation_job_inputs.ComputationJobInputs.create_from_config(input_sections) output_sections = OrderedDict() - for output_section in [s for s in serviceConfig.sections() if 'output' in s.lower()]: - output_sections[output_section] = serviceConfig.items_without_defaults(output_section, raw=False) + for output_section in [s for s in service_config.sections() if 'output' in s.lower()]: + output_sections[output_section] = service_config.items_without_defaults(output_section, raw=False) self._output_parameters_defs = output_parameters.OutputParameters.create_from_config( output_sections, self._wps_execution_shared_dir, self._uploader) action_sections = OrderedDict() - for action_section in [s for s in serviceConfig.sections() if 'action' in s.lower()]: - action_sections[action_section] = serviceConfig.items_without_defaults(action_section, raw=False) + for action_section in [s for s in service_config.sections() if 'action' in s.lower()]: + action_sections[action_section] = service_config.items_without_defaults(action_section, raw=False) self._input_params_actions = computational_job_input_actions.ComputationalJobInputActions.create_from_config( action_sections) diff --git a/src/wpsremote/resource_cleaner.py b/src/wpsremote/resource_cleaner.py index 2015a6d..0e77dcc 100644 --- a/src/wpsremote/resource_cleaner.py +++ b/src/wpsremote/resource_cleaner.py @@ -4,7 +4,7 @@ # This code is licensed under the GPL 2.0 license, available at the root # application directory. -import configInstance +import config_instance import os import path import unittest @@ -160,7 +160,7 @@ def read(self): self.read_from_file(self.filepath()) def read_from_file(self, filepath): - config = configInstance.create(filepath, raw=True) # todo: use file lock + config = config_instance.create(filepath, raw=True) # todo: use file lock self._start_time = config.get("DEFAULT", "start_time") try: self._start_time = datetime.datetime.strptime(self._start_time, "%Y-%m-%dT%H:%M:%S") diff --git a/src/wpsremote/resource_monitor.py b/src/wpsremote/resource_monitor.py index f9c67f0..8f18d38 100644 --- a/src/wpsremote/resource_monitor.py +++ b/src/wpsremote/resource_monitor.py @@ -16,17 +16,52 @@ logger = logging.getLogger("servicebot.resource_monitor") +class ProcessWeight(object): + + weight = 0 + coefficient = 1.0 + + def __init__(self, process_weight): + if process_weight: + self.weight = float(process_weight['weight']) + self.coefficient = float(process_weight['coefficient']) + + # ability to customize process load on per request basis + def request_weight(self, exec_request): + # this one is the default implementation + return (self.coefficient * self.weight) + + class ResourceMonitor(threading.Thread): + # Total Capacity of this machine + capacity = 100 + + # Current load + load = 0 + proc_load = 0 + resource_load = 0 + running_procs_load = {} + + load_threshold = 95 load_average_scan_minutes = 15 - cores = psutil.cpu_count() + + try: + cores = psutil.cpu_count() + except BaseException: + cores = 1 cpu_perc = [] vmem_perc = [] + lock = threading.Lock() - def __init__(self, load_average_scan_minutes): + def __init__(self, capacity, load_threshold, load_average_scan_minutes): threading.Thread.__init__(self) + + ResourceMonitor.capacity = capacity + ResourceMonitor.load_threshold = load_threshold ResourceMonitor.load_average_scan_minutes = load_average_scan_minutes + ResourceMonitor.lock.acquire() ResourceMonitor.vmem_perc.append(psutil.virtual_memory().percent) @@ -64,25 +99,47 @@ def proc_is_running(self, proc_defs): ('name' in _p and _p['name'] in name) and \ ('cwd' in _p and _p['cwd'] in path) and \ ('cmdline' in _p and _p['cmdline'] in cmdline): + ResourceMonitor.proc_load = 100 return True except BaseException: import traceback tb = traceback.format_exc() logger.debug(tb) # print(tb) + + ResourceMonitor.proc_load = 0 return False def update_stats(self): - ResourceMonitor.lock.acquire() - - ResourceMonitor.vmem_perc[1] = (ResourceMonitor.vmem_perc[0] + ResourceMonitor.vmem_perc[1]) / 2.0 - ResourceMonitor.vmem_perc[0] = (ResourceMonitor.vmem_perc[1] + psutil.virtual_memory().percent) / 2.0 - - ResourceMonitor.cpu_perc[1] = ResourceMonitor.cpu_perc[0] - ResourceMonitor.cpu_perc[0] = psutil.cpu_percent( - interval=(ResourceMonitor.load_average_scan_minutes*60), percpu=False) - - ResourceMonitor.lock.release() + try: + # Acquiring thread lock + ResourceMonitor.lock.acquire() + + # Used memory perc + ResourceMonitor.vmem_perc[1] = (ResourceMonitor.vmem_perc[0] + ResourceMonitor.vmem_perc[1]) / 2.0 + ResourceMonitor.vmem_perc[0] = (ResourceMonitor.vmem_perc[1] + psutil.virtual_memory().percent) / 2.0 + + # Used cpu perc + ResourceMonitor.cpu_perc[1] = ResourceMonitor.cpu_perc[0] + ResourceMonitor.cpu_perc[0] = psutil.cpu_percent( + interval=(ResourceMonitor.load_average_scan_minutes*60), percpu=False) + + vmem = psutil.virtual_memory().percent + if ResourceMonitor.vmem_perc[0] > 0: + vmem = (vmem + ResourceMonitor.vmem_perc[0]) / 2.0 + + loadavg = psutil.cpu_percent(interval=0, percpu=False) + if ResourceMonitor.cpu_perc[0] > 0: + loadavg = (loadavg + ResourceMonitor.cpu_perc[0]) / 2.0 + + if vmem > ResourceMonitor.load_threshold or loadavg > ResourceMonitor.load_threshold: + ResourceMonitor.resource_load = 100 + else: + ResourceMonitor.resource_load = 0 + + finally: + # Releaseing thread lock + ResourceMonitor.lock.release() def run(self): while True: diff --git a/src/wpsremote/servicebot.py b/src/wpsremote/servicebot.py index 5f15c16..80373f3 100644 --- a/src/wpsremote/servicebot.py +++ b/src/wpsremote/servicebot.py @@ -5,6 +5,7 @@ # application directory. import re +import json import psutil import thread import logging @@ -17,7 +18,7 @@ import busIndependentMessages -import configInstance +import config_instance import computation_job_inputs import output_parameters import resource_cleaner @@ -35,18 +36,56 @@ class ServiceBot(object): The script runs continuosly. """ - def __init__(self, remote_config_filepath, service_config_filepath): + def _setup_resource_monitor(self): + # read remote config file + self._remote_config = config_instance.create(self._remote_config_filepath) + + try: + self._process_blacklist = json.loads(self._remote_config.get("DEFAULT", "process_blacklist")) + except BaseException: + self._process_blacklist = [] + + try: + capacity = self._remote_config.getint("DEFAULT", "capacity") + except BaseException: + capacity = 100 + + try: + load_threshold = self._remote_config.getint("DEFAULT", "load_threshold") + except BaseException: + load_threshold = 100 + + try: + load_average_scan_minutes = self._remote_config.getint("DEFAULT", "load_average_scan_minutes") + except BaseException: + load_average_scan_minutes = 15 + resource_monitor.ResourceMonitor.capacity = capacity + resource_monitor.ResourceMonitor.load_threshold = load_threshold + resource_monitor.ResourceMonitor.load_average_scan_minutes = load_average_scan_minutes + + def _initialize_resource_monitor(self): + # Update Resource Monitoring Params from Config + self._setup_resource_monitor() + + self._resource_monitor = resource_monitor.ResourceMonitor( + resource_monitor.ResourceMonitor.capacity, + resource_monitor.ResourceMonitor.load_threshold, + resource_monitor.ResourceMonitor.load_average_scan_minutes + ) + self._resource_monitor.start() + + def __init__(self, remote_config_filepath, service_config_filepath): # read remote config file self._remote_config_filepath = remote_config_filepath - remote_config = configInstance.create(self._remote_config_filepath) + self._remote_config = config_instance.create(self._remote_config_filepath) # identify the class implementation of the cominication bus - bus_class_name = remote_config.get("DEFAULT", "bus_class_name") + bus_class_name = self._remote_config.get("DEFAULT", "bus_class_name") # directory used to store file for resource cleaner - self._resource_file_dir = remote_config.get_path("DEFAULT", "resource_file_dir") - if remote_config.has_option("DEFAULT", "wps_execution_shared_dir"): + self._resource_file_dir = self._remote_config.get_path("DEFAULT", "resource_file_dir") + if self._remote_config.has_option("DEFAULT", "wps_execution_shared_dir"): # directory used to store the process encoded outputs (usually on a shared fs) - self._wps_execution_shared_dir = remote_config.get_path("DEFAULT", "wps_execution_shared_dir") + self._wps_execution_shared_dir = self._remote_config.get_path("DEFAULT", "wps_execution_shared_dir") # ensure outdir exists if not self._wps_execution_shared_dir.exists(): @@ -59,41 +98,37 @@ def __init__(self, remote_config_filepath, service_config_filepath): # (request hanlder); for example the unique execution id value to craete # the sand box directory self._service_config_file = service_config_filepath - serviceConfig = configInstance.create(service_config_filepath, - case_sensitive=True, - variables={ - 'wps_execution_shared_dir': self._wps_execution_shared_dir - }, - raw=True) - self.service = serviceConfig.get("DEFAULT", "service") # WPS service name? - self.namespace = serviceConfig.get("DEFAULT", "namespace") - self.description = serviceConfig.get("DEFAULT", "description") # WPS service description - self._active = serviceConfig.get("DEFAULT", "active").lower() == "true" # True - self._output_dir = serviceConfig.get_path("DEFAULT", "output_dir") - self._max_running_time = datetime.timedelta(seconds=serviceConfig.getint("DEFAULT", "max_running_time_seconds")) - - try: - import json - self._process_blacklist = json.loads(serviceConfig.get("DEFAULT", "process_blacklist")) - except BaseException: - self._process_blacklist = [] - + self._service_config = config_instance.create(service_config_filepath, + case_sensitive=True, + variables={ + 'wps_execution_shared_dir': self._wps_execution_shared_dir + }, + raw=True) + self.service = self._service_config.get("DEFAULT", "service") # WPS service name? + self.namespace = self._service_config.get("DEFAULT", "namespace") + self.description = self._service_config.get("DEFAULT", "description") # WPS service description + self._active = self._service_config.get("DEFAULT", "active").lower() == "true" # True + self._output_dir = self._service_config.get_path("DEFAULT", "output_dir") + self._max_running_time = datetime.timedelta( + seconds=self._service_config.getint("DEFAULT", "max_running_time_seconds")) + + _sections = self._service_config.sections() input_sections = OrderedDict() - for input_section in [s for s in serviceConfig.sections() if 'input' in s.lower() or 'const' in s.lower()]: + for input_section in [s for s in _sections if 'input' in s.lower() or 'const' in s.lower()]: # service bot doesn't have yet the execution unique id, thus the - # serviceConfig is read with raw=True to avoid config file variables + # service_config is read with raw=True to avoid config file variables # interpolation - input_sections[input_section] = serviceConfig.items_without_defaults(input_section, raw=True) + input_sections[input_section] = self._service_config.items_without_defaults(input_section, raw=True) self._input_parameters_defs = computation_job_inputs.ComputationJobInputs.create_from_config(input_sections) output_sections = OrderedDict() - for output_section in [s for s in serviceConfig.sections() if 'output' in s.lower()]: - output_sections[output_section] = serviceConfig.items_without_defaults(output_section, raw=True) + for output_section in [s for s in self._service_config.sections() if 'output' in s.lower()]: + output_sections[output_section] = self._service_config.items_without_defaults(output_section, raw=True) self._output_parameters_defs = output_parameters.OutputParameters.create_from_config( output_sections, self._wps_execution_shared_dir) # create the concrete bus object - self.bus = introspection.get_class_three_arg(bus_class_name, remote_config, self.service, self.namespace) + self.bus = introspection.get_class_three_arg(bus_class_name, self._remote_config, self.service, self.namespace) self.bus.RegisterMessageCallback(busIndependentMessages.InviteMessage, self.handle_invite) self.bus.RegisterMessageCallback(busIndependentMessages.ExecuteMessage, self.handle_execute) @@ -110,12 +145,7 @@ def __init__(self, remote_config_filepath, service_config_filepath): self._remote_wps_endpoint = None # Allocate and start a Resource Monitoring Thread - try: - load_average_scan_minutes = serviceConfig.getint("DEFAULT", "load_average_scan_minutes") - except BaseException: - load_average_scan_minutes = 15 - self._resource_monitor = resource_monitor.ResourceMonitor(load_average_scan_minutes) - self._resource_monitor.start() + self._initialize_resource_monitor() def get_resource_file_dir(self): return self._resource_file_dir @@ -162,6 +192,17 @@ def handle_execute(self, execute_message): """Handler for WPS execute message.""" logger = logging.getLogger("servicebot.handle_execute") + # read service config, with raw=true that is without config file's value + # interpolation. Interpolation values are prodice only for the process bot + # (request hanlder); for example the unique execution id value to craete + # the sand box directory + self._service_config = config_instance.create(self._service_config_file, + case_sensitive=True, + variables={ + 'wps_execution_shared_dir': self._wps_execution_shared_dir + }, + raw=True) + # save execute messsage to tmp file to enable the process bot to read the inputs tmp_file = tempfile.NamedTemporaryFile(prefix='wps_params_', suffix=".tmp", delete=False) execute_message.serialize(tmp_file) @@ -169,30 +210,95 @@ def handle_execute(self, execute_message): tmp_file.close() logger.debug("save parameters file for executing process " + self.service + " in " + param_filepath) - # create the Resource Cleaner file containing the process info. The - # "invoked_process.pid" will be set by the spawned process itself - try: - r = resource_cleaner.Resource() - # create a resource... - r.set_from_servicebot(execute_message.UniqueId(), self._output_dir / execute_message.UniqueId()) - # ... and save to file - logger.info("Start the resource cleaner!") - r.write() - except Exception as ex: - logger.exception("Resource Cleaner initialization error", ex) + # check if there are enough resources available to keep the exec-request in charge + _can_execute = True + + # if the load average is above threshold then we cannot run another process + # and we should answer with a proper message + if self._resource_monitor.resource_load > self._resource_monitor.load_threshold: + _can_execute = False + + # if there is at least a single blacklisted process running there is no avail + # capacity to run another process and we should answer with a proper message + if self._resource_monitor.proc_load > self._resource_monitor.load_threshold: + _can_execute = False + + # compute residual capacity + _residual_capacity = self._resource_monitor.capacity - self._resource_monitor.load + if _residual_capacity <= 0: + _can_execute = False # no residual capacity left, no execution - # invoke the process bot (aka request handler) asynchronously - cmd = 'python wpsagent.py -r ' + self._remote_config_filepath + ' -s ' + \ - self._service_config_file + ' -p ' + param_filepath + ' process' - invoked_process = subprocess.Popen(args=cmd.split(), - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - logger.info("created process " + self.service + " with PId " + str(invoked_process.pid) + " and cmd: " + cmd) + # Do we have enough capacity? + PCk = None + try: + process_weight_class_name = self._service_config.get("DEFAULT", "process_weight") + PCk = introspection.get_class_no_arg(process_weight_class_name) + except BaseException: + PCk = None + if not PCk: + try: + process_weight = json.loads( + self._service_config.get("DEFAULT", "process_weight")) + except BaseException: + process_weight = {"weight": "0", "coefficient": "1.0"} + PCk = resource_monitor.ProcessWeight(process_weight) + logger.info(" ---------------Async Request Management------------------- ") + logger.info(PCk.__class__) + _request_load = PCk.request_weight(execute_message) + logger.info("Request Load: %s " % _request_load) + logger.info(" ---------------------------------------------------------- ") + if _residual_capacity < _request_load: + _can_execute = False # no residual capacity left, no execution + + if _can_execute: + # create the Resource Cleaner file containing the process info. The + # "invoked_process.pid" will be set by the spawned process itself + try: + r = resource_cleaner.Resource() + # create a resource... + r.set_from_servicebot(execute_message.UniqueId(), self._output_dir / execute_message.UniqueId()) + # ... and save to file + logger.info("Start the resource cleaner!") + r.write() + except BaseException as ex: + logger.exception("Resource Cleaner initialization error: " + str(ex)) + + # invoke the process bot (aka request handler) asynchronously + cmd = 'python wpsagent.py -r ' + self._remote_config_filepath + ' -s ' + \ + self._service_config_file + ' -p ' + param_filepath + ' process' + invoked_process = subprocess.Popen(args=cmd.split(), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + logger.info( + "created process %s with PId %s and cmd: %s" % (self.service, str(invoked_process.pid), cmd)) + + # use a parallel thread to wait the end of the request handler process and + # get the exit code of the just created asynchronous process computation + unique_id = execute_message.UniqueId() + thread.start_new_thread(self.output_parser_verbose, (unique_id, invoked_process, param_filepath,)) + + # Update the resource_monitor load + self._resource_monitor.running_procs_load[unique_id] = _request_load + self._resource_monitor.load += _request_load + else: + # Send the message back to the WPS + outputs = dict() + outputs['UniqueId'] = execute_message.UniqueId() - # use a parallel thread to wait the end of the request handler process and - # get the exit code of the just created asynchronous process computation - thread.start_new_thread(self.output_parser_verbose, (invoked_process, param_filepath,)) + try: + if self.bus.state() != 'connected': + self.bus.xmpp.reconnect() + self.bus.xmpp.send_presence() + self.bus.SendMessage( + busIndependentMessages.CannotExecuteMessage( + execute_message.originator(), + outputs + ) + ) + except BaseException: + logger.info("[XMPP Disconnected]: Service "+str(self.service) + + " Could not send info message to GeoServer Endpoint "+str(self._remote_wps_endpoint)) logger.info("end of execute message handler, going back in listening mode") @@ -200,8 +306,16 @@ def handle_getloadavg(self, getloadavg_message): """Handler for WPS 'getloadavg' message.""" logger = logging.getLogger("servicebot.handle_getloadavg") logger.info("handle getloadavg message from WPS " + str(getloadavg_message.originator())) - # Collect current Machine Load Average and Available Memory info + logger.info( + "ResourceMonitor load [%s] - proc_load [%s] - resource_load [%s]" % ( + self._resource_monitor.load, self._resource_monitor.proc_load, self._resource_monitor.resource_load) + ) + + # Update Resource Monitoring Params from Config + self._setup_resource_monitor() + + # Collect current Machine Load Average and Available Memory info try: logger.info("Fetching updated status from Resource Monitor...") @@ -240,9 +354,9 @@ def handle_getloadavg(self, getloadavg_message): logger.info("[XMPP Disconnected]: Service "+str(self.service) + " Could not send info message to GeoServer Endpoint "+str(self._remote_wps_endpoint)) except Exception as ex: - logger.exception("Load Average initialization error", ex) + logger.exception("Load Average initialization error: " + str(ex)) - def output_parser_verbose(self, invoked_process, param_filepath): + def output_parser_verbose(self, unique_id, invoked_process, param_filepath): logger = logging.getLogger("servicebot.output_parser_verbose") logger.info("wait for end of execution of created process " + self.service + ", PId " + str(invoked_process.pid)) @@ -328,6 +442,14 @@ def output_parser_verbose(self, invoked_process, param_filepath): msg = "Process " + self.service + " PId " + str(invoked_process.pid) + " terminated successfully!" logger.debug(msg) + # Update the resource_monitor load + if unique_id in self._resource_monitor.running_procs_load: + _request_load = self._resource_monitor.running_procs_load[unique_id] + self._resource_monitor.load -= _request_load + if self._resource_monitor.load < 0: + self._resource_monitor.load = 0 + del self._resource_monitor.running_procs_load[unique_id] + def send_error_message(self, msg): logger = logging.getLogger("ServiceBot.send_error_message") logger.error(msg) diff --git a/src/wpsremote/wpsagent.py b/src/wpsremote/wpsagent.py index 59d39ed..aa2643f 100644 --- a/src/wpsremote/wpsagent.py +++ b/src/wpsremote/wpsagent.py @@ -14,7 +14,7 @@ import servicebot import processbot -import configInstance +import config_instance import busIndependentMessages import resource_cleaner @@ -122,13 +122,13 @@ def __init__(self, args): # read the service config file with interpolation=true (raw=False) to get # the proper sand box work dir using the unique id as input parameter # args.remoteconfig, args.serviceconfig - serviceConfig = configInstance.create(args.serviceconfig, - case_sensitive=True, - variables={ - 'unique_exe_id': self.exe_msg.UniqueId() - }, - raw=False) - work_dir = serviceConfig.get_path("DEFAULT", "workdir") + service_config = config_instance.create(args.serviceconfig, + case_sensitive=True, + variables={ + 'unique_exe_id': self.exe_msg.UniqueId() + }, + raw=False) + work_dir = service_config.get_path("DEFAULT", "workdir") # ensure outdir exists if not work_dir.exists(): diff --git a/src/wpsremote/xmppBus.py b/src/wpsremote/xmppBus.py index b9e98e6..89de901 100644 --- a/src/wpsremote/xmppBus.py +++ b/src/wpsremote/xmppBus.py @@ -202,6 +202,11 @@ def Convert(self, busIndipendentMsg): self, busIndipendentMsg.outputs()) + if (isinstance(busIndipendentMsg, busIndependentMessages.CannotExecuteMessage)): + return xmppMessages.XMPPCannotExecuteMessage(busIndipendentMsg.originator, + self, + busIndipendentMsg.outputs()) + else: raise Exception("unknown message") diff --git a/src/wpsremote/xmppMessages.py b/src/wpsremote/xmppMessages.py index 80b6278..0fb980a 100644 --- a/src/wpsremote/xmppMessages.py +++ b/src/wpsremote/xmppMessages.py @@ -176,3 +176,22 @@ def send(self): body = ''.join(['topic=log', '&id=', self.xmppChannel.id, '&level=warning', "&message=", "Critical error while encoding LoadAverageMessage outuputs!"]) self.xmppChannel.xmpp.send_message(mto=self.originator, mbody=body, mtype='chat') + + +class XMPPCannotExecuteMessage(object): + + def __init__(self, originator, xmppChannel, outputs): + self.xmppChannel = xmppChannel + self.originator = originator + self._outputs = outputs + + def send(self): + try: + message = "output=Cannot%20Execute&exec_id=" + self._outputs['UniqueId'] + body = ''.join(['topic=cantexec', '&id=', self.xmppChannel.id, "&message=cantexec", "&", message]) + self.xmppChannel.xmpp.send_message(mto=self.originator, mbody=body, mtype='chat') + except Exception: + print traceback.format_exc() + body = ''.join(['topic=log', '&id=', self.xmppChannel.id, '&level=warning', "&message=", + "Critical error while encoding LoadAverageMessage outuputs!"]) + self.xmppChannel.xmpp.send_message(mto=self.originator, mbody=body, mtype='chat') diff --git a/src/wpsremote/xmpp_data/configs/myservice/service.config b/src/wpsremote/xmpp_data/configs/myservice/service.config index e7868ff..c6a73bd 100644 --- a/src/wpsremote/xmpp_data/configs/myservice/service.config +++ b/src/wpsremote/xmpp_data/configs/myservice/service.config @@ -24,16 +24,29 @@ workdir = %(output_dir)s/%(unique_execution_id)s active = True max_running_time_seconds = 300 -# . This option allows you to set the CPU and Memory average load scan time. -# . It is espressed in 'minutes' and if disabled here it will be set by default -# . to 15 minutes. -load_average_scan_minutes = 1 - -# . Use this option to completely avoid using this host (and prevent starting a new -# . 'processbot') whenever one of the following process names are running. -# . In other words, if one of the following processes are currently running on this machine, -# . GeoServer won't send any WPS execute request until they are finished. -process_blacklist = [{"cwd": "/mnt/d/work/code/python/geonode/geonode-master", "name": "celery", "cmdline": "-A geonode.celery_app:app worker -Q default,geonode,cleanup,update,email -B -E -l INFO"}] +# ########################################### # +# Async Request Management # +# ########################################### # + +# . Runtime estimated weight of this process. +# . if (residual_capacity >= request_load) the process won't start. +# . It is possible also to specify a more complex estimator by declaring a +# . class which overrides the "request_weight" method of the "ProcessWeight" base. +# . e.g.: process_weight = my_custom_module.MyProcessCapacity (see below) +process_weight = {"weight": "10", "coefficient": "1.5"} + +# Sample implementation +# process_weight = my_custom_module.ProcessWeightSampleImplementation +# +# On my_custom_module.py +# class ProcessWeightSampleImplementation(ProcessWeight): +# +# def __init__(self): +# pass +# +# def request_weight(self, exec_request): +# # this one is the default implementation +# return (2.5 * self.coefficient * 10.0) # ########################################### # # Inputs and Actions Declaration # diff --git a/src/wpsremote/xmpp_data/configs/myservice/service.config.sample b/src/wpsremote/xmpp_data/configs/myservice/service.config.sample index 8dcc002..3e57410 100644 --- a/src/wpsremote/xmpp_data/configs/myservice/service.config.sample +++ b/src/wpsremote/xmpp_data/configs/myservice/service.config.sample @@ -23,16 +23,29 @@ workdir = %(output_dir)s/%(unique_execution_id)s active = True max_running_time_seconds = 300 -# . This option allows you to set the CPU and Memory average load scan time. -# . It is espressed in 'minutes' and if disabled here it will be set by default -# . to 15 minutes. -load_average_scan_minutes = 1 - -# . Use this option to completely avoid using this host (and prevent starting a new -# . 'processbot') whenever one of the following process names are running. -# . In other words, if one of the following processes are currently running on this machine, -# . GeoServer won't send any WPS execute request until they are finished. -process_blacklist = [{"cwd": "/mnt/d/work/code/python/geonode/geonode-master", "name": "celery", "cmdline": "-A geonode.celery_app:app worker -Q default,geonode,cleanup,update,email -B -E -l INFO"}] +# ########################################### # +# Async Request Management # +# ########################################### # + +# . Runtime estimated weight of this process. +# . if (residual_capacity >= request_load) the process won't start. +# . It is possible also to specify a more complex estimator by declaring a +# . class which overrides the "request_weight" method of the "ProcessWeight" base. +# . e.g.: process_weight = my_custom_module.MyProcessCapacity (see below) +process_weight = {"weight": "20", "coefficient": "1.5"} + +# Sample implementation +# process_weight = my_custom_module.ProcessWeightSampleImplementation +# +# On my_custom_module.py +# class ProcessWeightSampleImplementation(ProcessWeight): +# +# def __init__(self): +# pass +# +# def request_weight(self, exec_request): +# # this one is the default implementation +# return (2.5 * self.coefficient * 10.0) # ########################################### # # Inputs and Actions Declaration # diff --git a/src/wpsremote/xmpp_data/configs/myservice/service.config.test_xml b/src/wpsremote/xmpp_data/configs/myservice/service.config.test_xml index 6453e47..7746eda 100644 --- a/src/wpsremote/xmpp_data/configs/myservice/service.config.test_xml +++ b/src/wpsremote/xmpp_data/configs/myservice/service.config.test_xml @@ -23,16 +23,29 @@ workdir = %(output_dir)s/%(unique_execution_id)s active = True max_running_time_seconds = 300 -# . This option allows you to set the CPU and Memory average load scan time. -# . It is espressed in 'minutes' and if disabled here it will be set by default -# . to 15 minutes. -load_average_scan_minutes = 1 - -# . Use this option to completely avoid using this host (and prevent starting a new -# . 'processbot') whenever one of the following process names are running. -# . In other words, if one of the following processes are currently running on this machine, -# . GeoServer won't send any WPS execute request until they are finished. -process_blacklist = [{"cwd": "/mnt/d/work/code/python/geonode/geonode-master", "name": "celery", "cmdline": "-A geonode.celery_app:app worker -Q default,geonode,cleanup,update,email -B -E -l INFO"}] +# ########################################### # +# Async Request Management # +# ########################################### # + +# . Runtime estimated weight of this process. +# . if (residual_capacity >= request_load) the process won't start. +# . It is possible also to specify a more complex estimator by declaring a +# . class which overrides the "request_weight" method of the "ProcessWeight" base. +# . e.g.: process_weight = my_custom_module.MyProcessCapacity (see below) +process_weight = {"weight": "20", "coefficient": "1.5"} + +# Sample implementation +# process_weight = my_custom_module.ProcessWeightSampleImplementation +# +# On my_custom_module.py +# class ProcessWeightSampleImplementation(ProcessWeight): +# +# def __init__(self): +# pass +# +# def request_weight(self, exec_request): +# # this one is the default implementation +# return (2.5 * self.coefficient * 10.0) # ########################################### # # Inputs and Actions Declaration # diff --git a/src/wpsremote/xmpp_data/configs/remote.config b/src/wpsremote/xmpp_data/configs/remote.config index 7ebd4fa..1af5981 100644 --- a/src/wpsremote/xmpp_data/configs/remote.config +++ b/src/wpsremote/xmpp_data/configs/remote.config @@ -61,3 +61,25 @@ resource_file_dir = ./xmpp_data/resource_dir # python decrypt.py password path/to/rsakey.pem passphrase uploader_private_rsa_key = ./xmpp_data/ssl/myTestKey.pem uploader_passphrase = f00bar + +# ########################################### # +# Async Request Management # +# ########################################### # + +# . Total capacity assigned to this machine. A value between 0 and 100. +capacity = 100 + +# . If the load average is above threshold then we cannot run another process +# . and we should answer with a proper message +load_threshold = 95 + +# . This option allows you to set the CPU and Memory average load scan time. +# . It is espressed in 'minutes' and if disabled here it will be set by default +# . to 15 minutes. +load_average_scan_minutes = 1 + +# . Use this option to completely avoid using this host (and prevent starting a new +# . 'processbot') whenever one of the following process names are running. +# . In other words, if one of the following processes are currently running on this machine, +# . GeoServer won't send any WPS execute request until they are finished. +process_blacklist = [{"cwd": "/mnt/d/work/code/python/geonode/geonode-master", "name": "celery", "cmdline": "-A geonode.celery_app:app worker -Q default,geonode,cleanup,update,email -B -E -l INFO"}] diff --git a/test/test_action.py b/test/test_action.py index 92af45f..301d854 100644 --- a/test/test_action.py +++ b/test/test_action.py @@ -7,7 +7,7 @@ import unittest import os from wpsremote.action import CopyFile, CopyINIFileAddParam -import wpsremote.configInstance as configInstance +import wpsremote.config_instance as config_instance __author__ = "Alessio Fabiani" __copyright__ = "Copyright 2019 Open Source Geospatial Foundation - all rights reserved" @@ -44,7 +44,7 @@ def test_copy_INI_file_add_param(self): self.assertFalse(os.path.isfile(copy_path)) cifap.execute(input_values) self.assertTrue(os.path.isfile(copy_path)) - config = configInstance.create(copy_path) + config = config_instance.create(copy_path) self.assertEqual("Another value", config.get("Input3", "another_param")) os.remove(copy_path) diff --git a/test/test_bot.py b/test/test_bot.py index 16967a4..8366739 100644 --- a/test/test_bot.py +++ b/test/test_bot.py @@ -15,7 +15,7 @@ from wpsremote.servicebot import ServiceBot from wpsremote.resource_cleaner import Resource import wpsremote.resource_monitor as resource_monitor -import wpsremote.configInstance as configInstance +import wpsremote.config_instance as config_instance from wpsremote.xmppBus import XMPPBus from wpsremote.computation_job_inputs import ComputationJobInputs from wpsremote.computational_job_input_actions import ComputationalJobInputActions @@ -37,7 +37,7 @@ class TestBot(unittest.TestCase): def setUp(self): os.chdir(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) - self.remote_config = configInstance.create( + self.remote_config = config_instance.create( "./src/wpsremote/xmpp_data/test/test_remote.config" ) self.xmpp_bus = XMPPBus(self.remote_config, "service_name", "service_name_namespace") diff --git a/test/test_bus_independent_messages.py b/test/test_bus_independent_messages.py index a2a0b53..7152376 100644 --- a/test/test_bus_independent_messages.py +++ b/test/test_bus_independent_messages.py @@ -8,7 +8,7 @@ import unittest import mock import pickle -import wpsremote.configInstance as configInstance +import wpsremote.config_instance as config_instance from wpsremote.busIndependentMessages import ( RegisterMessage, ProgressMessage, LogMessage, CompletedMessage, ErrorMessage, AbortMessage, LoadAverageMessage, InviteMessage, GetLoadAverageMessage, ExecuteMessage, FinishMessage @@ -30,7 +30,7 @@ class TestBusIndependentMessages(unittest.TestCase): def setUp(self): - self.remote_config = configInstance.create( + self.remote_config = config_instance.create( "./src/wpsremote/xmpp_data/test/test_remote.config" ) self.xmpp_bus = XMPPBus(self.remote_config, "service_name", "service_name_namespace") diff --git a/test/test_config_parser.py b/test/test_config_parser.py index 0bf24ed..639d2ef 100644 --- a/test/test_config_parser.py +++ b/test/test_config_parser.py @@ -7,7 +7,7 @@ import unittest import wpsremote.ConfigParser as ConfigParser from wpsremote.ConfigParser import DuplicateSectionError -import wpsremote.configInstance as configInstance +import wpsremote.config_instance as config_instance import wpsremote.path as path __author__ = "Alessio Fabiani" @@ -30,19 +30,19 @@ class TestConfigParser(unittest.TestCase): ] def test_create_config_parser(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") self.assertIsNotNone(cp) self.assertIsInstance(cp, ConfigParser.ConfigParser) def test_sections(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") sections = cp.sections() self.assertIsNotNone(sections) for s in sections: self.assertIn(s, TestConfigParser.CONFIG_SECTIONS) def test_defaults(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") default_section = cp.defaults() self.assertIsNotNone(default_section) self.assertIsNotNone(default_section.items()) @@ -50,14 +50,14 @@ def test_defaults(self): self.assertIn(d, TestConfigParser.DEFAULT_ITEMS) def test_add_section(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") self.assertRaises(DuplicateSectionError, lambda: cp.add_section("Input1")) self.assertNotIn("Input5", cp.sections()) cp.add_section("Input5") self.assertIn("Input5", cp.sections()) def test_has_section(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") self.assertTrue(cp.has_section("Input1")) def test_options(self): @@ -66,19 +66,19 @@ def test_options(self): 'executable_path', 'executable_cmd', 'output_dir', 'unique_execution_id', 'workdir', 'active', 'max_running_time_seconds', 'servicepassword', 'load_average_scan_minutes' ] - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") opts = cp.options("Action1") for o in opts: self.assertIn(o, options) def test_read(self): filenames = ["./src/wpsremote/xmpp_data/test/service.config"] - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") fn = cp.read("./src/wpsremote/xmpp_data/test/service.config") self.assertEqual(fn, filenames) def test_readfp(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") fp = open("./src/wpsremote/xmpp_data/test/service.config") cp.readfp(fp) fp.close() @@ -88,7 +88,7 @@ def test_readfp(self): self.assertNotIn(d, TestConfigParser.DEFAULT_ITEMS) def test_get(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") self.assertEqual("admin", cp.get("DEFAULT", "servicePassword")) def test_items(self): @@ -103,90 +103,90 @@ def test_items(self): ('target_filepath', 'src\\wpsremote\\xmpp_data\\test\\tmp\\123\\config.json'), ('json_path_expr', "['Config']['timeHorizon']"), ('load_average_scan_minutes', '1') ] - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") for i in cp.items("Action3"): self.assertIn(i, items_list) def test_getboolean(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") self.assertIs(True, cp.getboolean("DEFAULT", "active")) def test_has_option(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") self.assertTrue(cp.has_option("DEFAULT", "active")) def test_set(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") self.assertTrue(cp.getboolean("DEFAULT", "active")) cp.set("DEFAULT", "active", "False") self.assertFalse(cp.getboolean("DEFAULT", "active")) def test_remove_option(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") self.assertTrue(cp.has_option("Action1", "input_ref")) cp.remove_option("Action1", "input_ref") self.assertFalse(cp.has_option("Action1", "input_ref")) def test_remove_section(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") self.assertTrue(cp.has_section("Action1")) cp.remove_section("Action1") self.assertFalse(cp.has_section("Action1")) def test_items_without_defaults(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") for section in TestConfigParser.CONFIG_SECTIONS: - items = configInstance.items_without_defaults(cp, section, raw=False) + items = config_instance.items_without_defaults(cp, section, raw=False) for i in items: self.assertNotIn(i, TestConfigParser.DEFAULT_ITEMS) def test_get_list_impl(self): list_items = ['item_0', 'item_1', 'item_2'] - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") - items = configInstance.get_list_impl(cp, "Input1", "list") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") + items = config_instance.get_list_impl(cp, "Input1", "list") for i in items: self.assertIn(i, list_items) def test_get_list_list_impl(self): list_list = [['item_0'], ['item_1'], ['item_2']] - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") - items = configInstance.get_list_list_impl(cp, "Input1", "list") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") + items = config_instance.get_list_list_impl(cp, "Input1", "list") for i in items: self.assertIsInstance(i, list) self.assertIn(i, list_list) def test_get_list_int_impl(self): int_list = [0, 1, 2, 3] - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") - items = configInstance.get_list_int_impl(cp, "Input1", "int_list") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") + items = config_instance.get_list_int_impl(cp, "Input1", "int_list") for i in items: self.assertIsInstance(i, int) self.assertIn(i, int_list) def test_get_list_float_impl(self): float_list = [0.12, 1.6, 2.55, 3.4] - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") - items = configInstance.get_list_float_impl(cp, "Input1", "float_list") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") + items = config_instance.get_list_float_impl(cp, "Input1", "float_list") for i in items: self.assertIsInstance(i, float) self.assertIn(i, float_list) def test_get_list_path_impl(self): path_list = ['src\\wpsremote\\xmpp_data\\test\\tmp\\123', 'src\\wpsremote\\xmpp_data\\test\\tmp'] - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") - items = configInstance.get_list_path_impl(cp, "Input1", "path_list") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") + items = config_instance.get_list_path_impl(cp, "Input1", "path_list") for i in items: self.assertIsInstance(i, path.path) self.assertIn(i, path_list) def test_get_password(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") - psw = configInstance.get_password(cp, "DEFAULT", "servicePassword") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") + psw = config_instance.get_password(cp, "DEFAULT", "servicePassword") self.assertEqual("admin", psw) def test_get_path(self): - cp = configInstance.create("./src/wpsremote/xmpp_data/test/test_service.config") - item = configInstance.get_path(cp, "DEFAULT", "output_dir") + cp = config_instance.create("./src/wpsremote/xmpp_data/test/test_service.config") + item = config_instance.get_path(cp, "DEFAULT", "output_dir") self.assertIsInstance(item, path.path) self.assertEqual("src\\wpsremote\\xmpp_data\\test\\tmp", item)