Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

only set worker/nanny to Status.running if it is in Status.init #7773

Merged
merged 2 commits into from
Apr 26, 2023

Conversation

graingert
Copy link
Member

@graingert graingert commented Apr 13, 2023

Closes #7699

  • Tests added / passed
  • Passes pre-commit run --all-files

@@ -543,7 +543,8 @@ async def _close_on_failure(exc: Exception) -> None:
except Exception as exc:
await _close_on_failure(exc)
raise RuntimeError(f"{type(self).__name__} failed to start.") from exc
self.status = Status.running
if self.status == Status.init:
self.status = Status.running
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem in #7699 was that nanny.close() gets called during nanny.start() so close sets self.status = Status.closing but this line changed it back to self.status = Status.running which causes the _on_worker_exit to attempt a restart

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

@@ -1196,7 +1196,6 @@ async def _register_with_scheduler(self) -> None:
middle = (_start + _end) / 2
self._update_latency(_end - start)
self.scheduler_delay = response["time"] - middle
self.status = Status.running
Copy link
Member Author

@graingert graingert Apr 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this extra self.status assignment is redundant because Server.start sets the self.status to Status.running, but for this assignment the batched_stream hasn't started yet and so _send_worker_status_change's message is dropped silently and so we were relying on Server.start doing an extra Worker.status transition of Status.running -> Status.running

edit: this is outdated because of the fix for distributed/tests/test_worker.py::test_worker_running_before_running_plugins https://github.com/dask/distributed/pull/7773/files#r1165702047

)

self.batched_stream.start(comm)
self.status = Status.running
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while this self.status = Status.running is redundant because Server.start sets it we need the status to be running before starting plugins and we need the self.batched_stream started before updating the status so _send_worker_status_change's message isn't silently dropped

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yuck...but good catch.

@github-actions
Copy link
Contributor

github-actions bot commented Apr 13, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       26 files  +       1         26 suites  +1   13h 55m 1s ⏱️ + 24m 2s
  3 550 tests ±       0    3 443 ✔️ +       6     106 💤 ±  0  1  - 6 
44 910 runs  +1 301  42 670 ✔️ +1 244  2 239 💤 +63  1  - 6 

For more details on these failures, see this check.

Results for commit 523df7a. ± Comparison against base commit 600b993.

♻️ This comment has been updated with latest results.

@graingert graingert marked this pull request as ready for review April 19, 2023 14:08
@graingert graingert requested a review from fjetter as a code owner April 19, 2023 14:08
Copy link
Contributor

@milesgranger milesgranger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🕵️

@@ -543,7 +543,8 @@ async def _close_on_failure(exc: Exception) -> None:
except Exception as exc:
await _close_on_failure(exc)
raise RuntimeError(f"{type(self).__name__} failed to start.") from exc
self.status = Status.running
if self.status == Status.init:
self.status = Status.running
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

Copy link
Member

@hendrikmakait hendrikmakait left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @graingert!

)

self.batched_stream.start(comm)
self.status = Status.running
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yuck...but good catch.

@hendrikmakait hendrikmakait merged commit 7477a63 into dask:main Apr 26, 2023
milesgranger pushed a commit to milesgranger/distributed that referenced this pull request May 15, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

distributed.cli.tests.test_dask_worker.test_signal_handling flaky on py3.11
3 participants