Skip to content

Commit

Permalink
better status for canboat plugin, cleanup canboat plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas committed Nov 12, 2024
1 parent 298ec7c commit 4c6e74e
Showing 1 changed file with 68 additions and 37 deletions.
105 changes: 68 additions & 37 deletions server/plugins/canboat/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,35 @@
from avnav_worker import WorkerParameter
from avnav_nmea import Key,NMEAParser

class Status:
def __init__(self,base):
self.lastTime=None
self.lastPos=None
self.base=base
def setPos(self):
self.lastPos=time.monotonic()
def setTime(self):
self.lastTime=time.monotonic()
def _valid(self,ts):
if ts is None:
return False
now=time.monotonic()
if (ts + 20) >= now:
return True
return False
def toState(self,api):
hasPos=self._valid(self.lastPos)
hasTime=self._valid(self.lastTime)
if not hasPos and not hasTime:
api.setStatus("RUNNING",self.base)
else:
txt=self.base
if hasTime:
txt+=", validTime"
if hasPos:
txt+=", validPosition"
api.setStatus("NMEA",txt)


class Plugin(object):
NM = 1852.0
Expand All @@ -55,7 +84,7 @@ class Plugin(object):
default=0,type= WorkerParameter.T_NUMBER)
P_SRC=WorkerParameter('sourceName',description='source name to be set for the generated records (defaults to plugin name)',
default='')
P_IV=WorkerParameter('timeInterval',description='time in seconds to store time received via n2k',
P_IV=WorkerParameter('timeInterval',description='time in seconds to store time received via n2k (also used as interval for auto send RMC, 0 to disable)',
default= 0.5,type=WorkerParameter.T_FLOAT)
P_TIPGNS=WorkerParameter('timePGNs',description='PGNs used to set time',
default=DEFAULT_PGNS)
Expand Down Expand Up @@ -83,9 +112,6 @@ class Plugin(object):
]
CONFIGLIST=list(map(lambda v:v.__dict__,CONFIG))
@classmethod
def key2path(cls,key):
return 'gps.'+key.key
@classmethod
def pluginInfo(cls):
"""
the description for the module
Expand All @@ -100,7 +126,7 @@ def pluginInfo(cls):
'description': 'a plugin that reads some PGNS from canboat. Currently supported: 126992:SystemTime. You need to set allowKeyOverwrite=true',
'version': '1.0',
'config': cls.CONFIGLIST,
'data': list(map(lambda k: {'path':cls.key2path(k),'description':k.description },cls.PATHES))
'data': list(map(lambda k: {'path':k.getKey(),'description':k.description },cls.PATHES))
}

def __init__(self,api):
Expand All @@ -117,13 +143,17 @@ def __init__(self,api):
self.changeSequence=0
self.socket=None
self.lastRmc=time.monotonic()
self.status=None

def rmcWatcher(self,sequence):
def rmcWatcher(self,sequence, source):
while sequence == self.changeSequence:
seq=0
data=self.api.fetchFromQueue(seq,10,includeSource=True,filter='$RMC')
seq,data=self.api.fetchFromQueue(seq,10,includeSource=True,filter='$RMC')
if len(data) > 0:
self.lastRmc=time.monotonic()
for d in data:
if d.source != source:
self.lastRmc=time.monotonic()
break


def changeConfig(self,newValues):
Expand All @@ -147,11 +177,16 @@ def run(self):
def _getConfig(self,cfg:WorkerParameter):
v=self.api.getConfigValue(cfg.name,cfg.default)
return cfg.checkValue(v,False)
def _getField(self,msg,name):
def _getField(self,msg,name,sub=None):
fields=msg.get("fields")
if fields is None:
return
return fields.get(name)
rt=fields.get(name)
if sub is None or rt is None:
return rt
if isinstance(rt,dict):
return rt.get(sub)
return rt
def _runInternal(self):
sequence=self.changeSequence
"""
Expand All @@ -166,8 +201,6 @@ def _runInternal(self):
sock=None
host=self._getConfig(self.P_HOST)
timeInterval=0.5
rmcWatcher=threading.Thread(target=self.rmcWatcher,args=[sequence],daemon=True)
rmcWatcher.start()
try:
port=self._getConfig(self.P_PORT)
timeInterval=self._getConfig(self.P_IV)
Expand All @@ -184,14 +217,16 @@ def _runInternal(self):
handledPGNs=[int(p) for p in handledPGNs]
self.api.log("started with host=%s,port %d, autoSendRMC=%d"%(host,port,autoSendRMC))
source=self._getConfig(self.P_SRC)
rmcWatcher=threading.Thread(target=self.rmcWatcher,args=[sequence,source],daemon=True)
rmcWatcher.start()
priority=self._getConfig(self.P_PRIORITY)
errorReported=False
self.api.setStatus("STARTED", "connecting to n2kd at %s:%d"%(host,port))
while sequence == self.changeSequence:
try:
self.socket = socket.create_connection((host, port),timeout=1000)
self.api.setStatus("RUNNING", "connected to n2kd at %s:%d" %(host,port))
hasNmea=False
self.status=Status("connected to n2kd at %s:%d" %(host,port))
self.status.toState(self.api)
buffer=""
lastTimeSet=time.monotonic()
while True:
Expand All @@ -216,23 +251,18 @@ def _runInternal(self):
if pgn in handledPGNs:
#currently we can decode messages that have a Date and Time field set
now = time.monotonic()
if now < lastTimeSet or now > (lastTimeSet+timeInterval):
cdate=self._getField(msg,'Date')
ctime=self._getField(msg,'Time')
if now >= (lastTimeSet + timeInterval) and timeInterval > 0:
lastTimeSet=now
cdate=self._getField(msg,'Date','value')
ctime=self._getField(msg,'Time','value')
dt=None
if cdate is not None and ctime is not None:
tsplit = ctime.split(".")
dt = datetime.datetime.strptime(cdate + " " + tsplit[0], "%Y.%m.%d %H:%M:%S")
if len(tsplit) > 1:
dt += datetime.timedelta(seconds=float("0." + tsplit[1]))
dt=datetime.datetime(year=1970,month=1,day=1)
dt+=datetime.timedelta(days=cdate,milliseconds=ctime/10)
if dt is not None:
if not hasNmea:
self.api.log("received time %s"%dt.isoformat())
self.api.setStatus("NMEA", "valid time")
hasNmea=True
self.api.addData(self.key2path(NMEAParser.K_TIME), self.formatTime(dt),source=source, sourcePriority=priority)
lastTimeSet=now
if autoSendRMC > 0:
self.status.setTime()
self.api.addData(NMEAParser.K_TIME.getKey(), self.formatTime(dt),source=source, sourcePriority=priority)
if autoSendRMC > 0:
if self.lastRmc is None or self.lastRmc < (now - autoSendRMC):
lat=self.api.getSingleValue("gps.lat")
lon=self.api.getSingleValue("gps.lon")
Expand All @@ -253,29 +283,30 @@ def _runInternal(self):

