diff --git a/.gitignore b/.gitignore
index 83d560ed..17e5da2d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -39,6 +39,7 @@ qtcreator-*
 /planning/src
 *~
+*.orig
 
 # Emacs
 .#*
@@ -46,3 +47,5 @@ qtcreator-*
 # Catkin custom files
 CATKIN_IGNORE
 test_generated
+
+.idea/*
\ No newline at end of file
diff --git a/atf_core/src/atf_core/recorder.py b/atf_core/src/atf_core/recorder.py
index 4c99c66f..287dff69 100644
--- a/atf_core/src/atf_core/recorder.py
+++ b/atf_core/src/atf_core/recorder.py
@@ -7,6 +7,7 @@
 import os
 import atf_recorder_plugins
 import atf_core
+import time
 
 from threading import Lock
 from atf_msgs.msg import TestblockTrigger
@@ -128,8 +129,9 @@ def create_subscriber_callback(self, event):
             if topic not in self.subscriber:
                 try:
                     msg_class, _, _ = rostopic.get_topic_class(topic)
-                    rospy.Subscriber(topic, msg_class, self.global_topic_callback, callback_args=topic)
+                    rospy.Subscriber(topic, msg_class, self.global_topic_callback, callback_args=topic, queue_size=10)
                     self.subscriber.append(topic)
+                    print "new subscriber for: ", topic, "message class:", msg_class, " subscribers: ", self.subscriber
                 except Exception:
                     pass
diff --git a/atf_metrics/config/metrics.yaml b/atf_metrics/config/metrics.yaml
index 0eb8afb6..9128d860 100644
--- a/atf_metrics/config/metrics.yaml
+++ b/atf_metrics/config/metrics.yaml
@@ -2,13 +2,23 @@ time:
   handler: CalculateTimeParamHandler
 path_length:
   handler: CalculatePathLengthParamHandler
-resources:
-  handler: CalculateResourcesParamHandler
+distance:
+  handler: CalculateDistanceParamHandler
+resource_cpu:
+  handler: CalculateResourcesCpuParamHandler
+resource_mem:
+  handler: CalculateResourcesMemParamHandler
+resource_io:
+  handler: CalculateResourcesIOParamHandler
+resource_network:
+  handler: CalculateResourcesNetworkParamHandler
 obstacle_distance:
   handler: CalculateDistanceToObstaclesParamHandler
 publish_rate:
   handler: CalculatePublishRateParamHandler
 interface:
-  handler: CalculateInterfaceParamHandler
+  handler: CalculateInterfaceParamHandler
+localization:
+  handler: CheckLocalizationParamHandler
 # example:
 #   handler: ExampleParamHandler
\ No newline at end of file
diff --git a/atf_metrics/src/atf_metrics/__init__.py b/atf_metrics/src/atf_metrics/__init__.py
index ad617882..08555650 100644
--- a/atf_metrics/src/atf_metrics/__init__.py
+++ b/atf_metrics/src/atf_metrics/__init__.py
@@ -1,6 +1,9 @@
 from atf_metrics.calculate_path_length import CalculatePathLength, CalculatePathLengthParamHandler
 from atf_metrics.calculate_time import CalculateTime, CalculateTimeParamHandler
-from atf_metrics.calculate_resources import CalculateResources, CalculateResourcesParamHandler
+from atf_metrics.calculate_resources_cpu import CalculateResourcesCpu, CalculateResourcesCpuParamHandler
+from atf_metrics.calculate_resources_mem import CalculateResourcesMem, CalculateResourcesMemParamHandler
+from atf_metrics.calculate_resources_io import CalculateResourcesIO, CalculateResourcesIOParamHandler
+from atf_metrics.calculate_resources_network import CalculateResourcesNetwork, CalculateResourcesNetworkParamHandler
 from atf_metrics.calculate_distance_to_obstacles import CalculateDistanceToObstacles, CalculateDistanceToObstaclesParamHandler
 from atf_metrics.calculate_publish_rate import CalculatePublishRate, CalculatePublishRateParamHandler
 from atf_metrics.calculate_interface import CalculateInterface, CalculateInterfaceParamHandler
diff --git a/atf_metrics/src/atf_metrics/calculate_resources.py b/atf_metrics/src/atf_metrics/calculate_resources.py
deleted file mode 100644
index 
2cb7a02e..00000000 --- a/atf_metrics/src/atf_metrics/calculate_resources.py +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python -import numpy -import rospy - -from atf_msgs.msg import Resources, IO, Network - - -class CalculateResourcesParamHandler: - def __init__(self): - """ - Class for returning the corresponding metric class with the given parameter. - """ - self.params = [] - - def parse_parameter(self, params): - """ - Method that returns the metric method with the given parameter. - :param params: Parameter - """ - self.params = params - - metrics = CalculateResources(self.params) - - return metrics - - -class CalculateResources: - def __init__(self, resources): - """ - Class for calculating the average resource workload and writing the current resource data. - The resource data is sent over the topic "/testing/Resources". - :param resources: a dictionary containing the names of the resources and a list with the names of the nodes. - Example: {"cpu":[move_group], "mem": [move_group]} - :type resources: dict - """ - - self.active = False - self.resources = resources - self.node_data = {} - self.size_io = len(IO.__slots__) - self.size_network = len(Network.__slots__) - self.finished = False - - # Sort resources after nodes - for resource in self.resources: - for node in self.resources[resource]: - if node not in self.node_data: - self.node_data[node] = {resource: {"data": [], "average": [], "min": [], "max": []}} - elif resource not in self.node_data[node]: - self.node_data[node].update({resource: {"data": [], "average": [], "min": [], "max": []}}) - - rospy.Subscriber("/atf/resources", Resources, self.process_resource_data, queue_size=1) - - def start(self): - self.active = True - - def stop(self): - self.active = False - self.finished = True - - def pause(self): - self.active = False - - @staticmethod - def purge(): - pass - - def process_resource_data(self, msg): - if self.active: - for node in msg.nodes: - try: - for resource in self.node_data[node.node_name]: - if resource == "cpu": - self.node_data[node.node_name][resource]["data"].append(round(node.cpu, 2)) - elif resource == "mem": - self.node_data[node.node_name][resource]["data"].append(round(node.memory, 2)) - elif resource == "io": - if len(self.node_data[node.node_name][resource]["data"]) == 0: - for i in xrange(0, self.size_io): - self.node_data[node.node_name][resource]["data"].append([]) - self.node_data[node.node_name][resource]["data"][0].append(round(node.io.read_count, 2)) - self.node_data[node.node_name][resource]["data"][1].append(round(node.io.write_count, 2)) - self.node_data[node.node_name][resource]["data"][2].append(round(node.io.read_bytes, 2)) - self.node_data[node.node_name][resource]["data"][3].append(round(node.io.write_bytes, 2)) - elif resource == "network": - if len(self.node_data[node.node_name][resource]["data"]) == 0: - for i in xrange(0, self.size_network): - self.node_data[node.node_name][resource]["data"].append([]) - self.node_data[node.node_name][resource]["data"][0].append(round(node.network.bytes_sent, - 2)) - self.node_data[node.node_name][resource]["data"][1].append(round(node.network.bytes_recv, - 2)) - self.node_data[node.node_name][resource]["data"][2].append(round(node.network.packets_sent, - 2)) - self.node_data[node.node_name][resource]["data"][3].append(round(node.network.packets_recv, - 2)) - self.node_data[node.node_name][resource]["data"][4].append(round(node.network.errin, 2)) - self.node_data[node.node_name][resource]["data"][5].append(round(node.network.errout, 2)) - 
self.node_data[node.node_name][resource]["data"][6].append(round(node.network.dropin, 2)) - self.node_data[node.node_name][resource]["data"][7].append(round(node.network.dropout, 2)) - except KeyError: - pass - - def get_result(self): - if self.finished: - for node in self.node_data: - for res in self.node_data[node]: - if len(self.node_data[node][res]["data"]) != 0: - if res == "io" or res == "network": - for values in self.node_data[node][res]["data"]: - self.node_data[node][res]["average"].append(float(round(numpy.mean(values), 2))) - self.node_data[node][res]["min"].append(float(round(min(values), 2))) - self.node_data[node][res]["max"].append(float(round(max(values), 2))) - else: - self.node_data[node][res]["average"] = float(round(numpy.mean(self.node_data[node][res] - ["data"]), 2)) - self.node_data[node][res]["min"] = float(round(min(self.node_data[node][res]["data"]), 2)) - self.node_data[node][res]["max"] = float(round(max(self.node_data[node][res]["data"]), 2)) - del self.node_data[node][res]["data"] - - return "resources", self.node_data - else: - return False diff --git a/atf_metrics/src/atf_metrics/calculate_resources_cpu.py b/atf_metrics/src/atf_metrics/calculate_resources_cpu.py new file mode 100644 index 00000000..541d7ab8 --- /dev/null +++ b/atf_metrics/src/atf_metrics/calculate_resources_cpu.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python +import numpy +import rospy +import math + +from atf_msgs.msg import Resources, IO, Network + + +class CalculateResourcesCpuParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + self.params = [] + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + if not isinstance(params, list): + rospy.logerr("metric config not a list") + return False + metrics = [] + print "params:", params + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + print "groundtruth", groundtruth, "groundtruth_epsilon", groundtruth_epsilon + if 'groundtruth' in metric: + del metric['groundtruth'] + if 'groundtruth_epsilon' in metric: + del metric['groundtruth_epsilon'] + except (TypeError, KeyError): + rospy.logwarn( + "No groundtruth parameters given, skipping groundtruth evaluation for metric 'resources' in testblock '%s'", + testblock_name) + groundtruth = None + groundtruth_epsilon = None + print "metric:", metric + metrics.append(CalculateResourcesCpu(metric["nodes"], groundtruth, groundtruth_epsilon)) + return metrics + + +class CalculateResourcesCpu: + def __init__(self, nodes, groundtruth, groundtruth_epsilon): + """ + Class for calculating the average resource workload and writing the current resource data. + The resource data is sent over the topic "/testing/Resources". + :param resources: a dictionary containing the names of the resources and a list with the names of the nodes. 
+ Example: {"cpu":[move_group], "mem": [move_group]} + :type resources: dict + """ + + self.active = False + self.resource = "cpu" + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.node_data = {} + self.size_io = len(IO.__slots__) + self.size_network = len(Network.__slots__) + self.finished = False + + # Sort resources after nodes + print "node data:", self.node_data + for node in nodes: + if node not in self.node_data: + print "node : ", node + self.node_data[node] = {self.resource: {"data": [], "average": [], "min": [], "max": []}} + # elif resource not in self.node_data[node]: + # self.node_data[node].update({resource: {"data": [], "average": [], "min": [], "max": []}}) + rospy.Subscriber("/atf/resources", Resources, self.process_resource_data, queue_size=1) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + + def purge(self, timestamp): + pass + + def process_resource_data(self, msg): + #print "--------------------------------------\nprocess data \n msg:", msg, "\n active", self.active + if self.active: + for node in msg.nodes: + try: + for resource in self.node_data[node.node_name]: + #print "nodes:", msg.nodes, "\n node data:", self.node_data, "\n resource", resource + if resource == "cpu": + self.node_data[node.node_name][resource]["data"].append(round(node.cpu, 2)) + except KeyError: + pass + + def get_result(self): + groundtruth_result = None + details = {"sum of nodes":[]} + average_sum = 0.0 + + if self.finished: + #print "----------------------------- \n node data:", self.node_data + for node in self.node_data: + #print " node:", node + for res in self.node_data[node]: + #print "res", res + if len(self.node_data[node][res]["data"]) != 0: + self.node_data[node][res]["average"] = float(round(numpy.mean(self.node_data[node][res] + ["data"]), 2)) + self.node_data[node][res]["min"] = float(round(min(self.node_data[node][res]["data"]), 2)) + self.node_data[node][res]["max"] = float(round(max(self.node_data[node][res]["data"]), 2)) + average_sum += float(round(numpy.mean(self.node_data[node][res]["data"]), 2)) + print "average sum:", average_sum + del self.node_data[node][res]["data"] + + details["sum of nodes"].append(node) + # details["nodes"][node].append({"max":self.node_data[node][res]["max"]}) + # details["nodes"][node].append({"average":self.node_data[node][res]["average"]}) + # details["nodes"][node].append({"min":self.node_data[node][res]["min"]}) + + #print "groundtruthes:", self.groundtruth, self.groundtruth_epsilon, "\n average:", self.node_data[node][res]["average"] + if self.groundtruth != None and self.groundtruth_epsilon != None: + for node in self.node_data: + + #print "average sum:check", average_sum + if math.fabs(self.groundtruth - average_sum) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + + print "resources cpu data: ", average_sum, "\n groundthruth result", groundtruth_result, "details:", details, " \n .................................." 
+ return "resources_cpu", round(average_sum, 3), groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_metrics/src/atf_metrics/calculate_resources_io.py b/atf_metrics/src/atf_metrics/calculate_resources_io.py new file mode 100644 index 00000000..ba9dd26d --- /dev/null +++ b/atf_metrics/src/atf_metrics/calculate_resources_io.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python +import numpy +import rospy +import math + +from atf_msgs.msg import Resources, IO, Network +from operator import add + + +class CalculateResourcesIOParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + self.params = [] + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + if not isinstance(params, list): + rospy.logerr("metric config not a list") + return False + metrics = [] + print "params:", params + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + print "groundtruth", groundtruth, "groundtruth_epsilon", groundtruth_epsilon + if 'groundtruth' in metric: + del metric['groundtruth'] + if 'groundtruth_epsilon' in metric: + del metric['groundtruth_epsilon'] + except (TypeError, KeyError): + rospy.logwarn( + "No groundtruth parameters given, skipping groundtruth evaluation for metric 'resources' in testblock '%s'", + testblock_name) + groundtruth = None + groundtruth_epsilon = None + print "metric:", metric + metrics.append(CalculateResourcesIO(metric["nodes"], groundtruth, groundtruth_epsilon)) + return metrics + + +class CalculateResourcesIO: + def __init__(self, nodes, groundtruth, groundtruth_epsilon): + """ + Class for calculating the average resource workload and writing the current resource data. + The resource data is sent over the topic "/testing/Resources". + :param resources: a dictionary containing the names of the resources and a list with the names of the nodes. 
+ Example: {"cpu":[move_group], "mem": [move_group]} + :type resources: dict + """ + + self.active = False + self.resource = "io" + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.node_data = {} + self.size_io = len(IO.__slots__) + self.size_network = len(Network.__slots__) + self.finished = False + + # Sort resources after nodes + print "node data:", self.node_data + for node in nodes: + if node not in self.node_data: + print "node : ", node + self.node_data[node] = {self.resource: {"data": [], "average": [], "min": [], "max": []}} + # elif resource not in self.node_data[node]: + # self.node_data[node].update({resource: {"data": [], "average": [], "min": [], "max": []}}) + print "node data after:", self.node_data + rospy.Subscriber("/atf/resources", Resources, self.process_resource_data, queue_size=1) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + + def purge(self, timestamp): + pass + + def process_resource_data(self, msg): + #print "--------------------------------------\nprocess data \n msg:", msg, "\n active", self.active + if self.active: + for node in msg.nodes: + try: + for resource in self.node_data[node.node_name]: + #print "nodes:", msg.nodes, "\n node data:", self.node_data, "\n resource", resource + if resource == "io": + if len(self.node_data[node.node_name][resource]["data"]) == 0: + for i in xrange(0, self.size_io): + self.node_data[node.node_name][resource]["data"].append([]) + self.node_data[node.node_name][resource]["data"][0].append(round(node.io.read_count, 2)) + self.node_data[node.node_name][resource]["data"][1].append(round(node.io.write_count, 2)) + self.node_data[node.node_name][resource]["data"][2].append(round(node.io.read_bytes, 2)) + self.node_data[node.node_name][resource]["data"][3].append(round(node.io.write_bytes, 2)) + except KeyError: + pass + + def get_result(self): + groundtruth_result = None + details = {"sum of read and write bytes from nodes":[]} + average_sum = 0.0 + + if self.finished: + for node in self.node_data: + #print " node:", node + for res in self.node_data[node]: + #print "res", res + if len(self.node_data[node][res]["data"]) != 0: + self.node_data[node][res]["average"] = float(round(numpy.mean( + map(add, self.node_data[node][res]["data"][2], self.node_data[node][res]["data"][3])), 2)) + self.node_data[node][res]["min"] = float(round( + min(map(add, self.node_data[node][res]["data"][2], self.node_data[node][res]["data"][3])), + 2)) + self.node_data[node][res]["max"] = float(round( + max(map(add, self.node_data[node][res]["data"][2], self.node_data[node][res]["data"][3])), + 2)) + average_sum += float(round(numpy.mean(self.node_data[node][res]["average"]), 2)) + print "average sum:", average_sum + #print "----------------------------- \n node data:", self.node_data + del self.node_data[node][res]["data"] + + details["sum of read and write bytes from nodes"].append(node) + # details["nodes"][node].append({"max":self.node_data[node][res]["max"]}) + # details["nodes"][node].append({"average":self.node_data[node][res]["average"]}) + # details["nodes"][node].append({"min":self.node_data[node][res]["min"]}) + + #print "groundtruthes:", self.groundtruth, self.groundtruth_epsilon, "\n average:", self.node_data[node][res]["average"] + if self.groundtruth != None and self.groundtruth_epsilon != None: + for node in self.node_data: + + #print "average sum:check", average_sum 
+ if math.fabs(self.groundtruth - average_sum) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + + print "resources io data: ", average_sum, "\n groundthruth result", groundtruth_result, "details:", details, " \n .................................." + return "resources_io", round(average_sum, 3), groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_metrics/src/atf_metrics/calculate_resources_mem.py b/atf_metrics/src/atf_metrics/calculate_resources_mem.py new file mode 100644 index 00000000..235285c9 --- /dev/null +++ b/atf_metrics/src/atf_metrics/calculate_resources_mem.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python +import numpy +import rospy +import math + +from atf_msgs.msg import Resources, IO, Network + + +class CalculateResourcesMemParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + self.params = [] + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + if not isinstance(params, list): + rospy.logerr("metric config not a list") + return False + metrics = [] + print "params:", params + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + print "groundtruth", groundtruth, "groundtruth_epsilon", groundtruth_epsilon + if 'groundtruth' in metric: + del metric['groundtruth'] + if 'groundtruth_epsilon' in metric: + del metric['groundtruth_epsilon'] + except (TypeError, KeyError): + rospy.logwarn( + "No groundtruth parameters given, skipping groundtruth evaluation for metric 'resources' in testblock '%s'", + testblock_name) + groundtruth = None + groundtruth_epsilon = None + print "metric:", metric + metrics.append(CalculateResourcesMem(metric["nodes"], groundtruth, groundtruth_epsilon)) + return metrics + + +class CalculateResourcesMem: + def __init__(self, nodes, groundtruth, groundtruth_epsilon): + """ + Class for calculating the average resource workload and writing the current resource data. + The resource data is sent over the topic "/testing/Resources". + :param resources: a dictionary containing the names of the resources and a list with the names of the nodes. 
+ Example: {"cpu":[move_group], "mem": [move_group]} + :type resources: dict + """ + + self.active = False + self.resource = "mem" + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.node_data = {} + self.size_io = len(IO.__slots__) + self.size_network = len(Network.__slots__) + self.finished = False + + # Sort resources after nodes + print "node data:", self.node_data + for node in nodes: + if node not in self.node_data: + print "node : ", node + self.node_data[node] = {self.resource: {"data": [], "average": [], "min": [], "max": []}} + # elif resource not in self.node_data[node]: + # self.node_data[node].update({resource: {"data": [], "average": [], "min": [], "max": []}}) + print "node data after:", self.node_data + rospy.Subscriber("/atf/resources", Resources, self.process_resource_data, queue_size=1) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + + def purge(self, timestamp): + pass + + def process_resource_data(self, msg): + #print "--------------------------------------\nprocess data \n msg:", msg, "\n active", self.active + if self.active: + for node in msg.nodes: + try: + for resource in self.node_data[node.node_name]: + #print "nodes:", msg.nodes, "\n node data:", self.node_data, "\n resource", resource + if resource == "mem": + self.node_data[node.node_name][resource]["data"].append(round(node.memory, 2)) + except KeyError: + pass + + def get_result(self): + groundtruth_result = None + details = {"sum of nodes":[]} + average_sum = 0.0 + + if self.finished: + #print "----------------------------- \n node data:", self.node_data + for node in self.node_data: + #print " node:", node + for res in self.node_data[node]: + #print "res", res + if len(self.node_data[node][res]["data"]) != 0: + self.node_data[node][res]["average"] = float(round(numpy.mean(self.node_data[node][res] + ["data"]), 2)) + self.node_data[node][res]["min"] = float(round(min(self.node_data[node][res]["data"]), 2)) + self.node_data[node][res]["max"] = float(round(max(self.node_data[node][res]["data"]), 2)) + average_sum += float(round(numpy.mean(self.node_data[node][res]["data"]), 2)) + print "average sum:", average_sum + del self.node_data[node][res]["data"] + + details["sum of nodes"].append(node) + # details["nodes"][node].append({"max":self.node_data[node][res]["max"]}) + # details["nodes"][node].append({"average":self.node_data[node][res]["average"]}) + # details["nodes"][node].append({"min":self.node_data[node][res]["min"]}) + + #print "groundtruthes:", self.groundtruth, self.groundtruth_epsilon, "\n average:", self.node_data[node][res]["average"] + if self.groundtruth != None and self.groundtruth_epsilon != None: + for node in self.node_data: + + #print "average sum:check", average_sum + if math.fabs(self.groundtruth - average_sum) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + + print "resources mem data: ", average_sum, "\n groundthruth result", groundtruth_result, "details:", details, " \n .................................." 
+ return "resources_mem", round(average_sum, 3), groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_metrics/src/atf_metrics/calculate_resources_network.py b/atf_metrics/src/atf_metrics/calculate_resources_network.py new file mode 100644 index 00000000..fbcde841 --- /dev/null +++ b/atf_metrics/src/atf_metrics/calculate_resources_network.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python +import numpy +import rospy +import math + +from atf_msgs.msg import Resources, IO, Network +from operator import add + +class CalculateResourcesNetworkParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + self.params = [] + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + if not isinstance(params, list): + rospy.logerr("metric config not a list") + return False + metrics = [] + print "params:", params + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + print "groundtruth", groundtruth, "groundtruth_epsilon", groundtruth_epsilon + if 'groundtruth' in metric: + del metric['groundtruth'] + if 'groundtruth_epsilon' in metric: + del metric['groundtruth_epsilon'] + except (TypeError, KeyError): + rospy.logwarn( + "No groundtruth parameters given, skipping groundtruth evaluation for metric 'resources' in testblock '%s'", + testblock_name) + groundtruth = None + groundtruth_epsilon = None + print "metric:", metric + metrics.append(CalculateResourcesNetwork(metric["nodes"], groundtruth, groundtruth_epsilon)) + return metrics + + +class CalculateResourcesNetwork: + def __init__(self, nodes, groundtruth, groundtruth_epsilon): + """ + Class for calculating the average resource workload and writing the current resource data. + The resource data is sent over the topic "/testing/Resources". + :param resources: a dictionary containing the names of the resources and a list with the names of the nodes. 
+ Example: {"cpu":[move_group], "mem": [move_group]} + :type resources: dict + """ + + self.active = False + self.resource = "network" + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.node_data = {} + self.size_io = len(IO.__slots__) + self.size_network = len(Network.__slots__) + self.finished = False + + # Sort resources after nodes + print "node data:", self.node_data + for node in nodes: + if node not in self.node_data: + print "node : ", node + self.node_data[node] = {self.resource: {"data": [], "average": [], "min": [], "max": []}} + # elif resource not in self.node_data[node]: + # self.node_data[node].update({resource: {"data": [], "average": [], "min": [], "max": []}}) + print "node data after:", self.node_data + rospy.Subscriber("/atf/resources", Resources, self.process_resource_data, queue_size=1) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + + def purge(self, timestamp): + pass + + def process_resource_data(self, msg): + #print "--------------------------------------\nprocess data \n msg:", msg, "\n active", self.active + if self.active: + for node in msg.nodes: + try: + for resource in self.node_data[node.node_name]: + #print "nodes:", msg.nodes, "\n node data:", self.node_data, "\n resource", resource + if resource == "network": + if len(self.node_data[node.node_name][resource]["data"]) == 0: + for i in xrange(0, self.size_network): + self.node_data[node.node_name][resource]["data"].append([]) + self.node_data[node.node_name][resource]["data"][0].append(round(node.network.bytes_sent, + 2)) + self.node_data[node.node_name][resource]["data"][1].append(round(node.network.bytes_recv, + 2)) + self.node_data[node.node_name][resource]["data"][2].append(round(node.network.packets_sent, + 2)) + self.node_data[node.node_name][resource]["data"][3].append(round(node.network.packets_recv, + 2)) + self.node_data[node.node_name][resource]["data"][4].append(round(node.network.errin, 2)) + self.node_data[node.node_name][resource]["data"][5].append(round(node.network.errout, 2)) + self.node_data[node.node_name][resource]["data"][6].append(round(node.network.dropin, 2)) + self.node_data[node.node_name][resource]["data"][7].append(round(node.network.dropout, 2)) + except KeyError: + pass + + def get_result(self): + groundtruth_result = None + details = {"sum of received and sent bytes from nodes":[]} + average_sum = 0.0 + + if self.finished: + #print "----------------------------- \n node data:", self.node_data + for node in self.node_data: + #print " node:", node + for res in self.node_data[node]: + #print "res", res + if len(self.node_data[node][res]["data"]) != 0: + self.node_data[node][res]["average"] = float(round(numpy.mean( + map(add, self.node_data[node][res]["data"][0], self.node_data[node][res]["data"][1])), 2)) + self.node_data[node][res]["min"] = float(round( + min(map(add, self.node_data[node][res]["data"][0], self.node_data[node][res]["data"][1])), + 2)) + self.node_data[node][res]["max"] = float(round( + max(map(add, self.node_data[node][res]["data"][0], self.node_data[node][res]["data"][1])), + 2)) + average_sum += float(round(numpy.mean(self.node_data[node][res]["average"]), 2)) + print "average sum:", average_sum + del self.node_data[node][res]["data"] + + details["sum of received and sent bytes from nodes"].append(node) + # details["nodes"][node].append({"max":self.node_data[node][res]["max"]}) + # 
details["nodes"][node].append({"average":self.node_data[node][res]["average"]}) + # details["nodes"][node].append({"min":self.node_data[node][res]["min"]}) + + #print "groundtruthes:", self.groundtruth, self.groundtruth_epsilon, "\n average:", self.node_data[node][res]["average"] + if self.groundtruth != None and self.groundtruth_epsilon != None: + for node in self.node_data: + + #print "average sum:check", average_sum + if math.fabs(self.groundtruth - average_sum) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + + print "resources network data: ", average_sum, "\n groundthruth result", groundtruth_result, "details:", details, " \n .................................." + return "resources_network", round(average_sum, 3), groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_metrics/src/atf_metrics/check_localization.py b/atf_metrics/src/atf_metrics/check_localization.py new file mode 100644 index 00000000..ab5a7ef4 --- /dev/null +++ b/atf_metrics/src/atf_metrics/check_localization.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python +import rospy +import tf +import math + +class CheckLocalizationParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + pass + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + metrics = [] + if type(params) is not list: + rospy.logerr("metric config not a list") + return False + + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + except (TypeError, KeyError): + rospy.logwarn("No groundtruth parameters given, skipping groundtruth evaluation for metric 'distance' in testblock '%s'", testblock_name) + groundtruth = None + groundtruth_epsilon = None + metrics.append(CheckLocalization(metric["root_frame"], metric["measured_frame"], metric["max_loc_error"], groundtruth, groundtruth_epsilon)) + return metrics + +class CheckLocalization: + def __init__(self, root_frame, measured_frame, max_loc_error, groundtruth, groundtruth_epsilon): + """ + Class for calculating the distance to a given root frame. + The tf data is sent over the tf topic given in the robot_config.yaml. + :param root_frame: name of the first frame + :type root_frame: string + :param measured_frame: name of the second frame. The distance will be measured in relation to the root_frame. 
+ :type measured_frame: string + """ + + self.active = False + self.root_frame = root_frame + self.measured_frame = measured_frame + self.distance = 0.0 + self.lost_count = 0 + self.tf_sampling_freq = 1.0 # Hz + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.max_loc_error = max_loc_error; + self.finished = False + + self.listener = tf.TransformListener() + + rospy.Timer(rospy.Duration.from_sec(1 / self.tf_sampling_freq), self.record_tf) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + self.first_value = True + + def purge(self, timestamp): + pass + + def record_tf(self, event): + if self.active: + try: + self.listener.waitForTransform(self.root_frame, + self.measured_frame, + rospy.Time(0), + rospy.Duration.from_sec(1 / (2*self.tf_sampling_freq))) + (trans, rot) = self.listener.lookupTransform(self.root_frame, self.measured_frame, rospy.Time(0)) + except (tf.Exception, tf.LookupException, tf.ConnectivityException): + #rospy.logwarn(e) + pass + else: + self.distance = math.sqrt(trans[0]**2 + trans[1]**2) + if (self.distance > self.max_loc_error): + self.lost_count += 1 + + + def get_result(self): + groundtruth_result = None + details = {"root_frame": self.root_frame, "measured_frame": self.measured_frame} + if self.finished: + data = self.lost_count + if self.groundtruth != None and self.groundtruth_epsilon != None: + if math.fabs(self.groundtruth - data) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + return "localization", data, groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_recorder_plugins/config/recorder_plugins.yaml b/atf_recorder_plugins/config/recorder_plugins.yaml index e85989bc..dd2e1b7a 100644 --- a/atf_recorder_plugins/config/recorder_plugins.yaml +++ b/atf_recorder_plugins/config/recorder_plugins.yaml @@ -1,4 +1,7 @@ -#resources: RecordResources +resources_cpu: RecordResources +resource_mem: RecordResources +resource_io: RecordResources +resource_network: RecordResources interface: RecordInterface #obstacle_distance: RecordObstacleDistance # example: Example diff --git a/atf_recorder_plugins/src/atf_recorder_plugins/__init__.py b/atf_recorder_plugins/src/atf_recorder_plugins/__init__.py index d54ff25b..8a6deeef 100644 --- a/atf_recorder_plugins/src/atf_recorder_plugins/__init__.py +++ b/atf_recorder_plugins/src/atf_recorder_plugins/__init__.py @@ -1,4 +1,4 @@ -#from atf_recorder_plugins.plugin_resources import RecordResources +from atf_recorder_plugins.plugin_resources import RecordResources from atf_recorder_plugins.plugin_interface import RecordInterface #from atf_recorder_plugins.plugin_obstacle_distance import RecordObstacleDistance # from atf_recorder_plugins.example import Example diff --git a/atf_recorder_plugins/src/atf_recorder_plugins/plugin_resources.py b/atf_recorder_plugins/src/atf_recorder_plugins/plugin_resources.py index abfc64d8..6c49309d 100755 --- a/atf_recorder_plugins/src/atf_recorder_plugins/plugin_resources.py +++ b/atf_recorder_plugins/src/atf_recorder_plugins/plugin_resources.py @@ -4,141 +4,152 @@ import time import xmlrpclib import rosnode +import yaml -from copy import copy +from copy import copy, deepcopy from re import findall from subprocess import check_output, CalledProcessError -from atf_msgs.msg import Resources, IO, Network -from atf_recorder import 
BagfileWriter - +from atf_msgs.msg import NodeResources, Resources, IO, Network, TestblockTrigger class RecordResources: - def __init__(self, topic_prefix, config_file, robot_config_file, write_lock, bag_file): - self.topic_prefix = topic_prefix - self.test_config = config_file + def __init__(self, write_lock, bag_file_writer): + self.topic_prefix = "atf/" + file = "/home/fmw-hb/atf_catkin_ws/src/atf/hannes_test/config/test_configs/test1.yaml" + with open(file, 'r') as stream: + self.test_config = yaml.load(stream) - self.resources_timer_frequency = 4.0 # Hz + self.resources_timer_frequency = 10.0 # Hz self.timer_interval = 1/self.resources_timer_frequency self.testblock_list = self.create_testblock_list() self.pid_list = self.create_pid_list() - self.requested_nodes = {} + self.requested_nodes = [] self.res_pipeline = {} - self.BfW = BagfileWriter(bag_file, write_lock) + self.BfW = bag_file_writer rospy.Timer(rospy.Duration.from_sec(self.timer_interval), self.collect_resource_data) def update_requested_nodes(self, msg): + counter = 0 + requested_nodes = [] + if msg.trigger == TestblockTrigger.START: + print "START Trigger" - if msg.trigger.trigger == Trigger.ACTIVATE: - for node in self.testblock_list[msg.name]: - if node not in self.requested_nodes: - self.requested_nodes[node] = copy(self.testblock_list[msg.name][node]) - self.res_pipeline[node] = copy(self.testblock_list[msg.name][node]) - else: - for res in self.testblock_list[msg.name][node]: - self.requested_nodes[node].append(res) - if res not in self.res_pipeline[node]: - self.res_pipeline[node].append(res) - - elif msg.trigger.trigger == Trigger.FINISH: for node in self.testblock_list[msg.name]: - for res in self.testblock_list[msg.name][node]: - self.requested_nodes[node].remove(res) - if res not in self.requested_nodes[node]: - self.res_pipeline[node].remove(res) - if len(self.requested_nodes[node]) == 0: - del self.requested_nodes[node] - del self.res_pipeline[node] + print "node:", node + if not node in requested_nodes: + requested_nodes.append(node) + #self.res_pipeline[resource].extend(node_name) + self.requested_nodes = deepcopy(requested_nodes) + #print "requested nodes:", self.requested_nodes + #print "res pipeline:", self.res_pipeline + counter += 1 + + elif msg.trigger == TestblockTrigger.STOP: + print "STOP Trigger" def create_testblock_list(self): - testblock_list = {} + node_list = [] + counter = 0 + print "testconfig: ", self.test_config for testblock in self.test_config: + #print "testblock:", testblock, "\n tests:", self.test_config[testblock] try: - self.test_config[testblock]["resources"] + self.test_config[testblock] except KeyError: + rospy.logerr("No nodes for resources to record") continue else: - for resource in self.test_config[testblock]["resources"]: - try: - testblock_list[testblock] - except KeyError: - testblock_list.update({testblock: {}}) - - for node_name in self.test_config[testblock]["resources"][resource]: - - if node_name not in testblock_list[testblock]: - testblock_list[testblock].update({node_name: [resource]}) - elif resource not in testblock_list[testblock][node_name]: - testblock_list[testblock][node_name].append(resource) - + for resource, nodes in self.test_config[testblock].iteritems(): + if str(resource).__contains__("resource"): + #print "resources:", resource, "nodes:", nodes[counter]["nodes"] + node_list.extend(nodes[counter]["nodes"]) + + # if 'groundtruth' in resource: + # del resource['groundtruth'] + # if 'groundtruth_epsilon' in resource: + # del 
resource['groundtruth_epsilon'] + counter += 1 + try: + testblock_list[testblock] + except KeyError: + testblock_list.update({testblock: []}) + #print "node list:", node_list + testblock_list.update({testblock: node_list}) + #print "--------------------------\n testblock list:", testblock_list return testblock_list def collect_resource_data(self, event): - pipeline = copy(self.res_pipeline) - if not len(pipeline) == 0: - msg = Resources() - topic = self.topic_prefix + "resources" - - for node in pipeline: - msg_data = NodeResources() - pid = self.pid_list[node] - - if pid is None: - continue - - try: - msg_data.node_name = node - - if "cpu" in pipeline[node]: - msg_data.cpu = psutil.Process(pid).cpu_percent(interval=self.timer_interval) - - if "mem" in pipeline[node]: - msg_data.memory = psutil.Process(pid).memory_percent() - - if "io" in pipeline[node]: - data = findall('\d+', str(psutil.Process(pid).io_counters())) - msg_data.io.read_count = int(data[0]) - msg_data.io.write_count = int(data[1]) - msg_data.io.read_bytes = int(data[2]) - msg_data.io.write_bytes = int(data[3]) - - if "network" in pipeline[node]: - data = findall('\d+', str(psutil.net_io_counters())) - msg_data.network.bytes_sent = int(data[0]) - msg_data.network.bytes_recv = int(data[1]) - msg_data.network.packets_sent = int(data[2]) - msg_data.network.packets_recv = int(data[3]) - msg_data.network.errin = int(data[4]) - msg_data.network.errout = int(data[5]) - msg_data.network.dropin = int(data[6]) - msg_data.network.dropout = int(data[7]) - - msg.nodes.append(msg_data) - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - - self.BfW.write_to_bagfile(topic, msg, rospy.Time.from_sec(time.time())) + msg = Resources() + msg_list = [] + topic = self.topic_prefix + "resources" + #for resource, nodes in pipeline.iteritems(): + msg_data = NodeResources() + #print "pid list: ", self.pid_list#, "pid", self.pid_list[resource] + for node, pid in self.pid_list.iteritems(): + if pid is None: + continue + #print "requested nodes: ", self.requested_nodes + #print "message node:", node, "pid:", pid + try: + msg_data.node_name = node + #print "node:", node, "pid:", pid + + msg_data.cpu = psutil.Process(pid).get_cpu_percent(interval=self.timer_interval) + + msg_data.memory = psutil.Process(pid).get_memory_percent() + + data = findall('\d+', str(psutil.Process(pid).get_io_counters())) + msg_data.io.read_count = int(data[0]) + msg_data.io.write_count = int(data[1]) + msg_data.io.read_bytes = int(data[2]) + msg_data.io.write_bytes = int(data[3]) + + data = findall('\d+', str(psutil.net_io_counters())) + msg_data.network.bytes_sent = int(data[0]) + msg_data.network.bytes_recv = int(data[1]) + msg_data.network.packets_sent = int(data[2]) + msg_data.network.packets_recv = int(data[3]) + msg_data.network.errin = int(data[4]) + msg_data.network.errout = int(data[5]) + msg_data.network.dropin = int(data[6]) + msg_data.network.dropout = int(data[7]) + + #print "message data: ", msg_data + msg_list.append(copy(msg_data)) + #print "message list: ", msg_list + except (psutil.NoSuchProcess, psutil.AccessDenied) as e: + rospy.logerr("collecting error: %s", e) + pass + msg.nodes = msg_list + #print "resource msg:", msg + self.BfW.write_to_bagfile(topic, msg, rospy.Time.now()) def trigger_callback(self, msg): # Only save node resources if testblock requests them + #print "trigger callback: msg \n", msg, " \n testblocks", self.testblock_list, "\n msg trigger:", msg.trigger if msg.name in self.testblock_list: self.update_requested_nodes(msg) - if 
msg.trigger.trigger == Trigger.ERROR: - self.res_pipeline = [] def create_pid_list(self): node_list = {} + pid_list = {} for (testblock, nodes) in self.testblock_list.iteritems(): for node in nodes: - if node not in node_list: - node_list[node] = self.get_pid(node) - - return node_list + #for resource, names in node.iteritems(): + #print "node: ", node,"nodes: ", nodes, "node_list:", node_list + #if isinstance(names, list): + # for name in names: + if self.get_pid(node) not in pid_list: + pid_list.update({node:self.get_pid(node)}) + print "pid", self.get_pid(node), "for node", node + #node_list.update({resource:pid_list}) + #print "pid list", pid_list + return pid_list @staticmethod def get_pid(name):
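Minimal sketch (not part of this patch) of the per-process sampling that collect_resource_data above performs. The patch calls psutil's older get_* accessors (get_cpu_percent, get_memory_percent, get_io_counters); with psutil 2.x and later the same data comes from the methods below. The PID is a placeholder:

    import psutil

    pid = 4242                              # hypothetical PID of a monitored ROS node
    proc = psutil.Process(pid)
    cpu = proc.cpu_percent(interval=0.1)    # percent over the sampling interval
    mem = proc.memory_percent()             # percent of total RAM
    io = proc.io_counters()                 # read_count, write_count, read_bytes, write_bytes
    net = psutil.net_io_counters()          # system-wide counters, as used by the plugin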