Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash in spans when time() is not monotonic #7960

Merged
merged 2 commits into from
Jul 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions distributed/spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,13 @@ def stop(self) -> float:
done
distributed.scheduler.TaskGroup.stop
"""
if not self.done:
return time()
out = max(tg.stop for tg in self.traverse_groups())
# absorb small errors in worker delay calculation
return max(self.enqueued, out)
if self.done:
out = max(tg.stop for tg in self.traverse_groups())
else:
out = time()
# absorb small errors in worker delay calculation, as well as in time() not
# being perfectly monotonic
return max(out, self.enqueued)

@property
def states(self) -> dict[TaskStateState, int]:
Expand Down Expand Up @@ -383,25 +385,26 @@ def _active_timeseries(self) -> Iterator[tuple[float, bool]]:
"""If this span is the output of :meth:`merge`, yield
(timestamp, True if at least one input span is active), forward-fill.
"""
now = time()
if self.id != "(merged)":
yield self.enqueued, True
yield self.stop if self.done else now, False
yield self.stop, False
return

events = []
for child in self.children:
events.append((child.enqueued, 1))
events.append((child.stop if child.done else now, -1))
events.sort()
events += [(child.enqueued, 1), (child.stop, -1)]
# enqueued <= stop by construction.
# Occasionally, enqueued == stop, e.g. when the clock is adjusted backwards.
# Prevent negative n_active when this happens.
events.sort(key=lambda el: el[0])

n_active = 0
for t, delta in events:
if not n_active:
assert delta > 0
assert delta == 1
yield t, True
n_active += delta
if n_active == 0:
if not n_active:
yield t, False

@property
Expand Down