diff --git a/distributed/client.py b/distributed/client.py index b21a0504352..ef14a39b0cb 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -60,6 +60,7 @@ CommClosedError, ConnectionPool, PooledRPCCall, + Status, clean_exception, connect, rpc, @@ -825,6 +826,11 @@ def __init__( elif isinstance(getattr(address, "scheduler_address", None), str): # It's a LocalCluster or LocalCluster-compatible object self.cluster = address + status = getattr(self.cluster, "status") + if status and status in [Status.closed, Status.closing]: + raise RuntimeError( + f"Trying to connect to an already closed or closing Cluster {self.cluster}." + ) with suppress(AttributeError): loop = address.loop if security is None: diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index ae1a96779b1..1d6cd954be7 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -1149,3 +1149,18 @@ async def test_cluster_host_used_throughout_cluster(host, use_nanny): if use_nanny: url = urlparse(worker.process.worker_address) assert url.hostname == "127.0.0.1" + + +@gen_test() +async def test_connect_to_closed_cluster(cleanup): + async with LocalCluster(processes=False, asynchronous=True) as cluster: + async with Client(cluster, asynchronous=True) as c1: + assert await c1.submit(inc, 1) == 2 + + with pytest.raises( + RuntimeError, + match="Trying to connect to an already closed or closing Cluster", + ): + # Raises during init without actually connecting since we're not + # awaiting anything + Client(cluster, asynchronous=True)