diff --git a/distributed/core.py b/distributed/core.py index 1c94352d205..40372cb8fa8 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -543,7 +543,8 @@ async def _close_on_failure(exc: Exception) -> None: except Exception as exc: await _close_on_failure(exc) raise RuntimeError(f"{type(self).__name__} failed to start.") from exc - self.status = Status.running + if self.status == Status.init: + self.status = Status.running return self async def __aenter__(self): diff --git a/distributed/worker.py b/distributed/worker.py index f443237c6c9..8fe333dfb58 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1196,7 +1196,6 @@ async def _register_with_scheduler(self) -> None: middle = (_start + _end) / 2 self._update_latency(_end - start) self.scheduler_delay = response["time"] - middle - self.status = Status.running break except OSError: logger.info("Waiting to connect to: %26s", self.scheduler.address) @@ -1207,18 +1206,20 @@ async def _register_with_scheduler(self) -> None: msg = response["message"] if "message" in response else repr(response) logger.error(f"Unable to connect to scheduler: {msg}") raise ValueError(f"Unexpected response from register: {response!r}") - else: - await asyncio.gather( - *( - self.plugin_add(name=name, plugin=plugin) - for name, plugin in response["worker-plugins"].items() - ) + + self.batched_stream.start(comm) + self.status = Status.running + + await asyncio.gather( + *( + self.plugin_add(name=name, plugin=plugin) + for name, plugin in response["worker-plugins"].items() ) + ) - logger.info(" Registered to: %26s", self.scheduler.address) - logger.info("-" * 49) + logger.info(" Registered to: %26s", self.scheduler.address) + logger.info("-" * 49) - self.batched_stream.start(comm) self.periodic_callbacks["keep-alive"].start() self.periodic_callbacks["heartbeat"].start() self.loop.add_callback(self.handle_scheduler, comm)