Skip to content

Commit

Permalink
Remove support for UCX < 1.11.1 (#5859)
Browse files Browse the repository at this point in the history
  • Loading branch information
pentschev authored Feb 24, 2022
1 parent 94ebd57 commit 0e12374
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 77 deletions.
29 changes: 5 additions & 24 deletions distributed/comm/tests/test_ucx_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,68 +23,49 @@

@pytest.mark.asyncio
async def test_ucx_config(cleanup):
ucx_110 = ucp.get_ucx_version() >= (1, 10, 0)

ucx = {
"nvlink": True,
"infiniband": True,
"rdmacm": False,
"net-devices": "",
"tcp": True,
"cuda-copy": True,
}

with dask.config.set({"distributed.comm.ucx": ucx}):
ucx_config = _scrub_ucx_config()
if ucx_110:
assert ucx_config.get("TLS") == "rc,tcp,cuda_copy,cuda_ipc"
assert ucx_config.get("SOCKADDR_TLS_PRIORITY") == "tcp"
else:
assert ucx_config.get("TLS") == "rc,tcp,sockcm,cuda_copy,cuda_ipc"
assert ucx_config.get("SOCKADDR_TLS_PRIORITY") == "sockcm"
assert ucx_config.get("NET_DEVICES") is None
assert ucx_config.get("TLS") == "rc,tcp,cuda_copy,cuda_ipc"
assert ucx_config.get("SOCKADDR_TLS_PRIORITY") == "tcp"

ucx = {
"nvlink": False,
"infiniband": True,
"rdmacm": False,
"net-devices": "mlx5_0:1",
"tcp": True,
"cuda-copy": False,
}

with dask.config.set({"distributed.comm.ucx": ucx}):
ucx_config = _scrub_ucx_config()
if ucx_110:
assert ucx_config.get("TLS") == "rc,tcp"
assert ucx_config.get("SOCKADDR_TLS_PRIORITY") == "tcp"
else:
assert ucx_config.get("TLS") == "rc,tcp,sockcm"
assert ucx_config.get("SOCKADDR_TLS_PRIORITY") == "sockcm"
assert ucx_config.get("NET_DEVICES") == "mlx5_0:1"
assert ucx_config.get("TLS") == "rc,tcp"
assert ucx_config.get("SOCKADDR_TLS_PRIORITY") == "tcp"

ucx = {
"nvlink": False,
"infiniband": True,
"rdmacm": True,
"net-devices": "all",
"tcp": True,
"cuda-copy": True,
}

with dask.config.set({"distributed.comm.ucx": ucx}):
ucx_config = _scrub_ucx_config()
if ucx_110:
assert ucx_config.get("TLS") == "rc,tcp,cuda_copy"
else:
assert ucx_config.get("TLS") == "rc,tcp,rdmacm,cuda_copy"
assert ucx_config.get("TLS") == "rc,tcp,cuda_copy"
assert ucx_config.get("SOCKADDR_TLS_PRIORITY") == "rdmacm"

ucx = {
"nvlink": None,
"infiniband": None,
"rdmacm": None,
"net-devices": None,
"tcp": None,
"cuda-copy": None,
}
Expand Down
39 changes: 6 additions & 33 deletions distributed/comm/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,10 @@
if TYPE_CHECKING:
try:
import ucp
from ucp import create_endpoint as ucx_create_endpoint
from ucp import create_listener as ucx_create_listener
except ImportError:
pass
else:
ucp = None # type: ignore
ucx_create_endpoint = None # type: ignore
ucx_create_listener = None # type: ignore

host_array = None
device_array = None
Expand All @@ -58,7 +54,6 @@ def synchronize_stream(stream=0):

def init_once():
global ucp, host_array, device_array
global ucx_create_endpoint, ucx_create_listener
global pre_existing_cuda_context, cuda_context_created

if ucp is not None:
Expand Down Expand Up @@ -152,22 +147,6 @@ def device_array(n):
pool_allocator=True, managed_memory=False, initial_pool_size=pool_size
)

try:
from ucp.endpoint_reuse import EndpointReuse
except ImportError:
ucx_create_endpoint = ucp.create_endpoint
ucx_create_listener = ucp.create_listener
else:
reuse_endpoints = dask.config.get("distributed.comm.ucx.reuse-endpoints")
if (
reuse_endpoints is None and ucp.get_ucx_version() >= (1, 11, 0)
) or reuse_endpoints is False:
ucx_create_endpoint = ucp.create_endpoint
ucx_create_listener = ucp.create_listener
else:
ucx_create_endpoint = EndpointReuse.create_endpoint
ucx_create_listener = EndpointReuse.create_listener


