@@ -134,40 +134,37 @@ def handle_checkpoint_restart(self, state_update, task, wfi, db):
134
134
135
135
def handle_state_change(self, state_update, task, wfi, db):
    """React to an ordinary (non checkpoint-restart) task state update.

    The workflow state is read once, before any other work, and that
    value is reused for every later decision in this call.

    Depending on ``state_update.job_state``:

    * ``'COMPLETED'``: record every output of ``task`` (its glob, or the
      placeholder ``"temp"`` when the glob is ``None``), finalize the
      task, and submit the tasks returned by ``finalize_task`` unless the
      workflow state is ``'PAUSED'`` or ``'Cancelled'``.
    * ``'FAILED'`` / ``'SUBMIT_FAIL'``: call
      ``set_dependent_tasks_dep_fail`` for the task and log the failure.
    * ``'BUILD_FAIL'``: log an error and call ``archive_fail_workflow``.

    Afterwards, whatever the job state was, the workflow is archived when
    ``wfi.workflow_completed()`` is true, or when the workflow state is
    ``'Cancelled'`` and ``wfi.cancelled_workflow_completed()`` is true.

    :param state_update: update object; ``job_state`` and ``wf_id`` are read
    :param task: task the update refers to; ``outputs`` and ``name`` are read
    :param wfi: workflow interface used to query and modify the workflow
    :param db: database handle passed through to the archive/fail helpers
    """
    # Captured up front so the cancelled-workflow check at the end works
    # for every job state, not only for completed jobs.
    wf_state = wfi.get_workflow_state()
    job_state = state_update.job_state

    if job_state == 'COMPLETED':
        for out in task.outputs:
            glob = out.glob
            value = "temp" if glob is None else glob
            wfi.set_task_output(task, out.id, value)
        ready_tasks = wfi.finalize_task(task)
        may_schedule = wf_state not in ('PAUSED', 'Cancelled')
        if ready_tasks and may_schedule:
            wf_utils.schedule_submit_tasks(state_update.wf_id, ready_tasks)

    # If the job failed, fail the dependent tasks
    if job_state in ('FAILED', 'SUBMIT_FAIL'):
        set_dependent_tasks_dep_fail(db, wfi, state_update.wf_id, task)
        log.info(f"Task {task.name} failed")

    if job_state == 'BUILD_FAIL':
        log.error(f'Workflow failed due to failed container build for task {task.name}')
        archive_fail_workflow(db, state_update.wf_id)

    # Final bookkeeping: archive the workflow once nothing is left to run.
    if wfi.workflow_completed():
        wf_id = wfi.workflow_id
        log.info(f"Workflow {wf_id} Completed")
        archive_workflow(db, state_update.wf_id)
        log.info('Workflow Completed')
    elif wf_state == 'Cancelled' and wfi.cancelled_workflow_completed():
        wf_id = wfi.workflow_id
        log.info(f"Scheduled tasks for cancelled workflow {wf_id} completed")
        archive_workflow(db, wf_id, final_state=wf_state)
        log.info('Workflow Archived')
167
+
171
168
def update_task_state (self , state_update , db ):
172
169
"""Update the state of a single task from the task manager."""
173
170
wfi = wf_utils .get_workflow_interface (state_update .wf_id )
0 commit comments