Cluster wait #6700
Changes from 37 commits
@@ -19,6 +19,7 @@
 from distributed.compatibility import PeriodicCallback
 from distributed.core import Status
 from distributed.deploy.adaptive import Adaptive
+from distributed.metrics import time
 from distributed.objects import SchedulerInfo
 from distributed.utils import (
     Log,
@@ -33,6 +34,9 @@
 logger = logging.getLogger(__name__)
 
 
+no_default = "__no_default__"
+
+
 class Cluster(SyncMethodMixin):
     """Superclass for cluster objects
 
@@ -73,6 +77,7 @@ def __init__(
         self.periodic_callbacks = {}
         self._watch_worker_status_comm = None
         self._watch_worker_status_task = None
+        self._scheduler_info_comm = None
         self._cluster_manager_logs = []
         self.quiet = quiet
         self.scheduler_comm = None
@@ -120,15 +125,17 @@ def name(self, name):
         self._cluster_info["name"] = name
 
     async def _start(self):
-        comm = await self.scheduler_comm.live_comm()
-        comm.name = "Cluster worker status"
-        await comm.write({"op": "subscribe_worker_status"})
-        self.scheduler_info = SchedulerInfo(await comm.read())
-        self._watch_worker_status_comm = comm
+        self._watch_worker_status_comm = await self.scheduler_comm.live_comm()
+        self._watch_worker_status_comm.name = "Cluster worker status"
+        await self._watch_worker_status_comm.write({"op": "subscribe_worker_status"})
+        self.scheduler_info = SchedulerInfo(await self._watch_worker_status_comm.read())
         self._watch_worker_status_task = asyncio.ensure_future(
-            self._watch_worker_status(comm)
+            self._watch_worker_status(self._watch_worker_status_comm)
         )
 
+        self._scheduler_info_comm = await self.scheduler_comm.live_comm()
+        self._scheduler_info_comm.name = "Scheduler info"
+
         info = await self.scheduler_comm.get_metadata(
             keys=["cluster-manager-info"], default={}
         )
@@ -187,6 +194,8 @@ async def _close(self):
         with suppress(AttributeError):
             self._adaptive.stop()
 
+        if self._scheduler_info_comm:
+            await self._scheduler_info_comm.close()
         if self._watch_worker_status_comm:
             await self._watch_worker_status_comm.close()
         if self._watch_worker_status_task:
@@ -580,3 +589,59 @@ def __eq__(self, other):
 
     def __hash__(self):
         return id(self)
+
+    async def _wait_for_workers(self, n_workers=0, timeout=None):
+        await self.update_scheduler_info()
+        if timeout:
+            deadline = time() + parse_timedelta(timeout)
+        else:
+            deadline = None
+
+        def running_workers(info):
+            return len(
+                [
+                    ws
+                    for ws in info["workers"].values()
+                    if ws["status"] == Status.running.name
+                ]
+            )
+
+        while n_workers and running_workers(self.scheduler_info) < n_workers:
+            if deadline and time() > deadline:
+                raise TimeoutError(
+                    "Only %d/%d workers arrived after %s"
+                    % (running_workers(self.scheduler_info), n_workers, timeout)
+                )
+            await asyncio.sleep(0.1)
+
+        await self.update_scheduler_info()
+
+    async def update_scheduler_info(self) -> None:
+        """Send comm to scheduler requesting information and update scheduler_info accordingly"""
+        await self._scheduler_info_comm.write({"op": "identity"})
+        self.scheduler_info = SchedulerInfo(await self._scheduler_info_comm.read())
+
+    def wait_for_workers(
+        self, n_workers: int | str = no_default, timeout: float | None = None

Review thread on this line:

Reviewer: Writing unions with a pipe like this is supported in Python >= 3.10. Not sure if that's related to the issues we are seeing?

Author: Agreed, it's 3.8. I get the same error locally. Changing (or removing) the type hints doesn't fix it. It's a thread-leaking issue, which sounds much lower level than anything I've done here, but I'll keep looking into it.

Author: I've managed to fix it. I noticed the other tests in test_cluster used `async with` to start the cluster and didn't pass in the `loop` fixture, so I copied that. I'd be lying if I said I knew what went wrong and how this fixed it, though. (A sketch of this test pattern follows the diff below.)

+    ) -> None:
+        """Blocking call to wait for n workers before continuing
+
+        Parameters
+        ----------
+        n_workers : int
+            The number of workers
+        timeout : number, optional
+            Time in seconds after which to raise a
+            ``dask.distributed.TimeoutError``
+        """
+        if n_workers is no_default:
+            warnings.warn(
+                "Please specify the `n_workers` argument when using `Client.wait_for_workers`. Not specifying `n_workers` will no longer be supported in future versions.",
+                FutureWarning,
+            )
+            n_workers = 0
+        elif not isinstance(n_workers, int) or n_workers < 1:
+            raise ValueError(
+                f"`n_workers` must be a positive integer. Instead got {n_workers}."
+            )
+        return self.sync(self._wait_for_workers, n_workers, timeout=timeout)
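
Editor's note: the review thread on the `wait_for_workers` signature above mentions fixing the leaking-thread test failure by copying the `async with` pattern used by the other tests in test_cluster, without the `loop` fixture. A minimal sketch of that pattern, assuming `LocalCluster` and the `gen_test` helper from `distributed.utils_test`; the test name and parameters here are illustrative, not the actual test added in this PR:

    from distributed import LocalCluster
    from distributed.utils_test import gen_test


    @gen_test()
    async def test_wait_for_workers():
        # Start the cluster with ``async with`` and no ``loop`` fixture, so the
        # cluster owns its event loop and tears down its threads on exit.
        async with LocalCluster(
            n_workers=2, asynchronous=True, dashboard_address=":0"
        ) as cluster:
            await cluster._wait_for_workers(2, timeout="10s")
            assert len(cluster.scheduler_info["workers"]) == 2
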
fjetter: You can simply do

    self.scheduler_info = SchedulerInfo(await self._scheduler.identity())

This will allow this connection to be reused and handles the write/read part of this call. Specifically, you will not need to manage the lifecycle of the comm yourself.

FWIW I think the same should be done with `_watch_worker_status_comm`. I don't see a reason why these calls would deserve a dedicated connection. I assume `_watch_worker_status_comm` was introduced before we had a connection pool.
Author: Hi @fjetter,

I get an AttributeError when making this change and using a LocalCluster. It was quite a while ago that I wrote this code, but I'm pretty sure the reason I went down the creating-a-comm route was that clusters don't (necessarily?) have a `_scheduler` attribute. Am I missing something? Should there be a `_scheduler` attribute?
fjetter: Sorry, this should've been `self.scheduler_comm`. This should always be set if a cluster is already started. The implementation is a bit messy, though: `Cluster` doesn't actually set it, but `SpecCluster` does. Regardless, having this set is a requirement since we're using it in many other places as well, i.e. it is safe to assume that subclasses implement it.
Author: That worked! I've removed all the messy comm stuff now :)

Thank you :)
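
Editor's note: for reference, a minimal sketch of the simplification agreed on above, in which `update_scheduler_info` reuses the pooled `scheduler_comm` connection via its `identity()` call instead of opening a dedicated comm. This reflects the reviewer's suggestion, not necessarily the exact code that was merged:

    async def update_scheduler_info(self) -> None:
        """Ask the scheduler for its identity and refresh ``scheduler_info``."""
        # ``scheduler_comm`` is the pooled RPC handle that started clusters are
        # expected to provide, so there is no comm lifecycle to manage here.
        self.scheduler_info = SchedulerInfo(await self.scheduler_comm.identity())

Letting the connection pool handle reuse and cleanup is what makes the explicit `_scheduler_info_comm` bookkeeping in `_start` and `_close` unnecessary.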