Skip to content

Commit

Permalink
Re-run erred task on ComputeTaskEvent (#7967)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Jul 7, 2023
1 parent 8e6c287 commit 631d92a
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
14 changes: 14 additions & 0 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,20 @@ def test_gather_dep_failure(ws):
ws.validate = False


def test_recompute_erred_task(ws):
instructions = ws.handle_stimulus(
ComputeTaskEvent.dummy("x", run_id=1, stimulus_id="s1"),
ExecuteFailureEvent.dummy("x", run_id=1, stimulus_id="s2"),
ComputeTaskEvent.dummy("x", run_id=2, stimulus_id="s3"),
)
assert instructions == [
Execute(key="x", stimulus_id="s1"),
TaskErredMsg.match(key="x", run_id=1, stimulus_id="s2"),
Execute(key="x", stimulus_id="s3"),
]
assert ws.tasks["x"].state == "executing"


def test_transfer_incoming_metrics(ws):
assert ws.transfer_incoming_bytes == 0
assert ws.transfer_incoming_count == 0
Expand Down
5 changes: 1 addition & 4 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2868,17 +2868,14 @@ def _handle_compute_task(self, ev: ComputeTaskEvent) -> RecsInstrs:
ts, run_id=ev.run_id, stimulus_id=ev.stimulus_id
)
)
elif ts.state == "error":
instructions.append(
TaskErredMsg.from_task(ts, run_id=ev.run_id, stimulus_id=ev.stimulus_id)
)
elif ts.state in {
"released",
"fetch",
"flight",
"missing",
"cancelled",
"resumed",
"error",
}:
recommendations[ts] = "waiting"

Expand Down

0 comments on commit 631d92a

Please sign in to comment.