-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathros_hz_monitoring.py
64 lines (48 loc) · 2.13 KB
/
ros_hz_monitoring.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""
ros_hz_monitoring.py
This script will monitor the frequency of all the
Subscribed topics and will generate warning if HZ
of any topic drop below the desired rate.
The list of all the topics will be provided in the
'topic_list' variable while desired hz values will
be recieved from ros 'Parameter Server'.
"""
#!/usr/bin/env python
import rospy
import time
import sensor_msgs.msg
from sensor_msgs.msg import PointCloud2,Imu,Image,NavSatFix
import numpy as np
import rosparam
topic_list = ['/camera/depth/image_raw_front','/iqbal/imu/data','/iqbal/navsat/fix']
class timeSync_Verification:
def __init__(self, listener_node, topic_names ):
self.listener_node = listener_node
rospy.init_node(self.listener_node)
self.avg_array = [[0 for x in range(200)] for y in range(3)]
self.topic_names = topic_names
self.first = 0.0
# Subscriber for each topic
rospy.Subscriber(topic_names[0],Image,self.handler,0)
rospy.Subscriber(topic_names[1],Imu,self.handler,1)
rospy.Subscriber(topic_names[2],NavSatFix,self.handler,2)
# spin() simply keeps python from exiting until this node is stopped
rospy.spin()
def handler(self,data,args):
second = time.time()
#second = rospy.get_time()
self.avg_array[args]= [second-self.first]+self.avg_array[args]
N = len(self.avg_array[args])
# np.convolve used for calculating moving average of hertz
freq = 1/(np.convolve(self.avg_array[args], np.ones((N,))/N, mode='valid'))[0]
# starts evaluating HZ once 'avg_array' is completely full.
if self.avg_array[args][-1] != 0.0:
desired_parameter = rosparam.get_param(self.topic_names[args]+'/timeSync')
if freq <= desired_parameter-0.1*desired_parameter or freq >= desired_parameter+0.1*desired_parameter:
print("topic: '"+str(self.topic_names[args])+"' delayed")
else:
print("on time")
self.avg_array[args].pop()
self.first = second
if __name__ == '__main__':
ts = timeSync_Verification('listener',topic_list)