@@ -65,8 +65,8 @@ def __init__(self,configfile=None,simulate=False,noupload=False):
65
65
if "serialsleep" in self .cfg :
66
66
self .serialsleep = 1e-3 * self .cfg ["serialsleep" ]
67
67
else :
68
- #default of 20 microseconds
69
- self .serialsleep = 20e -3
68
+ #default of 100 milliseconds
69
+ self .serialsleep = 100e -3
70
70
71
71
#possibly create the data directory if it doesn't exist yet
72
72
if not os .path .exists (self .cfg ['data_dir' ]):
@@ -75,7 +75,8 @@ def __init__(self,configfile=None,simulate=False,noupload=False):
75
75
self .openLogFile = os .path .join (self .cfg ['data_dir' ],self .cfg ['file_base' ]+ ".tmp" )
76
76
#default (will be updated from GNSS info)
77
77
self .logdate = date .today ()
78
-
78
+ self .triggerRotate = False
79
+ self .cancelUpload = False
79
80
self .openSerial ()
80
81
self .setupWebdav ()
81
82
@@ -109,24 +110,24 @@ async def rotateNMEAlog(self):
109
110
110
111
#Asynchronously wait for new serial data
111
112
nmeamsg = await self .getnmea ()
112
- if not nmeamsg .endswith (b"\n " ):
113
- #no info -> try again later (or in the case of simulate data rewind the buffer
114
- if self .simulate :
115
- self .serial .seek (0 )
116
- continue
117
- print ("no data found on the serial port, retrying in one second" )
113
+ if nmeamsg is None :
118
114
await asyncio .sleep (1 )
119
115
continue
116
+
120
117
if rmcregex .match (nmeamsg ):
121
118
currentdate = datetime .strptime (nmeamsg .split (b"," )[9 ].decode ('utf-8' ),"%d%m%y" ).date ()
122
119
if not prevdate :
123
120
prevdate = currentdate
124
121
self .logdate = currentdate
125
122
126
- if prevdate < currentdate :
123
+ if prevdate < currentdate or self .triggerRotate :
124
+ self .triggerRotate = False
127
125
break #will stop the loop at a date turnover
128
126
129
127
self .writeToLog (nmeamsg )
128
+
129
+ #reset rotate trigger
130
+ self .triggerRotate = False
130
131
131
132
self .closeLog ()
132
133
@@ -135,7 +136,7 @@ def writeToLog(self,msg):
135
136
if self .logfid :
136
137
self .logfid .write (msg )
137
138
138
-
139
+
139
140
140
141
def openLog (self ):
141
142
print (f"Opening log { self .openLogFile } " )
@@ -172,13 +173,27 @@ def closeLog(self):
172
173
173
174
async def getnmea(self):
    """
    Read one NMEA sentence (one line of bytes) from the serial source.

    Returns the raw line (including the trailing b"\n") when a complete
    sentence was available, or None when no complete line could be read
    so the caller can retry later.  In simulate mode an exhausted input
    buffer is rewound to the start and a log rotation is triggered so
    the simulated data set restarts as a "new" day of data.
    """
    line = self.serial.readline()

    # Sleep self.serialsleep seconds (configured in milliseconds, 100 ms
    # default) so other asynchronous work (e.g. file uploads) can run
    # while waiting for new serial input.
    # Note: we expect around ~10 NMEA messages (lines) per second, so we
    # can wait and do other stuff in between.
    await asyncio.sleep(self.serialsleep)

    if not line.endswith(b"\n"):
        # No complete sentence -> tell the caller to try again later (or,
        # in the case of simulated data, rewind the buffer).
        if self.simulate:
            # Simulate a new dataset: restart the input and force the
            # logging loop to rotate to a fresh log file.
            self.serial.seek(0)
            self.triggerRotate = True
            print("simulating new data after 10 seconds..")
            await asyncio.sleep(10)
        line = None
    return line
