@@ -296,6 +296,16 @@ def get_op_logs(self, path, offset):
296
296
logs [filename ] = data
297
297
return logs , new_offset
298
298
299
+ def get_lats_logs (self , path , offset ):
300
+ logs = {}
301
+ new_offset = 0
302
+ for filename in os .listdir (path ):
303
+ if ".txt" not in filename or 'lats' not in filename :
304
+ continue
305
+ data , new_offset = self .read_logs (os .path .join (path , filename ), offset )
306
+ logs [filename ] = data
307
+ return logs , new_offset
308
+
299
309
def get_sys_logs (self , path , offset ):
300
310
logs = {}
301
311
new_offset = 0
@@ -360,17 +370,19 @@ def logs_fn(self):
360
370
url = f"http://{ self .fetch_ip } :{ self .fetch_port } /executor-log"
361
371
executor_log_path = os .path .join (self .log_dir , self .executor_name , self .executor_uuid )
362
372
363
- wlog_offset , oplog_offset , sys_log_offset = 0 , 0 , 0
373
+ wlog_offset , oplog_offset , latslog_offset , sys_log_offset = 0 , 0 , 0 , 0
364
374
while not self .global_queue_thread .stopped ():
365
375
wlogs , wlog_offset = self .get_worker_logs (executor_log_path , wlog_offset )
366
376
oplogs , oplog_offset = self .get_op_logs (executor_log_path , oplog_offset )
377
+ latslogs , latslog_offset = self .get_lats_logs (executor_log_path , latslog_offset )
367
378
syslogs , sys_log_offset = self .get_sys_logs (executor_log_path , sys_log_offset )
368
379
369
380
payload = {
370
381
'executor_name' : self .executor_name ,
371
382
'executor_uuid' : self .executor_uuid ,
372
383
'worker_logs' : json .dumps (wlogs ),
373
384
'output_processor_logs' : json .dumps (oplogs ),
385
+ 'lats_logs' : json .dumps (latslogs ),
374
386
'syslogs' : json .dumps (syslogs ),
375
387
}
376
388
@@ -388,12 +400,15 @@ def logs_fn(self):
388
400
# send final logs before stopping
389
401
wlogs , wlog_offset = self .get_worker_logs (executor_log_path , wlog_offset )
390
402
oplogs , oplog_offset = self .get_op_logs (executor_log_path , oplog_offset )
403
+ latslogs , latslog_offset = self .get_lats_logs (executor_log_path , latslog_offset )
391
404
syslogs , sys_log_offset = self .get_sys_logs (executor_log_path , sys_log_offset )
405
+
392
406
payload = {
393
407
'executor_name' : self .executor_name ,
394
408
'executor_uuid' : self .executor_uuid ,
395
409
'worker_logs' : json .dumps (wlogs ),
396
410
'output_processor_logs' : json .dumps (oplogs ),
411
+ 'lats_logs' : json .dumps (latslogs ),
397
412
'syslogs' : json .dumps (syslogs ),
398
413
}
399
414
0 commit comments