14
14
import sys
15
15
import time
16
16
import math
17
+ from distutils .util import strtobool
18
+ from py_expression_eval import Parser
17
19
18
20
from pybennu .distributed .subscriber import Subscriber
19
21
from pybennu .distributed .client import Client
20
22
import pybennu .distributed .swig ._Endpoint as E
21
23
24
+ #Adding a timeout helper to cause client objects not to feeze program
25
+ import signal
26
+ from contextlib import contextmanager
27
+
28
+
29
+ @contextmanager
30
+ def timeout (time ):
31
+ # Register a function to raise a TimeoutError on the signal.
32
+ signal .signal (signal .SIGALRM , raise_timeout )
33
+ # Schedule the signal to be sent after ``time``
34
+ signal .alarm (time )
35
+
36
+ try :
37
+ yield
38
+ except TimeoutError :
39
+ pass
40
+ finally :
41
+ # Unregister the signal so it won't be triggered
42
+ # if the timeout is not reached.
43
+ signal .signal (signal .SIGALRM , signal .SIG_IGN )
44
+
45
+
46
+ def raise_timeout (signum , frame ):
47
+ raise TimeoutError
48
+
22
49
logging .basicConfig (level = logging .DEBUG ,format = '%(asctime)s - %(levelname)s - %(message)s' )
23
50
logger = logging .getLogger ('alicanto' )
24
51
#logger.addHandler(logging.StreamHandler())
@@ -34,24 +61,31 @@ class alicantoClient(Client):
34
61
def __init__ (self , end_dest ):
35
62
new_endpoint_dest = E .new_Endpoint ()
36
63
E .Endpoint_str_set (new_endpoint_dest , 'tcp://' + str (end_dest ))
37
- self .endpointName = 'tcp://' + str (end_dest )
38
64
Client .__init__ (self , new_endpoint_dest )
39
-
65
+
40
66
def send (self , message ):
41
67
""" Send message to Provider
42
68
"""
43
- # send update
44
- self ._Client__socket .send_string (message + '\0 ' ) # must include null byte
45
- # get response
46
- msg = self ._Client__socket .recv_string ()
47
- reply = msg .split ('=' )
48
- status = reply [0 ]
49
- data = reply [1 ]
69
+ with timeout (10 ):
70
+ self .connect ()
71
+ # send update
72
+ self ._Client__socket .send_string (message + '\0 ' ) # must include null byte
73
+ # get response
74
+ msg = self ._Client__socket .recv_string ()
75
+ reply = msg .split ('=' )
76
+ status = reply [0 ]
77
+ data = reply [1 ]
78
+
79
+ if status == self ._Client__kACK :
80
+ print ("I: ACK: " + data )
81
+ #self.reply_handler(data)
82
+ else :
83
+ print ("I: ERR -- %s" % msg )
50
84
51
- if status != self ._Client__kACK :
52
- logger . error ( msg )
85
+ self ._Client__socket . close ()
86
+ self . _Client__context . term ( )
53
87
54
- return reply
88
+ return reply
55
89
56
90
class alicanto ():
57
91
def __init__ (self , config , debug = False , exit_handler = None ):
@@ -72,6 +106,9 @@ def __init__(self, config, debug=False, exit_handler=None):
72
106
self .dests = {}
73
107
# Tag=>type map
74
108
self .types = {}
109
+ self .logic = {}
110
+ # Expression parser for logic
111
+ self .parser = Parser ()
75
112
# Set of all tags
76
113
self .tags = {}
77
114
@@ -103,7 +140,7 @@ def __init__(self, config, debug=False, exit_handler=None):
103
140
end_name = endpoint ["name" ]
104
141
end_destination = endpoint ["destination" ]
105
142
end_type = endpoint ["type" ]
106
- logger .debug (f"Registered endpoint ---> end_name: { end_name } ---> end_destination: { end_destination } " )
143
+ logger .info (f"Registered endpoint ---> end_name: { end_name } ---> end_destination: { end_destination } " )
107
144
self .tags .update ({end_destination : 0 })
108
145
self .end_dests .append (end_destination )
109
146
self .dests [end_name ] = end_destination
@@ -124,24 +161,23 @@ def __init__(self, config, debug=False, exit_handler=None):
124
161
sub_info = self .subid [i ]["info" ] # stores logic for interdependencies
125
162
except :
126
163
sub_info = None
127
- logger .debug (f"Registered subscription ---> sub_name: { sub_name } ---> sub_type: { sub_type } ---> sub_info: { sub_info } " )
128
- #sub_name = sub_name.split('/')[1] if '/' in sub_name else sub_name
164
+ logger .info (f"Registered subscription ---> sub_name: { sub_name } ---> sub_type: { sub_type } ---> sub_info: { sub_info } " )
129
165
self .tags .update ({sub_name : 0 })
130
166
self .types [sub_name ] = sub_type
131
167
if sub_info :
132
- logger .debug (f"********** LOGIC **********" )
168
+ logger .info (f"********** LOGIC **********" )
133
169
for exp in sub_info .split (';' ):
134
170
lhs , rhs = exp .split ('=' )
135
171
self .logic [lhs .strip ()] = rhs .strip ()
136
- logger .debug (f'{ exp .strip ()} ' )
172
+ logger .info (f'{ exp .strip ()} ' )
137
173
#make sub_sources elements unique
138
174
self .sub_sources = list (set (self .sub_sources ))
139
175
140
176
for tag in self .tags :
141
177
self .state [tag ] = False if self .get_type (tag ) == 'bool' else 0
142
178
143
179
for sub_source in self .sub_sources :
144
- logger .debug (f"Launching Subscriber Thread ---> subscription: udp://{ sub_source } " )
180
+ logger .info (f"Launching Subscriber Thread ---> subscription: udp://{ sub_source } " )
145
181
subber = alicantoSubscriber (sub_source )
146
182
subber .subscription_handler = self ._subscription_handler
147
183
self .__sub_thread = threading .Thread (target = subber .run )
@@ -153,17 +189,45 @@ def __init__(self, config, debug=False, exit_handler=None):
153
189
for end_dest in self .end_dests :
154
190
# Initialize bennu Client
155
191
end_dest = end_dest .split ('/' )[0 ]
156
- self .end_clients [end_dest ] = alicantoClient (end_dest )
192
+ try :
193
+ self .end_clients [end_dest ] = alicantoClient (end_dest )
194
+ except :
195
+ logger .error (f"\t Error Initializing Client: { self .end_clients } " )
157
196
for key in list (self .end_clients .keys ()):
158
- logger .debug (f"End_client: { key } " )
197
+ logger .info (f"End_client: { key } " )
159
198
160
199
def run (self ):
161
200
############## Entering Execution Mode ##############################
162
201
logger .info ("Entered alicanto execution mode" )
202
+ # Endpoint initial values to alicanto
203
+ for i in range (self .end_count ):
204
+ full_end_name = self .endid [i ]["name" ]
205
+ end_name = (full_end_name .split ('/' )[1 ]
206
+ if '/' in full_end_name
207
+ else full_end_name )
208
+ full_end_dest = self .endid [i ]["destination" ]
209
+ end_dest = (full_end_dest .split ('/' )[0 ]
210
+ if '/' in full_end_dest
211
+ else full_end_dest )
212
+ end_dest_tag = (full_end_dest .split ('/' )[1 ]
213
+ if '/' in full_end_dest
214
+ else full_end_dest )
215
+ try :
216
+ self .end_clients [end_dest ] = alicantoClient (end_dest )
217
+ reply = self .end_clients [end_dest ].send ("READ=" + end_dest_tag )
218
+ value = reply [1 ].rstrip ('\x00 ' )
219
+ self .endid [i ]["value" ] = value
220
+ self .tag (full_end_dest , value )
221
+ logger .debug (f"Initial Endpoints { end_name } / { end_dest } :{ value } " )
222
+
223
+ except :
224
+ logger .error (f"\t Error Initializing Client: { self .end_clients } " )
225
+ continue
163
226
164
227
########## Main co-simulation loop ####################################
165
228
while True :
166
229
self .publish_state ()
230
+ time .sleep (0.1 )
167
231
for key , value in self .endid .items ():
168
232
full_end_name = value ["name" ]
169
233
end_name = (full_end_name .split ('/' )[1 ]
@@ -177,25 +241,95 @@ def run(self):
177
241
if '/' in full_end_dest
178
242
else full_end_dest )
179
243
180
- # !!need to add something to handle binary points
181
244
if self .types [full_end_name ] == 'float' or self .types [full_end_name ] == 'double' :
182
245
if not math .isclose (float (self .tag (full_end_name )), float (self .tag (full_end_dest ))):
183
- self .end_clients [end_dest ].write_analog_point (end_dest_tag , self .tag (full_end_name ))
184
- reply = self .end_clients [end_dest ].send ("READ=" + end_name )
185
- value = reply [1 ].rstrip ('\x00 ' )
186
- self .tag (full_end_dest , value )
246
+ #Handle Logic
247
+ if self .logic [full_end_dest ] is not None :
248
+ expr = self .parser .parse (self .logic [full_end_dest ])
249
+ '''
250
+ # Assign variables
251
+ vars = {}
252
+ for var in expr.variables():
253
+ vars[var] = self.tag(var)
254
+ '''
255
+ i = 0
256
+ # Assign vars not working, so assign token manually
257
+ for token in expr .tokens :
258
+ for search_tag in self .tags :
259
+ if token .toString () == search_tag :
260
+ expr .tokens [i ].number_ = self .tag (token .toString ())
261
+ i += 1
262
+ # Evaluate expression
263
+ value = expr .evaluate (vars )
264
+ value = str (value ).lower ()
265
+ if value != self .tag (full_end_dest ):
266
+ logger .debug (f"\t LOGIC: { full_end_dest .strip ()} ={ self .logic [full_end_dest ]} ----> { value } " )
267
+ # Assign new tag value
268
+ self .tag (full_end_dest , value )
269
+ # Skip if value is unchanged
270
+ elif value == self .tag (full_end_dest ):
271
+ continue
272
+
273
+ try :
274
+ self .end_clients [end_dest ] = alicantoClient (end_dest )
275
+ if self .logic [full_end_dest ] is not None :
276
+ self .end_clients [end_dest ].write_analog_point (end_dest_tag , self .tag (full_end_dest ))
277
+ else :
278
+ self .end_clients [end_dest ].write_analog_point (end_dest_tag , self .tag (full_end_name ))
279
+ time .sleep (0.5 )
280
+ reply = self .end_clients [end_dest ].send ("READ=" + end_dest_tag )
281
+ value = reply [1 ].rstrip ('\x00 ' )
282
+ self .tag (full_end_dest , value )
283
+ except :
284
+ logger .error (f"\t Error Initializing Client: { self .end_clients } " )
285
+ continue
187
286
elif self .types [full_end_name ] == 'bool' :
188
287
if str (self .tag (full_end_name )).lower () != str (self .tag (full_end_dest )).lower ():
189
- self .end_clients [end_dest ].write_digital_point (end_dest_tag , self .tag (full_end_name ))
190
- reply = self .end_clients [end_dest ].send ("READ=" + end_name )
191
- value = reply [1 ].rstrip ('\x00 ' )
192
- self .tag (full_end_dest , value )
288
+ #Handle Logic
289
+ if self .logic [full_end_dest ] is not None :
290
+ expr = self .parser .parse (self .logic [full_end_dest ])
291
+ '''
292
+ # Assign variables
293
+ vars = {}
294
+ for var in expr.variables():
295
+ vars[var] = self.tag(var)
296
+ '''
297
+ i = 0
298
+ # Assign vars not working, so assign token manually
299
+ for token in expr .tokens :
300
+ for search_tag in self .tags :
301
+ if token .toString () == search_tag :
302
+ expr .tokens [i ].number_ = bool (self .tag (token .toString ()))
303
+ i += 1
304
+ # Evaluate expression
305
+ value = expr .evaluate (vars )
306
+ value = str (value )
307
+ if value != self .tag (full_end_dest ):
308
+ logger .debug (f"\t LOGIC: { full_end_dest .strip ()} ={ self .logic [full_end_dest ]} ----> { value } " )
309
+ # Assign new tag value
310
+ self .tag (full_end_dest , value )
311
+ # Skip if value is unchanged
312
+ elif value == self .tag (full_end_dest ):
313
+ continue
314
+ try :
315
+ self .end_clients [end_dest ] = alicantoClient (end_dest )
316
+ if self .logic [full_end_dest ] is not None :
317
+ self .end_clients [end_dest ].write_digital_point (end_dest_tag , eval (self .tag (full_end_dest )))
318
+ else :
319
+ self .end_clients [end_dest ].write_digital_point (end_dest_tag , eval (self .tag (full_end_name )))
320
+ time .sleep (0.5 )
321
+ reply = self .end_clients [end_dest ].send ("READ=" + end_dest_tag )
322
+ value = reply [1 ].rstrip ('\x00 ' )
323
+ self .tag (full_end_dest , value )
324
+ except :
325
+ logger .error (f"\t Error Initializing Client: { self .end_clients } " )
326
+ continue
193
327
194
328
def publish_state (self ):
195
- logger .debug ("=================== DATA ===================" )
329
+ logger .info ("=================== DATA ===================" )
196
330
for tag in self .tags :
197
- logger .debug (f"{ tag :<30} --- { self .tag (tag ):} " )
198
- logger .debug ("============================================" )
331
+ logger .info (f"{ tag :<30} --- { self .tag (tag ):} " )
332
+ logger .info ("============================================" )
199
333
200
334
def get_type (self , tag ):
201
335
return self .types [tag ]
@@ -221,6 +355,7 @@ def _subscription_handler(self, message):
221
355
message (str): published zmq message as a string
222
356
"""
223
357
points = message .split (',' )
358
+ points = points [:- 1 ] # remove last element since it might be empty
224
359
sub_source = threading .current_thread ().name
225
360
226
361
for point in points :
@@ -229,33 +364,41 @@ def _subscription_handler(self, message):
229
364
if point == "" :
230
365
continue
231
366
232
- tag = point .split (':' )[0 ]
233
- full_tag = sub_source + '/' + tag
234
- value = point .split (':' )[1 ]
367
+ try :
368
+ tag = point .split (':' )[0 ]
369
+ full_tag = sub_source + '/' + tag
370
+ value = point .split (':' )[1 ]
371
+ except :
372
+ continue
235
373
236
374
if full_tag not in self .tags :
237
375
continue
238
376
239
- if value .lower () == 'false' :
240
- value = False
241
- field = 'status'
242
- elif value .lower () == 'true' :
243
- value = True
244
- field = 'status'
377
+ if self .types [full_tag ] == 'bool' :
378
+ if value .lower () == 'false' or value == '0' :
379
+ value = False
380
+ field = 'status'
381
+ elif value .lower () == 'true' or value == '1' :
382
+ value = True
383
+ field = 'status'
245
384
else :
246
385
value = float (value )
247
386
field = 'value'
248
387
249
388
if field == 'value' :
250
389
if not math .isclose (float (self .tag (full_tag )), value ):
251
390
self .tag (full_tag , value )
252
- logger .info ("UPDATE NOW: " + full_tag )
253
- logger .info ("New value: " + str (value ))
391
+ logger .debug ("UPDATE NOW: " + full_tag )
392
+ logger .debug ("New value: " + str (value ))
254
393
else :
255
394
continue
256
395
elif field == 'status' :
257
- logger .info ("Cannot handle binary points" )
258
- continue
396
+ if self .tag (full_tag ) != value :
397
+ self .tag (full_tag , value )
398
+ logger .debug ("UPDATE NOW: " + full_tag )
399
+ logger .debug ("New value: " + str (value ))
400
+ else :
401
+ continue
259
402
else :
260
403
continue
261
404
0 commit comments