def _close_comm(ref):
"""Callback to close Dask Comm when UCX Endpoint closes or errors
Expand Down Expand Up @@ -412,7 +391,7 @@ async def connect(self, address: str, deserialize=True, **connection_args) -> UC
ip, port = parse_host_port(address)
init_once()
try:
ep = await ucx_create_endpoint(ip, port)
ep = await ucp.create_endpoint(ip, port)
except (ucp.exceptions.UCXCloseError, ucp.exceptions.UCXCanceled,) + (
getattr(ucp.exceptions, "UCXConnectionReset", ()),
getattr(ucp.exceptions, "UCXNotConnected", ()),
Expand Down Expand Up @@ -476,7 +455,7 @@ async def serve_forever(client_ep):
await self.comm_handler(ucx)

init_once()
self.ucp_server = ucx_create_listener(serve_forever, port=self._input_port)
self.ucp_server = ucp.create_listener(serve_forever, port=self._input_port)

def stop(self):
self.ucp_server = None
Expand Down Expand Up @@ -545,9 +524,7 @@ def _scrub_ucx_config():
# 2) explicitly defined UCX configuration flags

# import does not initialize ucp -- this will occur outside this function
from ucp import get_config, get_ucx_version

ucx_110 = get_ucx_version() >= (1, 10, 0)
from ucp import get_config

options = {}

Expand All @@ -562,11 +539,11 @@ def _scrub_ucx_config():
]
):
if dask.config.get("distributed.comm.ucx.rdmacm"):
tls = "tcp" if ucx_110 else "tcp,rdmacm"
tls = "tcp"
tls_priority = "rdmacm"
else:
tls = "tcp" if ucx_110 else "tcp,sockcm"
tls_priority = "tcp" if ucx_110 else "sockcm"
tls = "tcp"
tls_priority = "tcp"

# CUDA COPY can optionally be used with ucx -- we rely on the user
# to define when messages will include CUDA objects. Note:
Expand All @@ -586,10 +563,6 @@ def _scrub_ucx_config():

options = {"TLS": tls, "SOCKADDR_TLS_PRIORITY": tls_priority}

net_devices = dask.config.get("distributed.comm.ucx.net-devices")
if net_devices is not None and net_devices != "":
options["NET_DEVICES"] = net_devices

# ANY UCX options defined in config will overwrite high level dask.ucx flags
valid_ucx_vars = list(get_config().keys())
for k, v in options.items():
Expand Down
18 changes: 0 additions & 18 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -863,24 +863,6 @@ properties:
description: |
Set environment variables to enable UCX RDMA connection manager support.
Requires ``distributed.comm.ucx.infiniband=True``.
net-devices:
type: [string, 'null']
description: |
Interface(s) used by workers for UCX communication. Can be a string (like
``"eth0"`` for NVLink or ``"mlx5_0:1"``/``"ib0"`` for InfiniBand), ``"auto"``
(requires ``distributed.comm.ucx.infiniband=True``) to pick the optimal interface per-worker based on
the system's topology, or ``None`` to stay with the default value of ``"all"`` (use
all available interfaces). Setting to ``"auto"`` requires UCX-Py to be installed
and compiled with hwloc support. Unexpected errors can occur when using
``"auto"`` if any interfaces are disconnected or improperly configured.
reuse-endpoints:
type: [boolean, 'null']
description: |
Enable the UCX-Py endpoint reuse mechanism if ``True``, or if it's not specified and
UCX < 1.11 is installed; otherwise disable endpoint reuse. This was primarily
introduced to resolve an issue with CUDA IPC that has been fixed in UCX 1.10, but
it can make establishing endpoints very slow. This is particularly noticeable in
clusters of more than a few dozen workers.
create-cuda-context:
type: [boolean, 'null']
description: |
Expand Down
2 changes: 0 additions & 2 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ distributed:
nvlink: null # enable cuda_ipc
infiniband: null # enable Infiniband
rdmacm: null # enable RDMACM
net-devices: null # define what interface to use for UCX comm
reuse-endpoints: null # enable endpoint reuse
create-cuda-context: null # create CUDA context before UCX initialization

zstd:
Expand Down

0 comments on commit 0e12374

Please sign in to comment.