@@ -267,19 +267,23 @@ def start_pipeline(self, args = None, multi = False):
267
267
# Wrapped in try blocks so that the code will not fail if the pipeline or pypiper are not git repositories
268
268
gitvars = {}
269
269
try :
270
- gitvars ['pypiper_dir' ] = os .path .dirname (os .path .realpath (__file__ ))
271
- gitvars ['pypiper_hash' ] = subprocess .check_output ("cd " + os .path .dirname (os .path .realpath (__file__ )) + "; git rev-parse --verify HEAD 2>/dev/null" , shell = True )
272
- gitvars ['pypiper_date' ] = subprocess .check_output ("cd " + os .path .dirname (os .path .realpath (__file__ )) + "; git show -s --format=%ai HEAD 2>/dev/null" , shell = True )
273
- gitvars ['pypiper_diff' ] = subprocess .check_output ("cd " + os .path .dirname (os .path .realpath (__file__ )) + "; git diff --shortstat HEAD 2>/dev/null" , shell = True )
274
- gitvars ['pypiper_branch' ] = subprocess .check_output ("cd " + os .path .dirname (os .path .realpath (__file__ )) + "; git branch | grep '*' 2>/dev/null" , shell = True )
270
+ # pypiper dir
271
+ ppd = os .path .dirname (os .path .realpath (__file__ ))
272
+ gitvars ['pypiper_dir' ] = ppd
273
+ gitvars ['pypiper_hash' ] = subprocess .check_output ("cd " + ppd + "; git rev-parse --verify HEAD 2>/dev/null" , shell = True )
274
+ gitvars ['pypiper_date' ] = subprocess .check_output ("cd " + ppd + "; git show -s --format=%ai HEAD 2>/dev/null" , shell = True )
275
+ gitvars ['pypiper_diff' ] = subprocess .check_output ("cd " + ppd + "; git diff --shortstat HEAD 2>/dev/null" , shell = True )
276
+ gitvars ['pypiper_branch' ] = subprocess .check_output ("cd " + ppd + "; git branch | grep '*' 2>/dev/null" , shell = True )
275
277
except Exception :
276
278
pass
277
279
try :
278
- gitvars ['pipe_dir' ] = os .path .dirname (os .path .realpath (sys .argv [0 ]))
279
- gitvars ['pipe_hash' ] = subprocess .check_output ("cd " + os .path .dirname (os .path .realpath (sys .argv [0 ])) + "; git rev-parse --verify HEAD 2>/dev/null" , shell = True )
280
- gitvars ['pipe_date' ] = subprocess .check_output ("cd " + os .path .dirname (os .path .realpath (sys .argv [0 ])) + "; git show -s --format=%ai HEAD 2>/dev/null" , shell = True )
281
- gitvars ['pipe_diff' ] = subprocess .check_output ("cd " + os .path .dirname (os .path .realpath (sys .argv [0 ])) + "; git diff --shortstat HEAD 2>/dev/null" , shell = True )
282
- gitvars ['pipe_branch' ] = subprocess .check_output ("cd " + os .path .dirname (os .path .realpath (sys .argv [0 ])) + "; git branch | grep '*' 2>/dev/null" , shell = True )
280
+ # pipeline dir
281
+ pld = os .path .dirname (os .path .realpath (sys .argv [0 ]))
282
+ gitvars ['pipe_dir' ] = pld
283
+ gitvars ['pipe_hash' ] = subprocess .check_output ("cd " + pld + "; git rev-parse --verify HEAD 2>/dev/null" , shell = True )
284
+ gitvars ['pipe_date' ] = subprocess .check_output ("cd " + pld + "; git show -s --format=%ai HEAD 2>/dev/null" , shell = True )
285
+ gitvars ['pipe_diff' ] = subprocess .check_output ("cd " + pld + "; git diff --shortstat HEAD 2>/dev/null" , shell = True )
286
+ gitvars ['pipe_branch' ] = subprocess .check_output ("cd " + pld + "; git branch | grep '*' 2>/dev/null" , shell = True )
283
287
except Exception :
284
288
pass
285
289
@@ -305,7 +309,7 @@ def start_pipeline(self, args = None, multi = False):
305
309
if (gitvars ['pypiper_diff' ] != "" ):
306
310
print ("* " + "Pypiper diff" .rjust (20 ) + ": " + gitvars ['pypiper_diff' ].strip ())
307
311
except KeyError :
308
- # If any of the keys aren't set, that's OK. It just means pypiper isn't being run from a git repo.
312
+ # It is OK if keys aren't set; it just means pypiper isn't in a git repo.
309
313
pass
310
314
311
315
try :
@@ -373,7 +377,8 @@ def run(self, cmd, target=None, lock_name=None, shell="guess", nofail=False, cle
373
377
:type target: str or None
374
378
:param lock_name: Name of lock file. Optional.
375
379
:type lock_name: str or None
376
- :param shell: If command requires should be run in its own shell. Optional. Default: "guess" -- run will try to determine if the command requires a shell.
380
+ :param shell: If command requires should be run in its own shell. Optional. Default: "guess" --
381
+ run will try to determine if the command requires a shell.
377
382
:type shell: bool
378
383
:param nofail: Should the pipeline proceed past a nonzero return from a process? Default: False
379
384
Nofail can be used to implement non-essential parts of the pipeline; if these processes fail,
@@ -405,6 +410,9 @@ def run(self, cmd, target=None, lock_name=None, shell="guess", nofail=False, cle
405
410
# Prepend "lock." to make it easy to find the lock files.
406
411
self .proc_lock_name = lock_name
407
412
lock_name = "lock." + lock_name
413
+ recover_name = "lock.recover." + self .proc_lock_name
414
+ recover_file = os .path .join (self .pipeline_outfolder , recover_name )
415
+ recover_mode = False
408
416
lock_file = os .path .join (self .pipeline_outfolder , lock_name )
409
417
process_return_code = 0
410
418
local_maxmem = 0
@@ -436,22 +444,28 @@ def run(self, cmd, target=None, lock_name=None, shell="guess", nofail=False, cle
436
444
if os .path .isfile (lock_file ):
437
445
if self .overwrite_locks :
438
446
print ("Found lock file; overwriting this target..." )
447
+ elif os .path .isfile (recover_file ):
448
+ print ("Found lock file; dynamic recovery set. Overwriting this target..." )
449
+ # remove the lock file, which will then be promptly re-created for the current run.
450
+ recover_mode = True
451
+ # the recovery flag is now spent, so remove it so we don't accidentally re-recover a failed job
452
+ os .remove (recover_file )
439
453
else : # don't overwite locks
440
454
self ._wait_for_lock (lock_file )
441
455
# when it's done loop through again to try one more time (to see if the target exists now)
442
456
continue
443
457
444
458
# If you get to this point, the target doesn't exist, and the lock_file doesn't exist
445
459
# (or we should overwrite). create the lock (if you can)
446
- if not self .overwrite_locks :
460
+ if self .overwrite_locks or recover_mode :
461
+ self ._create_file (lock_file )
462
+ else :
447
463
try :
448
464
self ._create_file_racefree (lock_file ) # Create lock
449
465
except OSError as e :
450
466
if e .errno == errno .EEXIST : # File already exists
451
467
print ("Lock file created after test! Looping again." )
452
468
continue # Go back to start
453
- else :
454
- self ._create_file (lock_file )
455
469
456
470
##### End tests block
457
471
# If you make it past these tests, we should proceed to run the process.
@@ -486,7 +500,8 @@ def run(self, cmd, target=None, lock_name=None, shell="guess", nofail=False, cle
486
500
break
487
501
488
502
# Bad idea: don't return follow_result; it seems nice but nothing else
489
- # in your pipeline can depend on this since it won't be run if that command # isn't required because target exists.
503
+ # in your pipeline can depend on this since it won't be run if that command
504
+ # isn't required because target exists.
490
505
return process_return_code
491
506
492
507
@@ -777,8 +792,7 @@ def _report_profile(self, command, lock_name, elapsed_time, memory):
777
792
str (lock_name ) + "\t " + \
778
793
str (datetime .timedelta (seconds = round (elapsed_time , 2 ))) + "\t " + \
779
794
str (memory )
780
- # messageMarkdown = "> `" + command + "`\t" + str(elapsed_time).strip() + "\t " + str(memory).strip() + "\t" + "_PROF_"
781
- # print(messageMarkdown)
795
+
782
796
with open (self .pipeline_profile_file , "a" ) as myfile :
783
797
myfile .write (messageRaw + "\n " )
784
798
@@ -799,7 +813,7 @@ def report_result(self, key, value, annotation=None):
799
813
800
814
# keep the value in memory:
801
815
self .stats_dict [key ] = str (value ).strip ()
802
- messageRaw = key + "\t " + str (value ).strip () + "\t " + str (annotation )
816
+ messageRaw = key + "\t " + str (value ).strip () + "\t " + str (annotation )
803
817
messageMarkdown = "> `" + key + "`\t " + str (value ).strip ()\
804
818
+ "\t " + str (annotation ) + "\t " + "_RES_"
805
819
print (messageMarkdown )
@@ -949,15 +963,15 @@ def stop_pipeline(self):
949
963
self .set_status_flag ("completed" )
950
964
self ._cleanup ()
951
965
self .report_result ("Time" , str (datetime .timedelta (seconds = self .time_elapsed (self .starttime ))))
952
- self .report_result ("Success" , time .strftime ("%m-%d %H:%M:%S" ))
966
+ self .report_result ("Success" , time .strftime ("%m-%d- %H:%M:%S" ))
953
967
print ("\n ##### [Epilogue:]" )
954
968
print ("* " + "Total elapsed time" .rjust (20 ) + ": " + str (datetime .timedelta (seconds = self .time_elapsed (self .starttime ))))
955
969
# print("Peak memory used: " + str(memory_usage()["peak"]) + "kb")
956
970
print ("* " + "Peak memory used" .rjust (20 ) + ": " + str (round (self .peak_memory , 2 )) + " GB" )
957
971
self .timestamp ("* Pipeline completed at: " .rjust (20 ))
958
972
959
973
960
- def fail_pipeline (self , e ):
974
+ def fail_pipeline (self , e , dynamic_recover = False ):
961
975
"""
962
976
If the pipeline does not complete, this function will stop the pipeline gracefully.
963
977
It sets the status flag to failed and skips the normal success completion procedure.
@@ -980,6 +994,17 @@ def fail_pipeline(self, e):
980
994
self .set_status_flag ("failed" )
981
995
self .timestamp ("### Pipeline failed at: " )
982
996
print ("Total time: " , str (datetime .timedelta (seconds = self .time_elapsed (self .starttime ))))
997
+
998
+ if dynamic_recover :
999
+ # job was terminated, not failed due to a bad process.
1000
+ # flag this run as recoverable.
1001
+ if self .proc_lock_name :
1002
+ # if there is no process locked, then recovery will be automatic.
1003
+ recover_name = "lock.recover." + self .proc_lock_name
1004
+ recover_file = os .path .join (self .pipeline_outfolder , recover_name )
1005
+ print ("Setting dynamic recover file: " + recover_file )
1006
+ self ._create_file_racefree (recover_file )
1007
+
983
1008
raise e
984
1009
985
1010
@@ -995,7 +1020,7 @@ def _signal_term_handler(self, signal, frame):
995
1020
message = "Got SIGTERM; Failing gracefully..."
996
1021
with open (self .pipeline_log_file , "a" ) as myfile :
997
1022
myfile .write (message + "\n " )
998
- self .fail_pipeline (Exception ("SIGTERM" ))
1023
+ self .fail_pipeline (Exception ("SIGTERM" ), dynamic_recover = True )
999
1024
sys .exit (1 )
1000
1025
1001
1026
@@ -1006,7 +1031,7 @@ def _signal_int_handler(self, signal, frame):
1006
1031
message = "Got SIGINT (Ctrl +C); Failing gracefully..."
1007
1032
with open (self .pipeline_log_file , "a" ) as myfile :
1008
1033
myfile .write (message + "\n " )
1009
- self .fail_pipeline (Exception ("SIGINT" ))
1034
+ self .fail_pipeline (Exception ("SIGINT" ), dynamic_recover = True )
1010
1035
sys .exit (1 )
1011
1036
1012
1037
@@ -1382,7 +1407,7 @@ def add_pypiper_args(parser, groups = ["pypiper"], args = [None], all_args = Fal
1382
1407
if arg == "genome" :
1383
1408
parser .add_argument (
1384
1409
"-G" , "--genome" , dest = "genome_assembly" , type = str ,
1385
- help = "identifier for genome assempbly (required)" ,
1410
+ help = "identifier for genome assembly (required)" ,
1386
1411
required = False )
1387
1412
if arg == "single-or-paired" :
1388
1413
parser .add_argument (
0 commit comments