if readPos:
if pgn == 129025: #pos
clat=self.api.getSingleValue(self.key2path(NMEAParser.K_LAT),includeInfo=True)
clon=self.api.getSingleValue(self.key2path(NMEAParser.K_LON),includeInfo=True)
clat=self.api.getSingleValue(NMEAParser.K_LAT.getKey(),includeInfo=True)
clon=self.api.getSingleValue(NMEAParser.K_LON.getKey(),includeInfo=True)
if clon is None or clon.source == source or clat is None or clat.source == source:
lon=self._getField(msg,'Longitude')
lat=self._getField(msg,'Latitude')
if lon is not None and lat is not None:
self.api.addData(self.key2path(NMEAParser.K_LON),lon,source=source,sourcePriority=priority)
self.api.addData(self.key2path(NMEAParser.K_LAT),lat,source=source,sourcePriority=priority)
self.status.setPos()
self.api.addData(NMEAParser.K_LON.getKey(),lon,source=source,sourcePriority=priority)
self.api.addData(NMEAParser.K_LAT.getKey(),lat,source=source,sourcePriority=priority)
if pgn == 129026: #sog/cog
csog=self.api.getSingleValue(self.key2path(NMEAParser.K_SOG),includeInfo=True)
ccog=self.api.getSingleValue(self.key2path(NMEAParser.K_COG),includeInfo=True)
csog=self.api.getSingleValue(NMEAParser.K_SOG.getKey(),includeInfo=True)
ccog=self.api.getSingleValue(NMEAParser.K_COG.getKey(),includeInfo=True)
if csog is None or csog.source == source or ccog is None or ccog.source == source:
ref=self._getField(msg,"COG Reference")
if ref is not None and ref.get('value') == 0:
cog=self._getField(msg,'COG')
sog=self._getField(msg,'SOG')
if sog is not None and cog is not None:
self.api.addData(self.key2path(NMEAParser.K_SOG),sog,source=source,sourcePriority=priority)
self.api.addData(self.key2path(NMEAParser.K_COG),cog,source=source,sourcePriority=priority)
self.api.addData(NMEAParser.K_SOG.getKey(),sog,source=source,sourcePriority=priority)
self.api.addData(NMEAParser.K_COG.getKey(),cog,source=source,sourcePriority=priority)
#add other decoders here
except:
self.api.log("unable to decode json %s:%s"%(l,traceback.format_exc()))
pass
self.status.toState(self.api)
if len(buffer) > 4096:
raise Exception("no line feed in long data, stopping")
except Exception as e:
Expand Down

0 comments on commit 4c6e74e

Please sign in to comment.