Cluster wait #6700
Conversation
Can one of the admins verify this patch?
Unit Test Results

See the test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

26 files ±0 · 26 suites ±0 · 12h 50m 9s ⏱️ −8m 54s

For more details on these failures and errors, see this check. Results for commit c95aafd. ± Comparison against base commit 1baa5ff.

♻️ This comment has been updated with latest results.
add to allowlist
@quasiben Sorry, what does this mean? Also, I've got 3 failing tests in test_ssh. Is it possible to run these locally? They just fail for me, saying the cluster failed to start. This happens on the main branch as well. I'm not really sure how to debug them otherwise...
@idorrington92 the comment allows us to run your PR against a GPU CI system. Distributed has been flaky in the past and recently there has been a concerted effort to stabilize the CI tests, as you can see in the linked report.

I would suggest @jacobtomlinson review this PR, but he is currently at SciPy (as are other dask maintainers), so it may be a little while before you hear back.
Many thanks for raising this!
use try and except to catch case where cluster is none or wait_for_workers is not implemented
This test has been removed on main branch, but for some reason git merge didn't remove it from mine
Thanks for addressing the review feedback.
It looks like the linter isn't happy. If you haven't installed pre-commit, you'll want to do that.
Also it looks like some of the test failures might be related, especially this one.
_____________________ test_ssh_nprocs_renamed_to_n_workers _____________________

/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:130: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az302-754.15681.346903' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))

    @gen_test()
    async def test_ssh_nprocs_renamed_to_n_workers():
        with pytest.warns(FutureWarning, match="renamed to n_workers"):
            async with SSHCluster(
                ["127.0.0.1"] * 3,
                connect_options=dict(known_hosts=None),
                asynchronous=True,
                scheduler_options={"idle_timeout": "5s"},
                worker_options={"death_timeout": "5s", "nprocs": 2},
            ) as cluster:
                assert len(cluster.workers) == 2
                async with Client(cluster, asynchronous=True) as client:
>                   await client.wait_for_workers(4)

distributed/deploy/tests/test_ssh.py:117:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = SSHCluster(SSHCluster, 'tcp://10.1.0.103:42191', workers=0, threads=0, memory=0 B)
n_workers = 4, timeout = None

    async def _wait_for_workers(self, n_workers=0, timeout=None):
>       info = self.scheduler.identity()
E       AttributeError: 'Scheduler' object has no attribute 'identity'

distributed/deploy/cluster.py:534: AttributeError
Could you take a look?
Doesn't it say the linting/pre-commit hooks passed OK? I agree that the test failure is related, but I can't run it locally as I get an error saying the cluster failed to start. Is there something I need to do in order to run it locally? Otherwise the only thing I've got to go on is the automated tests here...
Apologies, I was looking at … Can you run `pre-commit run --all-files`?
Thanks @jacobtomlinson, that fixed the error I was getting. It would have taken me ages to think of that. I had a little look at the bug that was coming up in the tests, but it's not obvious to me why it's happening. I'll have a proper look later in the week.
need to use cluster.scale when client.wait_for_workers is called while using a cluster
need to use scheduler_info. Also, using cluster.scale to emulate behaviour of client.wait_for_workers
Hi @idorrington92, sorry that this hasn't gotten much love. I suspect that most folks are out during the holidays. I'm holding down the fort for the moment.

If I were reviewing this initially I probably would have said that the try-except logic around `cluster.wait_for_workers` on the client side was enough on its own. My preference is to drop the `Cluster.wait_for_workers` implementation (even if this means dropping the test), and just stick with the try-except logic that you have in `Client.wait_for_workers`.

Do you have any thoughts on the above? Is there a solid reason to implement `Cluster.wait_for_workers`?
Said a different way, I'm more than happy to merge the client.py changes immediately. I'd want to look over the other changes more thoroughly, which would take me some time. Or someone like @jacobtomlinson, who has already looked over them, could merge if he's feeling confident.
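For readers following along, here is a minimal sketch of the try/except approach being discussed, reusing the attribute and method names that appear elsewhere in this thread (`cluster`, `_wait_for_workers`); it illustrates the pattern under those assumptions and is not the exact diff in this PR.

```python
# Sketch of the try/except delegation discussed above (illustrative only).
async def wait_for_workers(client, n_workers, timeout=None):
    try:
        # Prefer the cluster's own implementation when a cluster object
        # exists and provides wait_for_workers.
        await client.cluster.wait_for_workers(n_workers, timeout)
    except (AttributeError, NotImplementedError):
        # Either client.cluster is None (AttributeError on NoneType) or the
        # cluster class does not implement wait_for_workers: fall back to
        # the client's scheduler-polling path.
        await client._wait_for_workers(n_workers, timeout=timeout)
```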
distributed/deploy/cluster.py (Outdated)
await self._scheduler_info_comm.write({"op": "identity"})
self.scheduler_info = SchedulerInfo(await self._scheduler_info_comm.read())
You can simply do

    self.scheduler_info = SchedulerInfo(await self._scheduler.identity())

This will allow this connection to be reused and handles the write/read part of this call. Specifically, you will not need to manage the lifecycle of the comm yourself.

FWIW, I think the same should be done with `_watch_worker_status_comm`. I don't see a reason why these calls would deserve a dedicated connection. I assume `_watch_worker_status_comm` was introduced before we had a connection pool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @fjetter,
I get an attribute error when making this change and using a LocalCluster.
It was quite a while ago that I wrote this code, but I'm pretty sure the reason I went down the creating-a-comm route was because clusters don't (necessarily?) have a _scheduler attribute.
Am I missing something? Should there be a _scheduler attribute?
Sorry, this should've been `self.scheduler_comm`.

This should always be set if a cluster is already started. The implementation is a bit messy, though: `Cluster` doesn't actually set it, but `SpecCluster` does... Regardless, having this set is a requirement since we're using it in many other places as well, i.e. it is safe to assume that subclasses implement it too.
That worked! I've removed all the messy comm stuff now :)
Thank you :)
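To summarize the outcome of this thread as code: a minimal sketch of refreshing `scheduler_info` through the pooled `scheduler_comm` RPC rather than a hand-managed comm. The `SchedulerInfo` import path is assumed, and the function name is illustrative rather than the PR's actual method.

```python
from distributed.objects import SchedulerInfo  # assumed location of SchedulerInfo


async def refresh_scheduler_info(cluster):
    # The dedicated-comm version this replaces looked roughly like:
    #     await self._scheduler_info_comm.write({"op": "identity"})
    #     self.scheduler_info = SchedulerInfo(await self._scheduler_info_comm.read())
    #
    # Going through scheduler_comm instead reuses the pooled connection and
    # hides the write/read round trip behind a method call, so there is no
    # comm lifecycle to manage here.
    cluster.scheduler_info = SchedulerInfo(await cluster.scheduler_comm.identity())
```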
distributed/client.py (Outdated)
# Most likely, either self.cluster is None, or the cluster has not
# implemented a wait_for_workers method
nit: You added `wait_for_workers` to the base class, i.e. every cluster will come equipped with this. I typically prefer dealing with these situations by being explicit, i.e. `if cluster is None: ...`
I agree, changed it to an if-else
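A sketch of the explicit check that replaced the try/except, under the same naming assumptions as the earlier sketch (the helper and `_wait_for_workers` fallback are illustrative):

```python
# Explicit branching instead of catching AttributeError/NotImplementedError.
async def wait_for_workers(client, n_workers, timeout=None):
    if client.cluster is None:
        # The client was connected directly to a scheduler address, with no
        # cluster object attached: poll the scheduler from the client side.
        await client._wait_for_workers(n_workers, timeout=timeout)
    else:
        # wait_for_workers now lives on the Cluster base class, so every
        # cluster object is expected to provide it.
        await client.cluster.wait_for_workers(n_workers, timeout)
```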
Hi @mrocklin, FYI: following the comment from @fjetter above, I've replaced that try-catch with an if-else, so that will need to be reverted if we do go down that route.

I'm happy either way. I learned a lot from working on this issue, and would rather remove most of my changes and contribute something useful than add a load of code that'll cause problems later on :) I think it's better if people with more user knowledge than me decide which route we go down, though :)
Just pinging again to see if we can merge this now?
Sure. Could you fix up the merge conflicts?
I've handled the merge conflicts. I'm getting some failing tests, but I'm seeing these test failures in other PRs as well, so I don't think they're related to my changes...
It looks like …
The CI failures seem to be happening consistently with Python 3.8 only.
I had a skim over the code and can't see anything that sticks out as being a syntax problem but maybe you could test locally with 3.8 to find the problem?
self.scheduler_info = SchedulerInfo(await self.scheduler_comm.identity())

def wait_for_workers(
    self, n_workers: int | str = no_default, timeout: float | None = None
Writing unions with a pipe like this is supported in Python >=3.10. Not sure if that's related to the issues we are seeing?
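For what it's worth, `int | str` in annotations can work on Python 3.8 provided the module enables postponed annotation evaluation (PEP 563), which fits with the observation below that the 3.8 failure turned out to be a thread-leak issue rather than a syntax one. A minimal illustration (illustrative function name, not the PR's code):

```python
# Runs on Python 3.8: the future import stores annotations as strings,
# so the pipe-union syntax is never evaluated at runtime.
from __future__ import annotations


def wait_for_workers(n_workers: int | str = 0, timeout: float | None = None) -> None:
    print(f"waiting for {n_workers} workers (timeout={timeout})")


wait_for_workers(2, timeout=5.0)
```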
Agreed, it's 3.8. I do get the same error locally. Changing (or removing) the type hints doesn't fix it.
It's a thread-leaking issue, which sounds much lower level than anything I've done here, but I'll keep looking into it.
I've managed to fix it. I noticed the other tests in test_cluster used "async with" to start the cluster, and didn't pass in the loop fixture, so I copied that. I'd be lying if I said I knew what went wrong and how this fixed it though...
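A sketch of the test shape being described here, with an illustrative test name and a lightweight `LocalCluster`; the key points are starting the cluster via `async with` inside the test coroutine and not requesting the `loop` fixture:

```python
from distributed import Client, LocalCluster
from distributed.utils_test import gen_test


@gen_test()
async def test_wait_for_workers_sketch():
    # gen_test supplies the event loop; the cluster is started and torn down
    # with ``async with`` inside the test itself, and no ``loop`` fixture is
    # passed as an argument.
    async with LocalCluster(
        n_workers=1, processes=False, asynchronous=True, dashboard_address=":0"
    ) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            await client.wait_for_workers(1)
```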
Thanks for fixing this up. CI failures now look unrelated.
Thanks for pushing this along, sorry that it's taken so long to get in.
* Moving wait_for_worker logic to cluster, and having client call that if it can
* Adding test for cluster.wait_for_workers
* use try and except to catch case where cluster is none or wait_for_workers is not implemented
* linting
* This test has been removed on main branch, but for some reason git merge didn't remove it from mine
* Cluster has to use scheduler_info attribute instead of scheduler.identity
* lint
* reverting
* need to use cluster.scale when client.wait_for_workers is called while using a cluster
* need to use scheduler_info. Also, using cluster.scale to emulate behaviour of client.wait_for_workers
* using scheduler_info and dont need to call scale anymore
* lint
* adding gen_test decorator
* Don't think we need to scale at start of wait_for_workers
* self.scheduler_info does not update worker status from init to running, so need to request status again
* Use Status
* Scale was fixing the nworkers test because it forced the worker status to update. Now that worker status is checked we don't need this (and shouldn't have really included it anyway)
* Refactoring
* Fixing type information
* Experimenting with creating new comm
* Create separate comm in _start and use that to update scheduler_info
* Close new comm
* initialise scheduler_info_comm
* Don't allow n_workers to be zero for cluster wait_for_workers
* Adding return type
* Change try-catch to be an explicit if-else
* Check explicitly for cluster is none, as I think it's clearer
* linting
* use scheduler_comm instead of opening new comm
* remove update_scheduler_info method
* pre-commit changes
* Reduce number of works to see if it fixes github tests
* Changing test to make it work in python 3.8
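Reading the commit history above, the heart of the change is a `Cluster.wait_for_workers` that re-requests the scheduler's identity over `scheduler_comm` and counts only workers whose status is `running`. Here is a hedged, stand-alone sketch of that loop; the exact signature, timeout handling, and the shape of the identity payload in the real method may differ.

```python
import asyncio
import time

from distributed.core import Status


async def wait_for_workers(cluster, n_workers, timeout=None, poll_interval=0.1):
    # Stand-alone illustration of the polling loop; not the PR's exact code.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        # Re-request identity through the pooled comm: the cached
        # cluster.scheduler_info may still report workers as "init".
        info = await cluster.scheduler_comm.identity()
        # Assumes each worker entry carries a "status" field serialized by name.
        running = [
            w
            for w in info["workers"].values()
            if w.get("status") == Status.running.name
        ]
        if len(running) >= n_workers:
            return
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError(
                f"Only {len(running)}/{n_workers} workers arrived within {timeout}s"
            )
        await asyncio.sleep(poll_interval)
```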
This was a breaking change for dask-gateway's client, I think, because it assumes that it's available instead of opting in to it if it is. Can we complement this PR with a conditional check to verify the new function is available, or something similar? Possibly emitting a warning? If this is confirmed to be a reasonable call, I can open a PR, but since I'm a novice in this repo's code base it would be good to have a signal that it could make sense at all.
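A sketch of the kind of conditional check being asked for, with hypothetical names (`wait_for_workers_guarded`, the `_wait_for_workers` fallback) that are not part of the actual API:

```python
import warnings


async def wait_for_workers_guarded(client, n_workers, timeout=None):
    cluster = client.cluster
    if cluster is not None and hasattr(cluster, "wait_for_workers"):
        # The cluster provides the method (e.g. it inherits the updated base class).
        return await cluster.wait_for_workers(n_workers, timeout)
    # Third-party or older cluster objects may not have it yet: warn and use
    # the client-side polling path instead of raising AttributeError.
    warnings.warn(
        "Cluster object does not implement wait_for_workers; "
        "falling back to client-side waiting.",
        stacklevel=2,
    )
    return await client._wait_for_workers(n_workers, timeout=timeout)
```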
Closes #6346
pre-commit run --all-files