179
191
180
192
181
- async def getwebdavListing (self ,regexlog ):
193
+ async def webdavUpload (self ):
194
+ """
195
+ Upload logs using webdav
196
+ """
182
197
xmlprop = """<propfind xmlns="DAV:">
183
198
<prop>
184
199
<getlastmodified xmlns="DAV:"/>
@@ -187,24 +202,47 @@ async def getwebdavListing(self,regexlog):
187
202
</prop>
188
203
</propfind>
189
204
"""
205
+ timeout = aiohttp .ClientTimeout (total = None , sock_connect = 20 , sock_read = 20 )
190
206
207
+ regexlog = re .compile (f"{ self .cfg ['file_base' ]} _.+[0-9]{{2}}.gz" )
208
+ self .cancelUpload = False
209
+
191
210
192
- async with aiohttp .ClientSession (auth = self .webdavauth ) as client :
193
- response = await client .request ('PROPFIND' ,self .webdav ,data = xmlprop )
194
- xmlbytes = await response .content .read ()
195
- xmlel = ET .fromstring (xmlbytes )
196
-
197
- return [os .path .basename (filename .text ) for filename in xmlel .findall ('.//{DAV:}href' ) if regexlog .search (filename .text )]
198
-
211
+ async with aiohttp .ClientSession (auth = self .webdavauth ,timeout = timeout ) as client :
212
+ while self .isLogging and not self .cancelUpload :
213
+ #try to get a webdav listing
214
+ try :
215
+ response = await client .request ('PROPFIND' ,self .webdav ,data = xmlprop )
216
+ xmlbytes = await response .content .read ()
217
+ print ("host is online and directory is listed, proceeding upload" )
218
+ break
219
+ except (asyncio .exceptions .TimeoutError , aiohttp .client_exceptions .ClientConnectorError ):
220
+ # host may be offline so wait and try again after x seconds
221
+ waitforsec = 60
222
+ print (f"host is offline, sleeping for { waitforsec } seconds before retrying" )
223
+ await asyncio .sleep (waitforsec )
224
+
225
+ if self .cancelUpload :
226
+ self .cancelUpload = False
227
+ return
228
+
229
+ xmlel = ET .fromstring (xmlbytes )
230
+ remotelogs = [os .path .basename (filename .text ) for filename in xmlel .findall ('.//{DAV:}href' ) if regexlog .search (filename .text )]
231
+
232
+ # also find most up to date local logs
233
+ locallogs = [filename for filename in os .listdir (self .cfg ['data_dir' ]) if regexlog .search (filename )]
234
+ #and check which ones need uploading
235
+ uploadlogs = [os .path .join (self .cfg ['data_dir' ],filename ) for filename in locallogs if filename not in remotelogs ]
236
+
237
+ for logf in uploadlogs :
238
+ uploadurl = self .webdav + "/" + os .path .basename (logf )
239
+ with open (logf ,'rb' ) as fid :
240
+ print (f"Uploading { os .path .basename (logf )} " )
241
+ try :
242
+ await client .put (uploadurl ,data = fid .read ())
243
+ except (asyncio .exceptions .TimeoutError , aiohttp .client_exceptions .ClientConnectorError ):
244
+ print (f"failed to upload { logf } " )
199
245
200
- async def uploadLogWebdav (self ,filename ):
201
- """Upload a file to a webdav folder"""
202
-
203
- uploadurl = self .webdav + "/" + os .path .basename (filename )
204
- async with aiohttp .ClientSession (auth = self .webdavauth ) as client :
205
- with open (filename ,'rb' ) as fid :
206
- print (f"Uploading { filename } " )
207
- await client .put (uploadurl ,data = fid .read ())
208
246
209
247
210
248
async def uploadLogs (self ):
@@ -213,29 +251,22 @@ async def uploadLogs(self):
213
251
#no upload locations specified or upload is disabled
214
252
print ("No upload location specified or disabled, cancelled upload" )
215
253
return
254
+ await self .webdavUpload ()
216
255
217
- regexlog = re .compile (f"{ self .cfg ['file_base' ]} _.+[0-9]{{2}}.gz" )
218
- remotelogs = await self .getwebdavListing (regexlog )
219
-
220
- #get a local list with files which potentially need to be uploaded
221
-
222
- locallogs = [filename for filename in os .listdir (self .cfg ['data_dir' ]) if regexlog .search (filename )]
223
-
224
-
225
- uploadlogs = [os .path .join (self .cfg ['data_dir' ],filename ) for filename in locallogs if filename not in remotelogs ]
226
- for logf in uploadlogs :
227
- await self .uploadLogWebdav (logf )
228
256
229
257
230
258
async def startLoggingDaemon (self ):
231
259
self .isLogging = True
232
260
while self .isLogging :
261
+ self .cancelUpload = False
233
262
synctask = asyncio .create_task (self .uploadLogs ())
234
263
await self .rotateNMEAlog ()
235
264
#wait for synctask to finish with timeout as a backup
236
265
# It should have been finished, and if not it should be cancelled
237
266
try :
238
- await asyncio .wait_for (synctask , timeout = 60 )
267
+ print ("wait for upload job to finish" )
268
+ self .cancelUpload = True
269
+ await asyncio .wait_for (synctask , timeout = 20 )
239
270
except asyncio .TimeoutError :
240
271
pass
241
272
0 commit comments