Cluster wait #6700

Merged: 44 commits, Apr 20, 2023

Commits
4f850e8
Moving wait_for_worker logic to cluster, and having client call that …
idorrington92 Jul 9, 2022
ec66806
Merge branch 'main' of https://github.com/dask/distributed into clust…
idorrington92 Jul 9, 2022
4ee2739
Adding test for cluster.wait_for_workers
idorrington92 Jul 9, 2022
f9a840b
use try and except to catch case where cluster is none or wait_for_wo…
idorrington92 Jul 20, 2022
5439674
Merge branch 'main' into cluster_wait
idorrington92 Jul 20, 2022
ca778c7
linting
idorrington92 Jul 20, 2022
8f532e9
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Jul 23, 2022
78985cc
This test has been removed on main branch, but for some reason git me…
idorrington92 Jul 23, 2022
aad5d56
Cluster has to use scheduler_info attribute instead of scheduler.iden…
idorrington92 Aug 6, 2022
b1ad4d5
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Aug 6, 2022
ef96549
lint
idorrington92 Aug 6, 2022
e682058
reverting
idorrington92 Aug 6, 2022
cdf2c0b
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Aug 8, 2022
45cf486
need to use cluster.scale when client.wait_for_workers is called whil…
idorrington92 Aug 8, 2022
088e696
need to use scheduler_info. Also, using cluster.scale to emulate beha…
idorrington92 Aug 8, 2022
4861151
using scheduler_info and dont need to call scale anymore
idorrington92 Aug 8, 2022
1d9705c
lint
idorrington92 Aug 8, 2022
97f4aa4
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Aug 9, 2022
c3907b7
adding gen_test decorator
idorrington92 Aug 9, 2022
ac1dbc5
Don't think we need to scale at start of wait_for_workers
idorrington92 Aug 9, 2022
44ccdd3
self.scheduler_info does not update worker status from init to runnin…
idorrington92 Aug 14, 2022
b78772e
Use Status
idorrington92 Aug 14, 2022
05eb270
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Aug 14, 2022
03d9d18
Scale was fixing the nworkers test because it forced the worker statu…
idorrington92 Aug 14, 2022
dfa0ea0
Refactoring
idorrington92 Aug 15, 2022
3d8ddca
Fixing type information
idorrington92 Aug 15, 2022
625960d
Experimenting with creating new comm
idorrington92 Aug 20, 2022
c946540
Create separate comm in _start and use that to update scheduler_info
idorrington92 Aug 20, 2022
7af5fce
Close new comm
idorrington92 Aug 20, 2022
2dd7932
initialise scheduler_info_comm
idorrington92 Aug 20, 2022
872e62a
Merge remote-tracking branch 'upstream/main' into cluster_wait
idorrington92 Sep 7, 2022
05a54c7
Don't allow n_workers to be zero for cluster wait_for_workers
idorrington92 Sep 7, 2022
632cfa9
Adding return type
idorrington92 Sep 7, 2022
5789bfd
Merge branch 'main' of https://github.com/dask/distributed into clust…
idorrington92 Jan 19, 2023
000febc
Change try-catch to be an explicit if-else
idorrington92 Jan 19, 2023
3d573e5
Check explicitly for cluster is none, as I think it's clearer
idorrington92 Jan 19, 2023
2b1713a
linting
idorrington92 Jan 19, 2023
8877a58
use scheduler_comm instead of opening new comm
idorrington92 Jan 20, 2023
005dd69
remove update_scheduler_info method
idorrington92 Jan 20, 2023
db9dc71
Merge branch 'main' of https://github.com/dask/distributed into clust…
idorrington92 Apr 10, 2023
494d3fc
pre-commit changes
idorrington92 Apr 10, 2023
21277ce
Merge branch 'main' of https://github.com/dask/distributed into clust…
idorrington92 Apr 20, 2023
d277ea6
Reduce number of workers to see if it fixes github tests
idorrington92 Apr 20, 2023
c95aafd
Changing test to make it work in python 3.8
idorrington92 Apr 20, 2023
6 changes: 5 additions & 1 deletion distributed/client.py
@@ -1458,7 +1458,11 @@ def wait_for_workers(
             raise ValueError(
                 f"`n_workers` must be a positive integer. Instead got {n_workers}."
             )
-        return self.sync(self._wait_for_workers, n_workers, timeout=timeout)
+
+        if self.cluster is None:
+            return self.sync(self._wait_for_workers, n_workers, timeout=timeout)
+
+        return self.cluster.wait_for_workers(n_workers, timeout)
 
     def _heartbeat(self):
         # Don't send heartbeat if scheduler comm or cluster are already closed
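For illustration, a minimal sketch of how the delegation introduced here behaves from the client side. It is not part of the diff, and the setup around it is assumed (a machine able to start a `LocalCluster`):

```python
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=1)
client = Client(cluster)

cluster.scale(3)
# Because client.cluster is not None here, this call is forwarded to
# cluster.wait_for_workers(3). A client connected directly to a remote
# scheduler (client.cluster is None) still takes the original
# self.sync(self._wait_for_workers, ...) path.
client.wait_for_workers(3)

client.close()
cluster.close()
```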
55 changes: 55 additions & 0 deletions distributed/deploy/cluster.py
@@ -19,6 +19,7 @@
from distributed.compatibility import PeriodicCallback
from distributed.core import Status
from distributed.deploy.adaptive import Adaptive
from distributed.metrics import time
from distributed.objects import SchedulerInfo
from distributed.utils import (
Log,
@@ -33,6 +34,9 @@
logger = logging.getLogger(__name__)


no_default = "__no_default__"


class Cluster(SyncMethodMixin):
"""Superclass for cluster objects

Expand Down Expand Up @@ -582,6 +586,57 @@ def __eq__(self, other):
    def __hash__(self):
        return id(self)

    async def _wait_for_workers(self, n_workers=0, timeout=None):
        self.scheduler_info = SchedulerInfo(await self.scheduler_comm.identity())
        if timeout:
            deadline = time() + parse_timedelta(timeout)
        else:
            deadline = None

        def running_workers(info):
            return len(
                [
                    ws
                    for ws in info["workers"].values()
                    if ws["status"] == Status.running.name
                ]
            )

        while n_workers and running_workers(self.scheduler_info) < n_workers:
            if deadline and time() > deadline:
                raise TimeoutError(
                    "Only %d/%d workers arrived after %s"
                    % (running_workers(self.scheduler_info), n_workers, timeout)
                )
            await asyncio.sleep(0.1)

            self.scheduler_info = SchedulerInfo(await self.scheduler_comm.identity())

    def wait_for_workers(
        self, n_workers: int | str = no_default, timeout: float | None = None
Member
Writing unions with a pipe like this is supported in Python >=3.10. Not sure if that's related to the issues we are seeing?

Contributor Author
Agreed, it's 3.8. I get the same error locally. Changing (or removing) the type hints doesn't fix it.

It's a thread-leaking issue, which sounds much lower level than anything I've done here, but I'll keep looking into it.

Contributor Author
I've managed to fix it. I noticed the other tests in test_cluster used "async with" to start the cluster and didn't pass in the loop fixture, so I copied that. I'd be lying if I said I knew what went wrong and how this fixed it, though...

    ) -> None:
        """Blocking call to wait for n workers before continuing

        Parameters
        ----------
        n_workers : int
            The number of workers
        timeout : number, optional
            Time in seconds after which to raise a
            ``dask.distributed.TimeoutError``
        """
        if n_workers is no_default:
            warnings.warn(
                "Please specify the `n_workers` argument when using `Client.wait_for_workers`. Not specifying `n_workers` will no longer be supported in future versions.",
                FutureWarning,
            )
            n_workers = 0
        elif not isinstance(n_workers, int) or n_workers < 1:
            raise ValueError(
                f"`n_workers` must be a positive integer. Instead got {n_workers}."
            )
        return self.sync(self._wait_for_workers, n_workers, timeout=timeout)


def _exponential_backoff(
    attempt: int, multiplier: float, exponential_base: float, max_interval: float
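On the `int | str` discussion above, a brief illustrative note (not from the PR): PEP 604 unions are only evaluable at runtime on Python 3.10+, but with postponed evaluation of annotations (PEP 563) the annotation is stored as a string and never evaluated at definition time, so the syntax imports cleanly on 3.8/3.9 as well. The function below is a made-up example, not code from distributed:

```python
from __future__ import annotations  # PEP 563: annotations stored as strings


# Without the future import, this definition would raise
# "TypeError: unsupported operand type(s) for |" on Python 3.8/3.9,
# because int | str would be evaluated when the def statement runs.
def wait_n(n_workers: int | str = "__no_default__", timeout: float | None = None) -> None:
    ...
```

This is presumably why changing or removing the type hints made no difference to the CI failure the author describes; the Python 3.8 problem was elsewhere (the leaked-thread issue in the test).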
16 changes: 16 additions & 0 deletions distributed/deploy/tests/test_cluster.py
@@ -3,6 +3,7 @@
import pytest
from tornado.ioloop import IOLoop

from distributed import LocalCluster, Status
from distributed.deploy.cluster import Cluster, _exponential_backoff
from distributed.utils_test import gen_test

@@ -38,6 +39,21 @@ async def test_logs_deprecated():
cluster.logs()


@gen_test()
async def test_cluster_wait_for_worker():
    async with LocalCluster(n_workers=2, asynchronous=True) as cluster:
        assert len(cluster.scheduler.workers) == 2
        cluster.scale(4)
        await cluster.wait_for_workers(4)
        assert all(
            [
                worker["status"] == Status.running.name
                for _, worker in cluster.scheduler_info["workers"].items()
            ]
        )
        assert len(cluster.scheduler.workers) == 4


@gen_test()
async def test_deprecated_loop_properties():
    class ExampleCluster(Cluster):
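As a rough synchronous counterpart to the async test above, here is a sketch (not part of the PR) that assumes a machine able to start four local workers comfortably:

```python
from distributed import LocalCluster

with LocalCluster(n_workers=2) as cluster:
    cluster.scale(4)
    # Blocks (via Cluster.sync) until four workers report status "running",
    # refreshing cluster.scheduler_info from scheduler_comm.identity() as it polls.
    cluster.wait_for_workers(4)
    assert len(cluster.scheduler_info["workers"]) == 4
```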