Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster wait #6700

Merged
merged 44 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
4f850e8
Moving wait_for_worker logic to cluster, and having client call that …
idorrington92 Jul 9, 2022
ec66806
Merge branch 'main' of https://github.com/dask/distributed into clust…
idorrington92 Jul 9, 2022
4ee2739
Adding test for cluster.wait_for_workers
idorrington92 Jul 9, 2022
f9a840b
use try and except to catch case where cluster is none or wait_for_wo…
idorrington92 Jul 20, 2022
5439674
Merge branch 'main' into cluster_wait
idorrington92 Jul 20, 2022
ca778c7
linting
idorrington92 Jul 20, 2022
8f532e9
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Jul 23, 2022
78985cc
This test has been removed on main branch, but for some reason git me…
idorrington92 Jul 23, 2022
aad5d56
Cluster has to use scheduler_info attribute instead of scheduler.iden…
idorrington92 Aug 6, 2022
b1ad4d5
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Aug 6, 2022
ef96549
lint
idorrington92 Aug 6, 2022
e682058
reverting
idorrington92 Aug 6, 2022
cdf2c0b
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Aug 8, 2022
45cf486
need to use cluster.scale when client.wait_for_workers is called whil…
idorrington92 Aug 8, 2022
088e696
need to use scheduler_info. Also, using cluster.scale to emulate beha…
idorrington92 Aug 8, 2022
4861151
using scheduler_info and dont need to call scale anymore
idorrington92 Aug 8, 2022
1d9705c
lint
idorrington92 Aug 8, 2022
97f4aa4
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Aug 9, 2022
c3907b7
adding gen_test decorator
idorrington92 Aug 9, 2022
ac1dbc5
Don't think we need to scale at start of wait_for_workers
idorrington92 Aug 9, 2022
44ccdd3
self.scheduler_info does not update worker status from init to runnin…
idorrington92 Aug 14, 2022
b78772e
Use Status
idorrington92 Aug 14, 2022
05eb270
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Aug 14, 2022
03d9d18
Scale was fixing the nworkers test because it forced the worker statu…
idorrington92 Aug 14, 2022
dfa0ea0
Refactoring
idorrington92 Aug 15, 2022
3d8ddca
Fixing type information
idorrington92 Aug 15, 2022
625960d
Experimenting with creating new comm
idorrington92 Aug 20, 2022
c946540
Create separate comm in _start and use that to update scheduler_info
idorrington92 Aug 20, 2022
7af5fce
Close new comm
idorrington92 Aug 20, 2022
2dd7932
initialise scheduler_info_comm
idorrington92 Aug 20, 2022
872e62a
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Sep 7, 2022
05a54c7
Don't allow n_workers to be zero for cluster wait_for_workers
idorrington92 Sep 7, 2022
632cfa9
Adding return type
idorrington92 Sep 7, 2022
5789bfd
Merge branch 'main' of https://github.com/dask/distributed into clust…
idorrington92 Jan 19, 2023
000febc
Change try-catch to be an explicit if-else
idorrington92 Jan 19, 2023
3d573e5
Check explicitly for cluster is none, as I think it's clearer
idorrington92 Jan 19, 2023
2b1713a
linting
idorrington92 Jan 19, 2023
8877a58
use scheduler_comm instead of opening new comm
idorrington92 Jan 20, 2023
005dd69
remove update_scheduler_info method
idorrington92 Jan 20, 2023
db9dc71
Merge branch 'main' of https://github.com/dask/distributed into clust…
idorrington92 Apr 10, 2023
494d3fc
pre-commit changes
idorrington92 Apr 10, 2023
21277ce
Merge branch 'main' of https://github.com/dask/distributed into clust…
idorrington92 Apr 20, 2023
d277ea6
Reduce number of works to see if it fixes github tests
idorrington92 Apr 20, 2023
c95aafd
Changing test to make it work in python 3.8
idorrington92 Apr 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1332,7 +1332,10 @@ def wait_for_workers(self, n_workers=0, timeout=None):
Time in seconds after which to raise a
``dask.distributed.TimeoutError``
"""
return self.sync(self._wait_for_workers, n_workers, timeout=timeout)
if self.cluster is not None:
return self.cluster.wait_for_workers(n_workers, timeout)
jacobtomlinson marked this conversation as resolved.
Show resolved Hide resolved
else:
return self.sync(self._wait_for_workers, n_workers, timeout=timeout)

def _heartbeat(self):
if self.scheduler_comm:
Expand Down
42 changes: 42 additions & 0 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from distributed.core import Status
from distributed.deploy.adaptive import Adaptive
from distributed.metrics import time
from distributed.objects import SchedulerInfo
from distributed.utils import (
Log,
Expand Down Expand Up @@ -531,3 +532,44 @@ def __eq__(self, other):

def __hash__(self):
return id(self)

async def _wait_for_workers(self, n_workers=0, timeout=None):
info = self.scheduler.identity()
self._scheduler_identity = SchedulerInfo(info)
if timeout:
deadline = time() + parse_timedelta(timeout)
else:
deadline = None

def running_workers(info):
return len(
[
ws
for ws in info["workers"].values()
if ws["status"] == Status.running.name
]
)

while n_workers and running_workers(info) < n_workers:
if deadline and time() > deadline:
raise TimeoutError(
"Only %d/%d workers arrived after %s"
% (running_workers(info), n_workers, timeout)
)
await asyncio.sleep(0.1)

info = self.scheduler.identity()
self._scheduler_identity = SchedulerInfo(info)

def wait_for_workers(self, n_workers=0, timeout=None):
"""Blocking call to wait for n workers before continuing

Parameters
----------
n_workers : int
The number of workers
timeout : number, optional
Time in seconds after which to raise a
``dask.distributed.TimeoutError``
"""
return self.sync(self._wait_for_workers, n_workers, timeout=timeout)
22 changes: 22 additions & 0 deletions distributed/deploy/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

from distributed import LocalCluster
from distributed.deploy.cluster import Cluster
from distributed.utils_test import gen_test

Expand Down Expand Up @@ -48,3 +49,24 @@ def __init__(self):
assert "foo" in cluster._cluster_info # exists before start() called
with cluster: # start and stop the cluster to avoid a resource warning
pass


@gen_test()
async def test_cluster_wait_for_worker(loop):
with LocalCluster(n_workers=3, loop=loop) as cluster:
assert all(
[
worker.status.name == "running"
for _, worker in cluster.scheduler.workers.items()
]
)
assert len(cluster.scheduler.workers) == 3
cluster.scale(10)
cluster.wait_for_workers(10)
assert all(
[
worker.status.name == "running"
for _, worker in cluster.scheduler.workers.items()
]
)
assert len(cluster.scheduler.workers) == 10