Skip to content

Commit

Permalink
only set worker/nanny to Status.running if it is in Status.init (#7773)
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert authored Apr 26, 2023
1 parent 76bbfaf commit 7477a63
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
3 changes: 2 additions & 1 deletion distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ async def _close_on_failure(exc: Exception) -> None:
except Exception as exc:
await _close_on_failure(exc)
raise RuntimeError(f"{type(self).__name__} failed to start.") from exc
self.status = Status.running
if self.status == Status.init:
self.status = Status.running
return self

async def __aenter__(self):
Expand Down
21 changes: 11 additions & 10 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,6 @@ async def _register_with_scheduler(self) -> None:
middle = (_start + _end) / 2
self._update_latency(_end - start)
self.scheduler_delay = response["time"] - middle
self.status = Status.running
break
except OSError:
logger.info("Waiting to connect to: %26s", self.scheduler.address)
Expand All @@ -1207,18 +1206,20 @@ async def _register_with_scheduler(self) -> None:
msg = response["message"] if "message" in response else repr(response)
logger.error(f"Unable to connect to scheduler: {msg}")
raise ValueError(f"Unexpected response from register: {response!r}")
else:
await asyncio.gather(
*(
self.plugin_add(name=name, plugin=plugin)
for name, plugin in response["worker-plugins"].items()
)

self.batched_stream.start(comm)
self.status = Status.running

await asyncio.gather(
*(
self.plugin_add(name=name, plugin=plugin)
for name, plugin in response["worker-plugins"].items()
)
)

logger.info(" Registered to: %26s", self.scheduler.address)
logger.info("-" * 49)
logger.info(" Registered to: %26s", self.scheduler.address)
logger.info("-" * 49)

self.batched_stream.start(comm)
self.periodic_callbacks["keep-alive"].start()
self.periodic_callbacks["heartbeat"].start()
self.loop.add_callback(self.handle_scheduler, comm)
Expand Down

0 comments on commit 7477a63

Please sign in to comment.