Skip to content

Commit

Permalink
Fix crash in spans when time() is not monotonic (#7960)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Jul 5, 2023
1 parent a47cb0a commit 566fd1f
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions distributed/spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,13 @@ def stop(self) -> float:
done
distributed.scheduler.TaskGroup.stop
"""
if not self.done:
return time()
out = max(tg.stop for tg in self.traverse_groups())
# absorb small errors in worker delay calculation
return max(self.enqueued, out)
if self.done:
out = max(tg.stop for tg in self.traverse_groups())
else:
out = time()
# absorb small errors in worker delay calculation, as well as in time() not
# being perfectly monotonic
return max(out, self.enqueued)

@property
def states(self) -> dict[TaskStateState, int]:
Expand Down Expand Up @@ -383,25 +385,26 @@ def _active_timeseries(self) -> Iterator[tuple[float, bool]]:
"""If this span is the output of :meth:`merge`, yield
(timestamp, True if at least one input span is active), forward-fill.
"""
now = time()
if self.id != "(merged)":
yield self.enqueued, True
yield self.stop if self.done else now, False
yield self.stop, False
return

events = []
for child in self.children:
events.append((child.enqueued, 1))
events.append((child.stop if child.done else now, -1))
events.sort()
events += [(child.enqueued, 1), (child.stop, -1)]
# enqueued <= stop by construction.
# Occasionally, enqueued == stop, e.g. when the clock is adjusted backwards.
# Prevent negative n_active when this happens.
events.sort(key=lambda el: el[0])

n_active = 0
for t, delta in events:
if not n_active:
assert delta > 0
assert delta == 1
yield t, True
n_active += delta
if n_active == 0:
if not n_active:
yield t, False

@property
Expand Down

0 comments on commit 566fd1f

Please sign in to comment.