Skip to content
35 changes: 18 additions & 17 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2848,7 +2848,7 @@ def transition_processing_memory(
{
"op": "cancel-compute",
"key": key,
"reason": "Finished on different worker",
"stimulus_id": f"processing-memory-{time()}",
}
]

Expand Down Expand Up @@ -7629,12 +7629,10 @@ async def get_profile(

async def get_profile_metadata(
self,
comm=None,
workers=None,
merge_workers=True,
start=None,
stop=None,
profile_cycle_interval=None,
workers: "Iterable[str] | None" = None,
start: float = 0,
stop: "float | None" = None,
profile_cycle_interval: "str | float | None" = None,
):
parent: SchedulerState = cast(SchedulerState, self)
dt = profile_cycle_interval or dask.config.get(
Expand All @@ -7652,16 +7650,19 @@ async def get_profile_metadata(
)

results = [r for r in results if not isinstance(r, Exception)]
counts = [v["counts"] for v in results]
counts = itertools.groupby(merge_sorted(*counts), lambda t: t[0] // dt * dt)
counts = [(time, sum(pluck(1, group))) for time, group in counts]

keys = set()
for v in results:
for t, d in v["keys"]:
for k in d:
keys.add(k)
keys = {k: [] for k in keys}
counts = [
(time, sum(pluck(1, group)))
for time, group in itertools.groupby(
merge_sorted(
*(v["counts"] for v in results),
),
lambda t: t[0] // dt * dt,
)
]

keys: dict[str, list[list]] = {
k: [] for v in results for t, d in v["keys"] for k in d
}

groups1 = [v["keys"] for v in results]
groups2 = list(merge_sorted(*groups1, key=first))
Expand Down
8 changes: 4 additions & 4 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,10 +859,10 @@ async def test_missing_data_heals(c, s, a, b):
# Secretly delete y's key
if y.key in a.data:
del a.data[y.key]
a.release_key(y.key)
a.release_key(y.key, stimulus_id="test")
if y.key in b.data:
del b.data[y.key]
b.release_key(y.key)
b.release_key(y.key, stimulus_id="test")
await asyncio.sleep(0)

w = c.submit(add, y, z)
Expand All @@ -884,7 +884,7 @@ async def test_gather_robust_to_missing_data(c, s, a, b):
if f.key in w.data:
del w.data[f.key]
await asyncio.sleep(0)
w.release_key(f.key)
w.release_key(f.key, stimulus_id="test")

xx, yy, zz = await c.gather([x, y, z])
assert (xx, yy, zz) == (1, 2, 3)
Expand All @@ -907,7 +907,7 @@ async def test_gather_robust_to_nested_missing_data(c, s, a, b):
if datum.key in worker.data:
del worker.data[datum.key]
await asyncio.sleep(0)
worker.release_key(datum.key)
worker.release_key(datum.key, stimulus_id="test")

result = await c.gather([z])

Expand Down
Loading