diff --git a/.gitlab/test_cpp.sh b/.gitlab/test_cpp.sh index 7c4017a38..b5984edae 100755 --- a/.gitlab/test_cpp.sh +++ b/.gitlab/test_cpp.sh @@ -68,10 +68,6 @@ if $TEST_LIBFABRIC ; then fi ./bin/nixl_etcd_example ./bin/ucx_backend_test -# Skip UCX_MO backend test on GPU worker, fails VRAM transfers -if ! $HAS_GPU ; then - ./bin/ucx_mo_backend_test -fi mkdir -p /tmp/telemetry_test NIXL_TELEMETRY_ENABLE=y NIXL_TELEMETRY_DIR=/tmp/telemetry_test ./bin/agent_example & sleep 1 diff --git a/benchmark/kvbench/README.md b/benchmark/kvbench/README.md index d8214de56..fb8991b89 100644 --- a/benchmark/kvbench/README.md +++ b/benchmark/kvbench/README.md @@ -118,7 +118,7 @@ These arguments are used by both `plan` and `profile` commands: | -------- | ----------- | | `--source` | Source of the nixl descriptors [file, memory, gpu] (default: file) | | `--destination` | Destination of the nixl descriptors [file, memory, gpu] (default: memory) | -| `--backend` | Communication backend [UCX, UCX_MO, GDS, GDS_MT, POSIX, GPUNETIO, Mooncake, HF3FS, OBJ] (default: UCX) | +| `--backend` | Communication backend [UCX, GDS, GDS_MT, POSIX, GPUNETIO, Mooncake, HF3FS, OBJ] (default: UCX) | | `--worker_type` | Worker to use to transfer data [nixl, nvshmem] (default: nixl) | | `--initiator_seg_type` | Memory segment type for initiator [DRAM, VRAM] (default: DRAM) | | `--target_seg_type` | Memory segment type for target [DRAM, VRAM] (default: DRAM) | diff --git a/benchmark/kvbench/commands/args.py b/benchmark/kvbench/commands/args.py index 11acaee31..47e8786f7 100644 --- a/benchmark/kvbench/commands/args.py +++ b/benchmark/kvbench/commands/args.py @@ -72,7 +72,7 @@ def nixl_bench_args(func): func = click.option( "--backend", type=str, - help="Communication backend [UCX, UCX_MO, GDS, GDS_MT, POSIX, GPUNETIO, Mooncake, HF3FS, OBJ] (default: UCX)", + help="Communication backend [UCX, GDS, GDS_MT, POSIX, GPUNETIO, Mooncake, HF3FS, OBJ] (default: UCX)", )(func) func = click.option( "--worker_type", diff --git a/benchmark/kvbench/commands/nixlbench.py b/benchmark/kvbench/commands/nixlbench.py index bbfecf012..72ec210f9 100644 --- a/benchmark/kvbench/commands/nixlbench.py +++ b/benchmark/kvbench/commands/nixlbench.py @@ -205,7 +205,7 @@ def _configure_posix(self, source: str, destination: str): raise ValueError(f"Invalid source for POSIX/HF3FS: {source}") def _configure_ucx(self, backend: str, source: str, destination: str): - """Configure UCX, UCX_MO, GPUNETIO, and Mooncake plugins (same logic for all)""" + """Configure UCX, GPUNETIO, and Mooncake plugins (same logic for all)""" arg_to_seg_type = { "memory": "DRAM", "gpu": "VRAM", @@ -241,7 +241,7 @@ def configure_segment_type(self, backend: str, source: str, destination: str): self._configure_gds(source, destination) elif backend_lower in ["posix", "hf3fs"]: self._configure_posix(source, destination) - elif backend_lower in ["ucx", "ucx_mo", "gpunetio", "mooncake"]: + elif backend_lower in ["ucx", "gpunetio", "mooncake"]: self._configure_ucx(backend_lower, source, destination) elif backend_lower == "obj": self._configure_obj(source, destination) diff --git a/benchmark/nixlbench/README.md b/benchmark/nixlbench/README.md index 153374e84..fb37927c0 100644 --- a/benchmark/nixlbench/README.md +++ b/benchmark/nixlbench/README.md @@ -32,7 +32,7 @@ A comprehensive benchmarking tool for the NVIDIA Inference Xfer Library (NIXL) t ## Features -- **Multiple Communication Backends**: UCX, UCX_MO, GPUNETIO, Mooncake, Libfabric for network communication +- **Multiple Communication Backends**: UCX, GPUNETIO, Mooncake, Libfabric for network communication - **Storage Backend Support**: GDS, GDS_MT, POSIX, HF3FS, OBJ (S3), GUSLI for storage operations - **Flexible Communication Patterns**: - **Pairwise**: Point-to-point communication between pairs @@ -420,7 +420,7 @@ sudo systemctl start etcd && sudo systemctl enable etcd ``` --runtime_type NAME # Type of runtime to use [ETCD] (default: ETCD) --worker_type NAME # Worker to use to transfer data [nixl, nvshmem] (default: nixl) ---backend NAME # Communication backend [UCX, UCX_MO, GDS, GDS_MT, POSIX, GPUNETIO, Mooncake, HF3FS, OBJ, GUSLI] (default: UCX) +--backend NAME # Communication backend [UCX, GDS, GDS_MT, POSIX, GPUNETIO, Mooncake, HF3FS, OBJ, GUSLI] (default: UCX) --benchmark_group NAME # Name of benchmark group for parallel runs (default: default) --etcd_endpoints URL # ETCD server URL for coordination (default: http://localhost:2379) ``` @@ -520,7 +520,7 @@ Note: storage_enable_direct is automatically enabled for GUSLI backend NIXL Benchmark uses an ETCD key-value store for coordination between benchmark workers. This is useful in containerized or cloud-native environments. **ETCD Requirements:** -- **Required**: Network backends (UCX, UCX_MO, GPUNETIO, Mooncake, Libfabric) and multi-node setups +- **Required**: Network backends (UCX, GPUNETIO, Mooncake, Libfabric) and multi-node setups - **Optional**: Storage backends (GDS, GDS_MT, POSIX, HF3FS, OBJ, GUSLI) running as single instances - **Required**: Storage backends when `--etcd_endpoints` is explicitly specified @@ -565,9 +565,6 @@ The workers automatically coordinate ranks through ETCD as they connect. # UCX with specific devices ./nixlbench --etcd_endpoints http://etcd-server:2379 --backend UCX --device_list mlx5_0,mlx5_1 - -# UCX Memory-Only variant -./nixlbench --etcd_endpoints http://etcd-server:2379 --backend UCX_MO ``` **GPUNETIO Backend:** diff --git a/benchmark/nixlbench/src/utils/utils.cpp b/benchmark/nixlbench/src/utils/utils.cpp index 81ac7e0ed..7b7cc46c5 100644 --- a/benchmark/nixlbench/src/utils/utils.cpp +++ b/benchmark/nixlbench/src/utils/utils.cpp @@ -47,7 +47,7 @@ DEFINE_string(worker_type, XFERBENCH_WORKER_NIXL, "Type of worker [nixl, nvshmem DEFINE_string( backend, XFERBENCH_BACKEND_UCX, - "Name of NIXL backend [UCX, UCX_MO, GDS, GDS_MT, POSIX, GPUNETIO, Mooncake, HF3FS, OBJ, GUSLI] \ + "Name of NIXL backend [UCX, GDS, GDS_MT, POSIX, GPUNETIO, Mooncake, HF3FS, OBJ, GUSLI] \ (only used with nixl worker)"); DEFINE_string(initiator_seg_type, XFERBENCH_SEG_TYPE_DRAM, "Type of memory segment for initiator \ [DRAM, VRAM]. Note: Storage backends always use DRAM locally."); @@ -431,8 +431,7 @@ xferBenchConfig::printConfig() { } printOption("Worker type (--worker_type=[nixl,nvshmem])", worker_type); if (worker_type == XFERBENCH_WORKER_NIXL) { - printOption("Backend (--backend=[UCX,UCX_MO,GDS,GDS_MT,POSIX,Mooncake,HF3FS,OBJ])", - backend); + printOption("Backend (--backend=[UCX,GDS,GDS_MT,POSIX,Mooncake,HF3FS,OBJ])", backend); printOption ("Enable pt (--enable_pt=[0,1])", std::to_string (enable_pt)); printOption("Progress threads (--progress_threads=N)", std::to_string(progress_threads)); printOption ("Device list (--device_list=dev1,dev2,...)", device_list); diff --git a/benchmark/nixlbench/src/utils/utils.h b/benchmark/nixlbench/src/utils/utils.h index d478878ba..8cd3c7f7e 100644 --- a/benchmark/nixlbench/src/utils/utils.h +++ b/benchmark/nixlbench/src/utils/utils.h @@ -66,7 +66,6 @@ // Backend types #define XFERBENCH_BACKEND_UCX "UCX" -#define XFERBENCH_BACKEND_UCX_MO "UCX_MO" #define XFERBENCH_BACKEND_LIBFABRIC "LIBFABRIC" #define XFERBENCH_BACKEND_GDS "GDS" #define XFERBENCH_BACKEND_GDS_MT "GDS_MT" diff --git a/benchmark/nixlbench/src/worker/nixl/nixl_worker.cpp b/benchmark/nixlbench/src/worker/nixl/nixl_worker.cpp index 351879259..76f6db00f 100644 --- a/benchmark/nixlbench/src/worker/nixl/nixl_worker.cpp +++ b/benchmark/nixlbench/src/worker/nixl/nixl_worker.cpp @@ -113,7 +113,6 @@ xferBenchNixlWorker::xferBenchNixlWorker(int *argc, char ***argv, std::vectorgetAvailPlugins(plugins); if (0 == xferBenchConfig::backend.compare(XFERBENCH_BACKEND_UCX) || - 0 == xferBenchConfig::backend.compare(XFERBENCH_BACKEND_UCX_MO) || 0 == xferBenchConfig::backend.compare(XFERBENCH_BACKEND_LIBFABRIC) || 0 == xferBenchConfig::backend.compare(XFERBENCH_BACKEND_GPUNETIO) || 0 == xferBenchConfig::backend.compare(XFERBENCH_BACKEND_MOONCAKE) || @@ -126,8 +125,7 @@ xferBenchNixlWorker::xferBenchNixlWorker(int *argc, char ***argv, std::vectorgetPluginParams(backend_name, mems, backend_params); - if (0 == xferBenchConfig::backend.compare(XFERBENCH_BACKEND_UCX) || - 0 == xferBenchConfig::backend.compare(XFERBENCH_BACKEND_UCX_MO)) { + if (0 == xferBenchConfig::backend.compare(XFERBENCH_BACKEND_UCX)) { backend_params["num_threads"] = std::to_string(xferBenchConfig::progress_threads); // No need to set device_list if all is specified @@ -135,14 +133,8 @@ xferBenchNixlWorker::xferBenchNixlWorker(int *argc, char ***argv, std::vector= 1) { if (isInitiator()) { backend_params["device_list"] = devices[rank]; - if (0 == xferBenchConfig::backend.compare(XFERBENCH_BACKEND_UCX_MO)) { - backend_params["num_ucx_engines"] = xferBenchConfig::num_initiator_dev; - } } else { backend_params["device_list"] = devices[rank - xferBenchConfig::num_initiator_dev]; - if (0 == xferBenchConfig::backend.compare(XFERBENCH_BACKEND_UCX_MO)) { - backend_params["num_ucx_engines"] = xferBenchConfig::num_target_dev; - } } } diff --git a/src/core/meson.build b/src/core/meson.build index d00fd7362..54e6344d2 100644 --- a/src/core/meson.build +++ b/src/core/meson.build @@ -29,10 +29,6 @@ if 'UCX' in static_plugins nixl_lib_deps += [ ucx_backend_interface, asio_dep, cuda_dep ] endif -if 'UCX_MO' in static_plugins - nixl_lib_deps += [ ucx_mo_backend_interface, cuda_dep ] -endif - if 'POSIX' in static_plugins nixl_lib_deps += [ posix_backend_interface ] endif diff --git a/src/core/nixl_plugin_manager.cpp b/src/core/nixl_plugin_manager.cpp index 85ad98390..fe11531a3 100644 --- a/src/core/nixl_plugin_manager.cpp +++ b/src/core/nixl_plugin_manager.cpp @@ -403,10 +403,6 @@ void nixlPluginManager::registerBuiltinPlugins() { NIXL_REGISTER_STATIC_PLUGIN(UCX) #endif -#ifdef STATIC_PLUGIN_UCX_MO - NIXL_REGISTER_STATIC_PLUGIN(UCX_MO) -#endif - #ifdef STATIC_PLUGIN_GDS #ifndef DISABLE_GDS_BACKEND NIXL_REGISTER_STATIC_PLUGIN(GDS) diff --git a/src/plugins/meson.build b/src/plugins/meson.build index 2a97127b4..c7e61ef1e 100644 --- a/src/plugins/meson.build +++ b/src/plugins/meson.build @@ -13,11 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -ucx_backend_inc_dirs = include_directories('./ucx') - if ucx_dep.found() subdir('ucx') - subdir('ucx_mo') endif subdir('posix') # Always try to build POSIX backend, it will handle its own dependencies diff --git a/src/plugins/ucx_mo/meson.build b/src/plugins/ucx_mo/meson.build deleted file mode 100644 index 6b4563ea2..000000000 --- a/src/plugins/ucx_mo/meson.build +++ /dev/null @@ -1,54 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -ucx_mo_utils_dep = declare_dependency(link_with: [ucx_utils_lib, ucx_backend_lib], - include_directories: [utils_inc_dirs, ucx_backend_inc_dirs] ) - -compile_flags = [] -if cuda_dep.found() - compile_flags = [ '-DHAVE_CUDA' ] -endif - -if 'UCX_MO' in static_plugins - ucx_mo_backend_lib = static_library('UCX_MO', - 'ucx_mo_backend.cpp', 'ucx_mo_backend.h', 'ucx_mo_plugin.cpp', - dependencies: [nixl_infra, nixl_common_deps, ucx_mo_utils_dep, serdes_interface, cuda_dep, ucx_dep], - link_with: [ucx_backend_lib], - include_directories: [nixl_inc_dirs, utils_inc_dirs, ucx_backend_inc_dirs, ucx_utils_inc_dirs], - install: false, - cpp_args : compile_flags, - name_prefix: 'libplugin_') # Custom prefix for plugin libraries -else - ucx_mo_backend_lib = shared_library('UCX_MO', - 'ucx_mo_backend.cpp', 'ucx_mo_backend.h', 'ucx_mo_plugin.cpp', - dependencies: [nixl_infra, nixl_common_deps, ucx_mo_utils_dep, serdes_interface, cuda_dep, ucx_dep], - link_with: [ucx_backend_lib], - include_directories: [nixl_inc_dirs, utils_inc_dirs, ucx_backend_inc_dirs, ucx_utils_inc_dirs], - install: true, - cpp_args : compile_flags + ['-fPIC'], - name_prefix: 'libplugin_', # Custom prefix for plugin libraries - install_dir: plugin_install_dir, - # FIXME: normally plugins should not depend directly on each other - install_rpath: '$ORIGIN:$ORIGIN/..') - - if get_option('buildtype') == 'debug' - run_command('sh', '-c', - 'echo "UCX_MO=' + ucx_mo_backend_lib.full_path() + '" >> ' + plugin_build_dir + '/pluginlist', - check: true - ) - endif -endif - -ucx_mo_backend_interface = declare_dependency(link_with: ucx_mo_backend_lib) diff --git a/src/plugins/ucx_mo/ucx_mo_backend.cpp b/src/plugins/ucx_mo/ucx_mo_backend.cpp deleted file mode 100644 index c3d3c1368..000000000 --- a/src/plugins/ucx_mo/ucx_mo_backend.cpp +++ /dev/null @@ -1,771 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include - - -// Local includes -#include -#include -#include -#include - -using namespace std; - -/**************************************** - * CUDA related code -*****************************************/ - -#ifdef HAVE_CUDA - -#include - -static uint32_t _getNumVramDevices() -{ - cudaError_t result; - int n_vram_dev; - result = cudaGetDeviceCount(&n_vram_dev); - if (result != cudaSuccess) { - return 0; - } else { - return n_vram_dev; - } -} - - -#else - -static uint32_t _getNumVramDevices(){ - return 0; -} - -#endif - -/**************************************** - * UCX/MO Request management -*****************************************/ - -class nixlUcxMoRequestH : public nixlBackendReqH { -private: - - class dlMatrixElem { - public: - bool in_use, in_progress; - nixl_meta_dlist_t *ldescs; - nixl_meta_dlist_t *rdescs; - nixlBackendReqH *ucx_req; - - dlMatrixElem() { - in_use = false; - in_progress = false; - ldescs = nullptr; - rdescs = nullptr; - ucx_req = nullptr; - } - }; - - using dl_matrix_t = std::vector>; - - dl_matrix_t dlMatrix; - - std::string remoteAgent; - bool notifNeed; - std::string notifMsg; -public: - nixlUcxMoRequestH(size_t l_eng_cnt, size_t r_eng_cnt) : - dlMatrix(l_eng_cnt, std::vector(r_eng_cnt, dlMatrixElem())) - { - notifNeed = false; - } - - ~nixlUcxMoRequestH() - { - for (auto &row : dlMatrix) { - for (auto &p : row) { - if (p.ldescs) { - delete p.ldescs; - } - if (p.rdescs) { - delete p.rdescs; - } - } - } - } - - friend class nixlUcxMoEngine; -}; - - -/**************************************** - * UCX Engine management -*****************************************/ - - -int -nixlUcxMoEngine::setEngCnt(uint32_t num_host) -{ - _gpuCnt = _getNumVramDevices(); - _engineCnt = (_gpuCnt > num_host) ? _gpuCnt : num_host; - return 0; -} - -uint32_t -nixlUcxMoEngine::getEngCnt() -{ - return _engineCnt; -} - -int32_t -nixlUcxMoEngine::getEngIdx(nixl_mem_t type, uint64_t devId) -{ - switch (type) { - case VRAM_SEG: - assert(devId < _gpuCnt); - if (!(devId < _gpuCnt)) { - return -1; - } - case DRAM_SEG: - break; - default: - return -1; - } - assert(devId < _engineCnt); - return (devId < _engineCnt) ? devId : -1; -} - -string -nixlUcxMoEngine::getEngName(const string &baseName, uint32_t eidx) const -{ - return baseName + ":" + to_string(eidx); -} - -string -nixlUcxMoEngine::getEngBase(const string &engName) -{ - // find the last occurrence (agent name may have colon in its name) - if (string::npos == engName.find_last_of(":")) { - assert(engName.find_last_of(":") != string::npos); - return engName; - } - return engName.substr(0, engName.find_last_of(":")); -} - -/**************************************** - * Constructor/Destructor -*****************************************/ - -nixlUcxMoEngine::nixlUcxMoEngine(const nixlBackendInitParams* init_params): - nixlBackendEngine(init_params) -{ - nixl_b_params_t* custom_params = init_params->customParams; - uint32_t num_ucx_engines = 1; - if (custom_params->count("num_ucx_engines")) { - const char *cptr = (*custom_params)["num_ucx_engines"].c_str(); - char *eptr; - uint32_t tmp = strtoul(cptr, &eptr, 0); - if ( (size_t)(eptr - cptr) == (*custom_params)["num_ucx_engines"].length()) { - num_ucx_engines = tmp; - } else { - this->initErr = true; - // TODO: Log error - return; - } - } - - setEngCnt(num_ucx_engines); - // Initialize required number of engines - for (uint32_t i = 0; i < getEngCnt(); i++) { - auto e = nixlUcxEngine::create(*init_params); - if (e->getInitErr()) { - this->initErr = true; - // TODO: Log error - return; - } - engines.push_back(std::move(e)); - } -} - -nixl_mem_list_t -nixlUcxMoEngine::getSupportedMems () const { - nixl_mem_list_t mems; - mems.push_back(DRAM_SEG); - mems.push_back(VRAM_SEG); - return mems; -} - -/**************************************** - * Connection management -*****************************************/ - -nixl_status_t -nixlUcxMoEngine::getConnInfo(std::string &str) const -{ - nixlSerDes sd; - nixl_status_t status; - - // Serialize the number of engines - size_t sz = engines.size(); - sd.addBuf("Count", &sz, sizeof(sz)); - - for( auto &e : engines ) { - string s; - status = e->getConnInfo(s); - if (NIXL_SUCCESS != status) { - return status; - } - sd.addStr("Value", s); - } - - str = sd.exportStr(); - return NIXL_SUCCESS; -} - - -nixl_status_t -nixlUcxMoEngine::loadRemoteConnInfo (const string &remote_agent, - const string &remote_conn_info) -{ - nixlSerDes sd; - nixlUcxMoConnection conn; - nixl_status_t status; - size_t sz; - remote_comm_it_t it = remoteConnMap.find(remote_agent); - - if(it != remoteConnMap.end()) { - return NIXL_ERR_INVALID_PARAM; - } - - conn.remoteAgent = remote_agent; - - status = sd.importStr(remote_conn_info); - if (status != NIXL_SUCCESS) { - return status; - } - - ssize_t ret = sd.getBufLen("Count"); - if (ret != sizeof(sz)) { - return NIXL_ERR_MISMATCH; - } - status = sd.getBuf("Count", &sz, ret); - if (status != NIXL_SUCCESS) { - return status; - } - - conn.num_engines = sz; - - for(size_t idx = 0; idx < sz; idx++) { - string cinfo; - cinfo = sd.getStr("Value"); - for (auto &e : engines) { - status = e->loadRemoteConnInfo(getEngName(remote_agent, idx), cinfo); - if (status != NIXL_SUCCESS) { - return status; - } - } - } - - remoteConnMap[remote_agent] = conn; - - return NIXL_SUCCESS; -} - -nixl_status_t -nixlUcxMoEngine::connect(const string &remote_agent) -{ - remote_comm_it_t it = remoteConnMap.find(remote_agent); - nixl_status_t status; - - if(it == remoteConnMap.end()) { - return NIXL_ERR_NOT_FOUND; - } - - nixlUcxMoConnection &conn = it->second; - - for (auto &e : engines) { - for (uint32_t idx = 0; idx < conn.num_engines; idx++) { - status = e->connect(getEngName(remote_agent, idx)); - if (status != NIXL_SUCCESS) { - return status; - } - } - } - - return NIXL_SUCCESS; -} - -nixl_status_t -nixlUcxMoEngine::disconnect(const string &remote_agent) -{ - nixl_status_t status; - remote_comm_it_t it = remoteConnMap.find(remote_agent); - - if(it == remoteConnMap.end()) { - return NIXL_ERR_NOT_FOUND; - } - - nixlUcxMoConnection &conn = it->second; - - for (auto &e : engines) { - for (uint32_t idx = 0; idx < conn.num_engines; idx++) { - status = e->disconnect(getEngName(remote_agent, idx)); - if (status != NIXL_SUCCESS) { - return status; - } - } - } - - remoteConnMap.erase(remote_agent); - - return NIXL_SUCCESS; -} - -/**************************************** - * Memory management -*****************************************/ - - -nixl_status_t -nixlUcxMoEngine::registerMem (const nixlBlobDesc &mem, - const nixl_mem_t &nixl_mem, - nixlBackendMD* &out) -{ - auto priv = std::make_unique(); - int32_t eidx = getEngIdx(nixl_mem, mem.devId); - nixlSerDes sd; - string str; - nixl_status_t status; - - if (eidx < 0) { - return NIXL_ERR_INVALID_PARAM; - } - - priv->memType = nixl_mem; - priv->eidx = eidx; - status = engines[eidx]->registerMem(mem, nixl_mem, priv->md); - if (NIXL_SUCCESS != status) { - return status; - } - - sd.addBuf("EngIdx", &eidx, sizeof(eidx)); - status = engines[eidx]->getPublicData(priv->md, str); - if (NIXL_SUCCESS != status) { - return status; - } - sd.addStr("RkeyStr", str); - priv->rkeyStr = sd.exportStr(); - out = priv.release(); - - return NIXL_SUCCESS; -} - -nixl_status_t -nixlUcxMoEngine::getPublicData (const nixlBackendMD* meta, - std::string &str) const -{ - const nixlUcxMoPrivateMetadata *priv = (nixlUcxMoPrivateMetadata*) meta; - str = priv->get(); - return NIXL_SUCCESS; -} - -nixl_status_t -nixlUcxMoEngine::deregisterMem (nixlBackendMD* meta) -{ - nixlUcxMoPrivateMetadata *priv = (nixlUcxMoPrivateMetadata*) meta; - - engines[priv->eidx]->deregisterMem(priv->md); - delete priv; - return NIXL_SUCCESS; -} - -// To be cleaned up -nixl_status_t -nixlUcxMoEngine::internalMDHelper (const nixl_blob_t &blob, - const nixl_mem_t &nixl_mem, - const std::string &agent, - nixlBackendMD* &output) -{ - nixlUcxMoConnection conn; - nixlSerDes sd; - nixl_blob_t ucx_blob; - nixl_status_t status; - nixlBlobDesc input_int; - - auto md = std::make_unique(); - - auto search = remoteConnMap.find(agent); - - if(search == remoteConnMap.end()) { - //TODO: err: remote connection not found - return NIXL_ERR_NOT_FOUND; - } - conn = search->second; - - status = sd.importStr(blob); - - ssize_t ret = sd.getBufLen("EngIdx"); - if (ret != sizeof(md->eidx)) { - return NIXL_ERR_MISMATCH; - } - - status = sd.getBuf("EngIdx", &md->eidx, ret); - if (status != NIXL_SUCCESS) { - return status; - } - - ucx_blob = sd.getStr("RkeyStr"); - if (status != NIXL_SUCCESS) { - return status; - } - - for (auto &e : engines) { - nixlBackendMD *int_md; - input_int.metaInfo = ucx_blob; - status = e->loadRemoteMD(input_int, nixl_mem, - getEngName(agent, md->eidx), - int_md); - if (status != NIXL_SUCCESS) { - return status; - } - md->int_mds.push_back(int_md); - } - - output = md.release(); - return NIXL_SUCCESS; -} - -nixl_status_t -nixlUcxMoEngine::loadLocalMD(nixlBackendMD* input, - nixlBackendMD* &output) -{ - nixlUcxMoPrivateMetadata* input_md = (nixlUcxMoPrivateMetadata*) input; - return internalMDHelper(input_md->rkeyStr, input_md->memType, localAgent, output); -} - -nixl_status_t -nixlUcxMoEngine::loadRemoteMD (const nixlBlobDesc &input, - const nixl_mem_t &nixl_mem, - const string &remote_agent, - nixlBackendMD* &output) -{ - return internalMDHelper(input.metaInfo, nixl_mem, remote_agent, output); -} - -nixl_status_t -nixlUcxMoEngine::unloadMD (nixlBackendMD* input) -{ - nixl_status_t status; - - nixlUcxMoPublicMetadata *md = (nixlUcxMoPublicMetadata *)input; - for (size_t i = 0; i < md->int_mds.size(); i++) { - status = engines[i]->unloadMD(md->int_mds[i]); - if (NIXL_SUCCESS != status) { - return status; - } - } - return NIXL_SUCCESS; -} - -/**************************************** - * Data movement -*****************************************/ - -nixl_status_t -nixlUcxMoEngine::prepXfer (const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t *opt_args) const -{ - size_t lidx, ridx; - size_t lidx_max, ridx_max; - - // Number of local and remote descriptors must match - int des_cnt = local.descCount(); - if (des_cnt != remote.descCount()) { - return NIXL_ERR_INVALID_PARAM; - } - - // Check operation type - switch(operation) { - case NIXL_READ: - case NIXL_WRITE: - break; - default: - return NIXL_ERR_INVALID_PARAM; - } - - // Check that remote agent is known - const auto it = remoteConnMap.find(remote_agent); - if(it == remoteConnMap.end()) { - return NIXL_ERR_INVALID_PARAM; - } - const nixlUcxMoConnection &conn = it->second; - - /* Allocate request and fill communication distribution matrix */ - size_t l_eng_cnt = engines.size(); - size_t r_eng_cnt = conn.num_engines; - - auto req = std::make_unique(l_eng_cnt, r_eng_cnt); - - /* Go over all input */ - for(int i = 0; i < des_cnt; i++) { - size_t lsize = local[i].len; - size_t rsize = remote[i].len; - nixlUcxMoPrivateMetadata *lmd; - lmd = (nixlUcxMoPrivateMetadata *)local[i].metadataP; - nixlUcxMoPublicMetadata *rmd; - rmd = (nixlUcxMoPublicMetadata *)remote[i].metadataP; - size_t lidx = lmd->eidx; - size_t ridx = rmd->eidx; - - if (!((lidx < l_eng_cnt) && (ridx < r_eng_cnt))) { - // TODO: err output - goto err_clean_req; - } - if (lsize != rsize) { - // TODO: err output - goto err_clean_req; - } - - /* Allocate internal dlists if needed */ - if (!req->dlMatrix[lidx][ridx].in_use) { - req->dlMatrix[lidx][ridx].in_use = true; - req->dlMatrix[lidx][ridx].ldescs = new nixl_meta_dlist_t(local.getType()); - - req->dlMatrix[lidx][ridx].rdescs = new nixl_meta_dlist_t(remote.getType()); - } - - nixlMetaDesc ldesc = local[i]; - ldesc.metadataP = lmd->md; - req->dlMatrix[lidx][ridx].ldescs->addDesc(ldesc); - - nixlMetaDesc rdesc = remote[i]; - rdesc.metadataP = rmd->int_mds[lidx]; - req->dlMatrix[lidx][ridx].rdescs->addDesc(rdesc); - } - - // Prepare UCX requests! - for(lidx = 0; lidx < req->dlMatrix.size(); lidx++) { - for(ridx = 0; ridx < req->dlMatrix[lidx].size(); ridx++) { - nixl_status_t ret; - - if (!req->dlMatrix[lidx][ridx].in_use) { - // Skip unused matrix elements - continue; - } - ret = engines[lidx]->prepXfer(operation, - *req->dlMatrix[lidx][ridx].ldescs, - *req->dlMatrix[lidx][ridx].rdescs, - getEngName(remote_agent, ridx), - req->dlMatrix[lidx][ridx].ucx_req); - if (NIXL_SUCCESS != ret) { - goto err_clean_sub_req; - } - } - } - - handle = req.release(); - - return NIXL_SUCCESS; - -err_clean_sub_req: - /* Release only allocated requests */ - lidx_max = lidx + 1; - ridx_max = ridx; - for(lidx = 0; lidx < lidx_max; lidx++) { - for(ridx = 0; ridx < ridx_max; ridx++) { - nixl_status_t ret; - - if (!req->dlMatrix[lidx][ridx].in_use) { - // Skip unused matrix elements - continue; - } - - engines[lidx]->releaseReqH(req->dlMatrix[lidx][ridx].ucx_req); - if (NIXL_SUCCESS != ret) { - // TODO: Output error, but still continue trying to fix others - } - } - } - -err_clean_req: - return NIXL_ERR_INVALID_PARAM; -} - - -// Data transfer -nixl_status_t -nixlUcxMoEngine::postXfer (const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t *opt_args) const -{ - nixlUcxMoRequestH *req = (nixlUcxMoRequestH *)handle; - bool in_progress = false; - - for(size_t lidx = 0; lidx < req->dlMatrix.size(); lidx++) { - for(size_t ridx = 0; ridx < req->dlMatrix[lidx].size(); ridx++) { - nixl_status_t ret; - - if (!req->dlMatrix[lidx][ridx].in_use) { - // Skip unused matrix elements - continue; - } - - ret = engines[lidx]->postXfer(operation, - *req->dlMatrix[lidx][ridx].ldescs, - *req->dlMatrix[lidx][ridx].rdescs, - getEngName(remote_agent, ridx), - req->dlMatrix[lidx][ridx].ucx_req); - - /* if transfer wasn't immediately completed */ - switch(ret) { - case NIXL_IN_PROG: - req->dlMatrix[lidx][ridx].in_progress = true; - in_progress = true; - case NIXL_SUCCESS: - // Nothing to do - break; - default: - // Error. - return ret; - } - } - } - - if (in_progress) { - // The transfers are performed via parallel UCX workers (meaning QPs). - // This doesn't allow piggybacking the notification in postXfer. We - // can only send it after all workers are flushed, in checkXfer(). - if (opt_args->hasNotif) { - req->notifNeed = true; - req->notifMsg = opt_args->notifMsg; - req->remoteAgent = remote_agent; - } - - return NIXL_IN_PROG; - } - - if (opt_args->hasNotif) { - auto ret = engines[0]->genNotif(getEngName(remote_agent, 0), - opt_args->notifMsg); - if (NIXL_SUCCESS != ret) { - /* Return error, TODO: add output */ - return ret; - } - } - - return NIXL_SUCCESS; -} - -nixl_status_t -nixlUcxMoEngine::checkXfer (nixlBackendReqH *handle) const -{ - nixlUcxMoRequestH *req = (nixlUcxMoRequestH *)handle; - nixl_status_t out_ret = NIXL_SUCCESS; - - for(size_t lidx = 0; lidx < req->dlMatrix.size(); lidx++) { - for(size_t ridx = 0; ridx < req->dlMatrix[lidx].size(); ridx++) { - nixl_status_t ret; - - if (!req->dlMatrix[lidx][ridx].in_progress) { - // Skip not-in-progress matrix elements - continue; - } - - ret = engines[lidx]->checkXfer(req->dlMatrix[lidx][ridx].ucx_req); - switch (ret) { - case NIXL_SUCCESS: - /* Mark as completed */ - req->dlMatrix[lidx][ridx].in_progress = false; - break; - case NIXL_IN_PROG: - out_ret = NIXL_IN_PROG; - break; - default: - /* Any other ret value is unexpected */ - return ret; - } - } - } - - if ((NIXL_SUCCESS == out_ret) && req->notifNeed) { - nixl_status_t ret; - - // Now as all UCX backends (workers) have been flushed, - // it is safe to send Notification - ret = engines[0]->genNotif(getEngName(req->remoteAgent, 0), req->notifMsg); - if (NIXL_SUCCESS != ret) { - /* Return error, TODO: add output */ - return ret; - } - } - - return out_ret; -} - -nixl_status_t -nixlUcxMoEngine::releaseReqH(nixlBackendReqH* handle) const -{ - nixlUcxMoRequestH *req = (nixlUcxMoRequestH *)handle; - nixl_status_t out_ret = NIXL_SUCCESS; - - for(size_t lidx = 0; lidx < req->dlMatrix.size(); lidx++) { - for(size_t ridx = 0; ridx < req->dlMatrix[lidx].size(); ridx++) { - nixl_status_t ret; - - if (!req->dlMatrix[lidx][ridx].in_use) { - // Skip unused matrix elements - continue; - } - - ret = engines[lidx]->releaseReqH(req->dlMatrix[lidx][ridx].ucx_req); - if (NIXL_SUCCESS != ret) { - // TODO: Output error, but still continue trying to fix others - out_ret = ret; - } - } - } - - return out_ret; -} - -int -nixlUcxMoEngine::progress() -{ - int ret = 0; - // Iterate over all elements cancelling each one - for ( auto &e : engines ) { - ret += e->progress(); - } - return ret; -} - -nixl_status_t -nixlUcxMoEngine::getNotifs(notif_list_t ¬if_list) -{ - return engines[0]->getNotifs(notif_list); -} - -nixl_status_t -nixlUcxMoEngine::genNotif(const string &remote_agent, const string &msg) const -{ - return engines[0]->genNotif(getEngName(remote_agent, 0), msg); -} diff --git a/src/plugins/ucx_mo/ucx_mo_backend.h b/src/plugins/ucx_mo/ucx_mo_backend.h deleted file mode 100644 index 9fef3a251..000000000 --- a/src/plugins/ucx_mo/ucx_mo_backend.h +++ /dev/null @@ -1,173 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __UCX_MO_BACKEND_H -#define __UCX_MO_BACKEND_H - -#include -#include -#include -#include -#include -#include -#include - -#include "nixl.h" -#include "ucx_backend.h" - -// Local includes -#include -#include - -class nixlUcxMoConnection : public nixlBackendConnMD { - private: - std::string remoteAgent; - uint32_t num_engines; - - public: - // Extra information required for UCX connections - - friend class nixlUcxMoEngine; -}; - -// A private metadata has to implement get, and has all the metadata -class nixlUcxMoPrivateMetadata : public nixlBackendMD -{ -private: - uint32_t eidx; - nixlBackendMD *md; - nixl_mem_t memType; - nixl_blob_t rkeyStr; -public: - nixlUcxMoPrivateMetadata() : nixlBackendMD(true) { - } - - ~nixlUcxMoPrivateMetadata(){ - } - - std::string get() const { - return rkeyStr; - } - - friend class nixlUcxMoEngine; -}; - -// A public metadata has to implement put, and only has the remote metadata -class nixlUcxMoPublicMetadata : public nixlBackendMD -{ - uint32_t eidx; - nixlUcxMoConnection conn; - std::vector int_mds; - -public: - nixlUcxMoPublicMetadata() : nixlBackendMD(false) {} - - ~nixlUcxMoPublicMetadata(){ - } - - friend class nixlUcxMoEngine; -}; - - - -class nixlUcxMoEngine : public nixlBackendEngine { -private: - uint32_t _engineCnt; - uint32_t _gpuCnt; - int setEngCnt(uint32_t host_engines); - uint32_t getEngCnt(); - int32_t getEngIdx(nixl_mem_t type, uint64_t devId); - std::string getEngName(const std::string &baseName, uint32_t eidx) const; - std::string getEngBase(const std::string &engName); - bool pthrOn; - - // UCX backends data - std::vector> engines; - // Map of agent name to saved nixlUcxConnection info - using remote_conn_map_t = std::map; - using remote_comm_it_t = remote_conn_map_t::iterator; - remote_conn_map_t remoteConnMap; - - // Memory helper - nixl_status_t internalMDHelper (const nixl_blob_t &blob, - const nixl_mem_t &nixl_mem, - const std::string &agent, - nixlBackendMD* &output); - -public: - nixlUcxMoEngine(const nixlBackendInitParams* init_params); - ~nixlUcxMoEngine() = default; - - bool supportsRemote () const { return true; } - bool supportsLocal () const { return false; } - bool supportsNotif () const { return true; } - - nixl_mem_list_t getSupportedMems () const; - - /* Object management */ - nixl_status_t getPublicData (const nixlBackendMD* meta, - std::string &str) const; - nixl_status_t getConnInfo(std::string &str) const; - nixl_status_t loadRemoteConnInfo (const std::string &remote_agent, - const std::string &remote_conn_info); - - nixl_status_t connect(const std::string &remote_agent); - nixl_status_t disconnect(const std::string &remote_agent); - - nixl_status_t registerMem (const nixlBlobDesc &mem, - const nixl_mem_t &nixl_mem, - nixlBackendMD* &out); - nixl_status_t deregisterMem (nixlBackendMD* meta); - - nixl_status_t loadLocalMD (nixlBackendMD* input, - nixlBackendMD* &output); - - nixl_status_t loadRemoteMD (const nixlBlobDesc &input, - const nixl_mem_t &nixl_mem, - const std::string &remote_agent, - nixlBackendMD* &output); - nixl_status_t unloadMD (nixlBackendMD* input); - - // Data transfer - nixl_status_t prepXfer (const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t* opt_args=nullptr) const; - - nixl_status_t postXfer (const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t* opt_args=nullptr) const; - nixl_status_t checkXfer (nixlBackendReqH* handle) const; - nixl_status_t releaseReqH(nixlBackendReqH* handle) const; - - nixl_status_t getNotifs(notif_list_t ¬if_list); - nixl_status_t genNotif(const std::string &remote_agent, const std::string &msg) const; - - //public function for UCX worker to mark connections as connected - nixl_status_t checkConn(const std::string &remote_agent); - nixl_status_t endConn(const std::string &remote_agent); - - // Public function as it is used in tests - int - progress(); -}; - -#endif diff --git a/src/plugins/ucx_mo/ucx_mo_plugin.cpp b/src/plugins/ucx_mo/ucx_mo_plugin.cpp deleted file mode 100644 index c5442d5fd..000000000 --- a/src/plugins/ucx_mo/ucx_mo_plugin.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "backend/backend_plugin.h" -#include "ucx_mo_backend.h" -#include "ucx_utils.h" - -namespace { -nixl_b_params_t -get_ucx_mo_options() { - nixl_b_params_t params = get_ucx_backend_common_options(); - params["num_ucx_engines"] = "8"; - return params; -} -} // namespace - -// Plugin type alias for convenience -using ucx_mo_plugin_t = nixlBackendPluginCreator; - -#ifdef STATIC_PLUGIN_UCX_MO -nixlBackendPlugin * -createStaticUCX_MOPlugin() { - return ucx_mo_plugin_t::create( - NIXL_PLUGIN_API_VERSION, "UCX_MO", "0.1.0", get_ucx_mo_options(), {DRAM_SEG, VRAM_SEG}); -} -#else -extern "C" NIXL_PLUGIN_EXPORT nixlBackendPlugin * -nixl_plugin_init() { - return ucx_mo_plugin_t::create( - NIXL_PLUGIN_API_VERSION, "UCX_MO", "0.1.0", get_ucx_mo_options(), {DRAM_SEG, VRAM_SEG}); -} - -extern "C" NIXL_PLUGIN_EXPORT void -nixl_plugin_fini() {} -#endif diff --git a/test/gtest/error_handling.cpp b/test/gtest/error_handling.cpp index 2d3754287..611c8df81 100644 --- a/test/gtest/error_handling.cpp +++ b/test/gtest/error_handling.cpp @@ -423,9 +423,6 @@ TEST_P(TestErrorHandling, XferPostThenFail) { } INSTANTIATE_TEST_SUITE_P(ucx, TestErrorHandling, testing::Values(std::make_tuple("UCX", 1, 0))); -INSTANTIATE_TEST_SUITE_P(ucx_mo, - TestErrorHandling, - testing::Values(std::make_tuple("UCX_MO", 1, 0))); INSTANTIATE_TEST_SUITE_P(ucx_threadpool, TestErrorHandling, testing::Values(std::make_tuple("UCX", 2, 1))); diff --git a/test/gtest/plugin_manager.cpp b/test/gtest/plugin_manager.cpp index 8423154c5..8dd274173 100644 --- a/test/gtest/plugin_manager.cpp +++ b/test/gtest/plugin_manager.cpp @@ -38,8 +38,6 @@ const PluginDesc ucx_plugin_desc{.name = "UCX", .type = PluginDesc::PluginType::Real}; const PluginDesc gds_plugin_desc{.name = "GDS", .type = PluginDesc::PluginType::Real}; -const PluginDesc ucx_mo_plugin_desc{ - .name = "UCX_MO", .type = PluginDesc::PluginType::Real}; class LoadSinglePluginTestFixture : public testing::TestWithParam { @@ -191,30 +189,11 @@ INSTANTIATE_TEST_SUITE_P(UcxLoadPluginInstantiation, INSTANTIATE_TEST_SUITE_P(GdsLoadPluginInstantiation, LoadSinglePluginTestFixture, testing::Values(gds_plugin_desc)); -INSTANTIATE_TEST_SUITE_P(UcxMoLoadPluginInstantiation, - LoadSinglePluginTestFixture, - testing::Values(ucx_mo_plugin_desc)); /* Load multiple plugins tests instantiations. */ INSTANTIATE_TEST_SUITE_P(UcxGdsLoadMultiplePluginInstantiation, LoadMultiplePluginsTestFixture, testing::Values(std::vector{ ucx_plugin_desc, gds_plugin_desc})); -INSTANTIATE_TEST_SUITE_P(UcxUcxMoLoadMultiplePluginInstantiation, - LoadMultiplePluginsTestFixture, - testing::Values(std::vector{ - ucx_plugin_desc, - ucx_mo_plugin_desc})); -INSTANTIATE_TEST_SUITE_P(GdsUcxMoLoadMultiplePluginInstantiation, - LoadMultiplePluginsTestFixture, - testing::Values(std::vector{ - gds_plugin_desc, - ucx_mo_plugin_desc})); -INSTANTIATE_TEST_SUITE_P(UcxGdsUcxMoLoadMultiplePluginInstantiation, - LoadMultiplePluginsTestFixture, - testing::Values(std::vector{ - ucx_plugin_desc, gds_plugin_desc, - ucx_mo_plugin_desc})); - } // namespace plugin_manager } // namespace gtest diff --git a/test/gtest/test_transfer.cpp b/test/gtest/test_transfer.cpp index 73abbdd42..c9a3668c0 100644 --- a/test/gtest/test_transfer.cpp +++ b/test/gtest/test_transfer.cpp @@ -121,7 +121,7 @@ class TestTransfer : { nixl_b_params_t params; - if (getBackendName() == "UCX" || getBackendName() == "UCX_MO") { + if (getBackendName() == "UCX") { params["num_workers"] = std::to_string(getNumWorkers()); params["num_threads"] = std::to_string(getNumThreads()); params["split_batch_size"] = "32"; @@ -540,11 +540,6 @@ TEST_P(TestTransfer, NotificationOnly) { } TEST_P(TestTransfer, SelfNotification) { - // UCX_MO does not support local communication - if (getBackendName() == "UCX_MO") { - GTEST_SKIP() << "UCX_MO does not support local communication"; - } - constexpr size_t repeat = 100; constexpr size_t num_threads = 4; doNotificationTest( @@ -733,8 +728,4 @@ INSTANTIATE_TEST_SUITE_P(ucx_threadpool, INSTANTIATE_TEST_SUITE_P(ucx_threadpool_no_pt, TestTransfer, testing::Values(std::make_tuple("UCX", false, 6, 4))); -INSTANTIATE_TEST_SUITE_P(ucx_mo, - TestTransfer, - testing::Values(std::make_tuple("UCX_MO", true, 2, 0))); - } // namespace gtest diff --git a/test/nixl/test_plugin.cpp b/test/nixl/test_plugin.cpp index c644db7a3..269b36df8 100644 --- a/test/nixl/test_plugin.cpp +++ b/test/nixl/test_plugin.cpp @@ -68,7 +68,6 @@ int main(int argc, char** argv) { std::set plugins = {"UCX", "GDS", "POSIX", - "UCX_MO", "MOCK_BACKEND", "GPUNETIO", "OBJ", diff --git a/test/unit/plugins/meson.build b/test/unit/plugins/meson.build index d1e554616..3b1f3e2bf 100644 --- a/test/unit/plugins/meson.build +++ b/test/unit/plugins/meson.build @@ -15,7 +15,6 @@ if ucx_dep.found() subdir('ucx') - subdir('ucx_mo') endif subdir('posix') diff --git a/test/unit/plugins/ucx_mo/meson.build b/test/unit/plugins/ucx_mo/meson.build deleted file mode 100644 index 207c4e69a..000000000 --- a/test/unit/plugins/ucx_mo/meson.build +++ /dev/null @@ -1,32 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -ucx_backend_dep = declare_dependency(link_with: ucx_backend_lib, include_directories: [nixl_inc_dirs, '../../../../src/plugins/ucx']) -ucx_mo_backend_dep = declare_dependency(link_with: ucx_mo_backend_lib, include_directories: [nixl_inc_dirs, '../../../../src/plugins/ucx_mo']) - -if cuda_dep.found() - cuda_dependencies = [cuda_dep] - cpp_args = '-DHAVE_CUDA' -else - cuda_dependencies = [] - cpp_args = '-UHAVE_CUDA' -endif - -ucx_backend_test = executable('ucx_mo_backend_test', - 'ucx_mo_backend_test.cpp', - dependencies: [nixl_dep, nixl_infra, nixl_common_deps, ucx_backend_dep, ucx_mo_backend_dep, ucx_dep] + cuda_dependencies + nixl_test_utils_dep, - include_directories: [nixl_inc_dirs, utils_inc_dirs], - cpp_args : cpp_args, - install: true) diff --git a/test/unit/plugins/ucx_mo/ucx_mo_backend_test.cpp b/test/unit/plugins/ucx_mo/ucx_mo_backend_test.cpp deleted file mode 100644 index d2098251d..000000000 --- a/test/unit/plugins/ucx_mo/ucx_mo_backend_test.cpp +++ /dev/null @@ -1,627 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include -#include - -#include "ucx_mo_backend.h" -#include "test_utils.h" - -using namespace std; - - -#ifdef HAVE_CUDA - -#include -#include - -int gpu_id = 0; - -static void checkCudaError(cudaError_t result, const char *message) { - if (result != cudaSuccess) { - nixl_exit_on_failure(result, - std::string(message) + " (Error code: " + std::to_string(result) + - " - " + cudaGetErrorString(result) + ")"); - } -} -#endif - - -static string op2string(nixl_xfer_op_t op, bool hasNotif) -{ - if(op == NIXL_READ && !hasNotif) - return string("READ"); - if(op == NIXL_WRITE && !hasNotif) - return string("WRITE"); - if(op == NIXL_READ && hasNotif) - return string("READ/NOTIF"); - if(op == NIXL_WRITE && hasNotif) - return string("WRITE/NOTIF"); - - return string("ERR-OP"); -} - -std::string memType2Str(nixl_mem_t mem_type) -{ - switch(mem_type) { - case DRAM_SEG: - return std::string("DRAM"); - case VRAM_SEG: - return std::string("VRAM"); - case BLK_SEG: - return std::string("BLOCK"); - case FILE_SEG: - return std::string("FILE"); - default: - nixl_exit_on_failure(false, "Unsupported memory type!"); - } - return std::string(""); -} - -nixlBackendEngine *createEngine(std::string name, uint32_t ndev, bool p_thread) -{ - nixlBackendEngine *ucx_mo; - nixlBackendInitParams init; - nixl_b_params_t custom_params; - - custom_params["num_ucx_engines"] = std::to_string(ndev); - init.enableProgTh = p_thread; - init.pthrDelay = 100; - init.localAgent = name; - init.customParams = &custom_params; - init.type = "UCX_MO"; - - ucx_mo = (nixlBackendEngine *)new nixlUcxMoEngine(&init); - nixl_exit_on_failure(!ucx_mo->getInitErr(), "Failed to initialize worker1"); - if (ucx_mo->getInitErr()) { - std::cout << "Failed to initialize worker1" << std::endl; - exit(1); - } - - return ucx_mo; -} - -void releaseEngine(nixlBackendEngine *ucx) -{ - //protected now, should not call - delete ucx; -} - -#ifdef HAVE_CUDA - -static int cudaQueryAddr(void *address, bool &is_dev, - CUdevice &dev, CUcontext &ctx) -{ - CUmemorytype mem_type = CU_MEMORYTYPE_HOST; - uint32_t is_managed = 0; -#define NUM_ATTRS 4 - CUpointer_attribute attr_type[NUM_ATTRS]; - void *attr_data[NUM_ATTRS]; - CUresult result; - - attr_type[0] = CU_POINTER_ATTRIBUTE_MEMORY_TYPE; - attr_data[0] = &mem_type; - attr_type[1] = CU_POINTER_ATTRIBUTE_IS_MANAGED; - attr_data[1] = &is_managed; - attr_type[2] = CU_POINTER_ATTRIBUTE_DEVICE_ORDINAL; - attr_data[2] = &dev; - attr_type[3] = CU_POINTER_ATTRIBUTE_CONTEXT; - attr_data[3] = &ctx; - - result = cuPointerGetAttributes(4, attr_type, attr_data, (CUdeviceptr)address); - - is_dev = (mem_type == CU_MEMORYTYPE_DEVICE); - - return (CUDA_SUCCESS != result); -} - -#endif - - -void allocateBuffer(nixl_mem_t mem_type, int dev_id, size_t len, void* &addr) -{ - int ret; - switch(mem_type) { - case DRAM_SEG: - //addr = calloc(1, len); - ret = posix_memalign(&addr, 4096, len); - nixl_exit_on_failure((ret == 0), "Failed to allocate mem aligned buffer"); - break; -#ifdef HAVE_CUDA - case VRAM_SEG: { - bool is_dev; - CUdevice dev; - CUcontext ctx; - - checkCudaError(cudaSetDevice(dev_id), "Failed to set device"); - checkCudaError(cudaMalloc(&addr, len), "Failed to allocate CUDA buffer 0"); - cudaQueryAddr(addr, is_dev, dev, ctx); - std::cout << "CUDA addr: " << std::hex << addr << " dev=" << std::dec << dev - << " ctx=" << std::hex << ctx << std::dec << std::endl; - break; - } -#endif - default: - nixl_exit_on_failure(false, "Unsupported memory type!"); - } - nixl_exit_on_failure(addr != nullptr, "Failed to allocate buffer"); -} - -void releaseBuffer(nixl_mem_t mem_type, int dev_id, void* &addr) -{ - switch(mem_type) { - case DRAM_SEG: - free(addr); - break; -#ifdef HAVE_CUDA - case VRAM_SEG: - checkCudaError(cudaSetDevice(dev_id), "Failed to set device"); - checkCudaError(cudaFree(addr), "Failed to allocate CUDA buffer 0"); - break; -#endif - default: - nixl_exit_on_failure(false, "Unsupported memory type!"); - } -} - -void doMemset(nixl_mem_t mem_type, int dev_id, void *addr, char byte, size_t len) -{ - switch(mem_type) { - case DRAM_SEG: - memset(addr, byte, len); - break; -#ifdef HAVE_CUDA - case VRAM_SEG: - checkCudaError(cudaSetDevice(dev_id), "Failed to set device"); - checkCudaError(cudaMemset(addr, byte, len), "Failed to memset"); - break; -#endif - default: - nixl_exit_on_failure(1, "Unsupported memory type!"); - } -} - -void *getValidationPtr(nixl_mem_t mem_type, void *addr, size_t len) -{ - switch(mem_type) { - case DRAM_SEG: - return addr; - break; -#ifdef HAVE_CUDA - case VRAM_SEG: { - void *ptr = calloc(len, 1); - checkCudaError(cudaMemcpy(ptr, addr, len, cudaMemcpyDeviceToHost), "Failed to memcpy"); - return ptr; - } -#endif - default: - nixl_exit_on_failure(false, "Unsupported memory type!"); - } - return nullptr; -} - -void *releaseValidationPtr(nixl_mem_t mem_type, void *addr) -{ - switch(mem_type) { - case DRAM_SEG: - break; -#ifdef HAVE_CUDA - case VRAM_SEG: - free(addr); - break; -#endif - default: - nixl_exit_on_failure(false, "Unsupported memory type!"); - } - return nullptr; -} - -typedef int dev_distr_t(int idx, int max_idx, int cnt); - -int dev_distr_rr(int idx, int max_idx, int cnt) -{ - return idx % cnt; -} - -int dev_distr_blk(int idx, int max_idx, int cnt) -{ - int block_size = max_idx / cnt; - int nblocks_plus_1 = max_idx % cnt; - if (idx < (block_size+1) * nblocks_plus_1) { - return idx / (block_size+1); - } else { - return nblocks_plus_1 + (idx - nblocks_plus_1 * (block_size+1)) / block_size; - } -} - - -void createLocalDescs(nixlBackendEngine *ucx, nixl_meta_dlist_t &descs, - int dev_cnt, dev_distr_t dist_f, - int desc_cnt, size_t desc_size) -{ - - for(int i = 0; i < desc_cnt; i++) { - nixlBasicDesc desc; - nixlMetaDesc desc_m; - nixlBlobDesc desc_s; - void *addr; - - desc.len = desc_size; - desc.devId = dist_f(i, desc_cnt, dev_cnt); - - allocateBuffer(descs.getType(), desc.devId, desc.len, addr); - desc.addr = (uintptr_t)addr; - *((nixlBasicDesc*)&desc_s) = desc; - *((nixlBasicDesc*)&desc_m) = desc; - int ret = ucx->registerMem(desc_s, descs.getType(), desc_m.metadataP); - nixl_exit_on_failure((ret == NIXL_SUCCESS), "Failed to register ucx memory"); - descs.addDesc(desc_m); - } -} - - -void destroyLocalDescs(nixlBackendEngine *ucx, nixl_meta_dlist_t &descs) -{ - for(int i = 0; i < descs.descCount(); i++) { - auto dev_id = descs[i].devId; - void *addr = (void*)descs[i].addr; - ucx->deregisterMem(descs[i].metadataP); - releaseBuffer(descs.getType(), dev_id, addr); - } - - while(descs.descCount()) { - descs.remDesc(0); - } -} - -void createRemoteDescs(nixlBackendEngine *src_ucx, - std::string agent, - nixl_meta_dlist_t &src_descs, - nixlBackendEngine *dst_ucx, - nixl_meta_dlist_t &dst_descs) -{ - bool is_local = (src_ucx == dst_ucx); - - for(int i = 0; i < src_descs.descCount(); i++) { - nixlBlobDesc desc_s; - nixlMetaDesc desc_m; - nixl_status_t status; - - *((nixlBasicDesc*)&desc_s) = (nixlBasicDesc)src_descs[i]; - *((nixlBasicDesc*)&desc_m) = (nixlBasicDesc)src_descs[i]; - - if (is_local) { - status = dst_ucx->loadLocalMD(src_descs[i].metadataP, desc_m.metadataP); - } else { - status = src_ucx->getPublicData(src_descs[i].metadataP, desc_s.metaInfo); - nixl_exit_on_failure(status, "Failed to get src_ucx public data"); - status = dst_ucx->loadRemoteMD (desc_s, src_descs.getType(), - agent, desc_m.metadataP); - } - nixl_exit_on_failure(status, "Failed to load dst_ucx remote MD"); - dst_descs.addDesc(desc_m); - } -} - -void destroyRemoteDescs(nixlBackendEngine *dst_ucx, - nixl_meta_dlist_t &dst_descs) -{ - nixl_status_t status; - for(int i = 0; i < dst_descs.descCount(); i++) { - status = dst_ucx->unloadMD(dst_descs[i].metadataP); - nixl_exit_on_failure(status, "Failed to unload dst_ucx MD"); - } - - while(dst_descs.descCount()) { - dst_descs.remDesc(0); - } -} - -void performTransfer(nixlBackendEngine *ucx1, nixlBackendEngine *ucx2, - nixl_meta_dlist_t &req_src_descs, - nixl_meta_dlist_t &req_dst_descs, - nixl_xfer_op_t op, bool progress, bool use_notif) -{ - nixl_status_t status; - nixlBackendReqH* handle; - void *chkptr1, *chkptr2; - - std::string remote_agent ("Agent2"); - - if(ucx1 == ucx2) remote_agent = "Agent1"; - - std::string test_str("test"); - std::cout << "\t" << op2string(op, use_notif) << "\n"; - nixl_opt_b_args_t opt_args; - opt_args.notifMsg = test_str; - opt_args.hasNotif = use_notif; - - // Posting a request, to be updated to return an async handler, - // or an ID that later can be used to check the status as a new method - // Also maybe we would remove the WRITE and let the backend class decide the op - status = ucx1->prepXfer(op, req_src_descs, req_dst_descs, remote_agent, handle, &opt_args); - nixl_exit_on_failure(status, "Failed to prep ucx1 xfer"); - status = ucx1->postXfer(op, req_src_descs, req_dst_descs, remote_agent, handle, &opt_args); - nixl_exit_on_failure((status >= NIXL_SUCCESS), "Failed to post ucx1 xfer"); - - - if (status == NIXL_SUCCESS) { - cout << "\t\tWARNING: Tansfer request completed immediately - no testing non-inline path" << endl; - } else { - cout << "\t\tNOTE: Testing non-inline Transfer path!" << endl; - - while(status == NIXL_IN_PROG) { - status = ucx1->checkXfer(handle); - if(progress){ - ((nixlUcxMoEngine *)ucx2)->progress(); - } - nixl_exit_on_failure((status >= NIXL_SUCCESS), "Failed to check ucx1 xfer"); - } - ucx1->releaseReqH(handle); - } - - if(use_notif) { - /* Test notification path */ - notif_list_t target_notifs; - - cout << "\t\tChecking notification flow: " << flush; - - while(!target_notifs.size()){ - status = ucx2->getNotifs(target_notifs); - nixl_exit_on_failure(status, "Failed to get ucx2 notifs"); - if(progress){ - ((nixlUcxMoEngine *)ucx1)->progress(); - } - } - - nixl_exit_on_failure((target_notifs.size() == 1), "Incorrect number of target notifs"); - nixl_exit_on_failure((target_notifs.front().first == "Agent1"), - "Incorrect front notif source"); - nixl_exit_on_failure((target_notifs.front().second == test_str), - "Incorrect front notif message"); - - cout << "OK" << endl; - } - - cout << "\t\tData verification: " << flush; - - nixl_exit_on_failure((req_src_descs.descCount() == req_dst_descs.descCount()), - "Data length mismatch"); - for(int i = 0; i < req_src_descs.descCount(); i++) { - auto sdesc = req_src_descs[i]; - auto ddesc = req_dst_descs[i]; - nixl_exit_on_failure((sdesc.len == ddesc.len), "Data length mismatch"); - size_t len = ddesc.len; - chkptr1 = getValidationPtr(req_src_descs.getType(), (void*)sdesc.addr, len); - chkptr2 = getValidationPtr(req_dst_descs.getType(), (void*)ddesc.addr, len); - - // Perform correctness check. - for (size_t i = 0; i < len; i++) { - nixl_exit_on_failure((((uint8_t *)chkptr1)[i] == ((uint8_t *)chkptr2)[i]), - "Data mismatch"); - } - - releaseValidationPtr(req_src_descs.getType(), chkptr1); - releaseValidationPtr(req_dst_descs.getType(), chkptr2); - } - cout << "OK" << endl << flush; -} - -void test_agent_transfer(bool p_thread, - nixlBackendEngine *ucx1, nixl_mem_t src_mem_type, int src_dev_cnt, dev_distr_t src_dist_f, - nixlBackendEngine *ucx2, nixl_mem_t dst_mem_type, int dst_dev_cnt, dev_distr_t dst_dist_f) -{ - int iter = 10; - nixl_status_t status; - bool is_local = (ucx1 == ucx2); - - if (is_local) { - // assert(ucx1->supportsLocal()); - } - - std::cout << std::endl << std::endl; - std::cout << "****************************************************" << std::endl; - std::cout << ((is_local) ? std::string("IntrA") : std::string("IntEr")) - << "-agent memory transfer test P-Thr=" - << (p_thread ? "ON" : "OFF") << std::endl; - std::cout << " (" << memType2Str(src_mem_type) << " -> " - << memType2Str(dst_mem_type) << ")" << std::endl; - std::cout << "****************************************************" << std::endl; - std::cout << std::endl << std::endl; - - // Example: assuming two agents running on the same machine, - // with separate memory regions in DRAM - std::string agent1("Agent1"); - std::string agent2("Agent2"); - std::string *agent = &agent2; - - // We get the required connection info from UCX to be put on the central - // location and ask for it for a remote node - std::string conn_info1; - status = ucx1->getConnInfo(conn_info1); - nixl_exit_on_failure(status, "Failed to get ucx1 conn info"); - std::string conn_info2; - status = ucx2->getConnInfo(conn_info2); - nixl_exit_on_failure(status, "Failed to get ucx2 conn info"); - // We assumed we put them to central location and now receiving it on the other process - if (is_local) { - agent = &agent1; - } - status = ucx1->loadRemoteConnInfo(*agent, conn_info2); - nixl_exit_on_failure(status, "Failed to load ucx1 remote conn info"); - // TODO: Causes race condition - investigate conn management implementation - // ret = ucx2->loadRemoteConnInfo (agent1, conn_info1); - - std::cout << "Synchronous handshake complete\n"; - - // Number of transfer descriptors - int desc_cnt = 128; - // Size of a single descriptor - size_t desc_size = 512 * 1024; - nixl_meta_dlist_t ucx1_src_descs (src_mem_type); - nixl_meta_dlist_t ucx2_src_descs (dst_mem_type); - nixl_meta_dlist_t ucx1_dst_descs (dst_mem_type); - - createLocalDescs(ucx1, ucx1_src_descs, src_dev_cnt, src_dist_f, - desc_cnt, desc_size); - createLocalDescs(ucx2, ucx2_src_descs, dst_dev_cnt, dst_dist_f, - desc_cnt, desc_size); - createRemoteDescs(ucx2, agent2, ucx2_src_descs, - ucx1, ucx1_dst_descs); - - - nixl_xfer_op_t ops[] = { NIXL_READ, NIXL_WRITE }; - bool use_notifs[] = { true, false }; - - for (size_t i = 0; i < sizeof(ops)/sizeof(ops[i]); i++) { - - for(bool use_notif : use_notifs) { - cout << endl << op2string(ops[i], use_notif) << " test (" << iter << ") iterations" <genNotif(*agent, test_str); - - cout << "\t\tChecking notification flow: " << flush; - - while(target_notifs.size() == 0){ - status = ucx2->getNotifs(target_notifs); - nixl_exit_on_failure(status, "Failed to get ucx2 notifs"); - if (!p_thread) { - /* progress UCX1 as well */ - ((nixlUcxMoEngine *)ucx1)->progress(); - } - } - - nixl_exit_on_failure((target_notifs.size() == 1), "Incorrect number of target notifs"); - nixl_exit_on_failure((target_notifs.front().first == "Agent1"), - "Incorrect front notif source"); - nixl_exit_on_failure((target_notifs.front().second == test_str), - "Incorrect front notif message"); - - cout << "OK" << endl; - } - - // As well as all the remote notes, asking to remove them one by one - // need to provide list of descs - destroyRemoteDescs(ucx1, ucx1_dst_descs); - - destroyLocalDescs(ucx1, ucx1_src_descs); - destroyLocalDescs(ucx2, ucx2_src_descs); - - // Test one-sided disconnect (initiator only) - ucx1->disconnect(*agent); - - // TODO: Causes race condition - investigate conn management implementation - //ucx2->disconnect(agent1); -} - -int main() -{ - bool thread_on[] = {false , true}; -#define THREAD_ON_SIZE (sizeof(thread_on) / sizeof(thread_on[0])) - nixlBackendEngine *ucx[THREAD_ON_SIZE][2] = { 0 }; - -#define NUM_WORKERS 8 - -int ndevices = NUM_WORKERS; -#ifdef HAVE_CUDA - int n_vram_dev; - if (cudaGetDeviceCount(&n_vram_dev) != cudaSuccess) { - std::cout << "Call to cudaGetDeviceCount failed, assuming 0 devices"; - n_vram_dev = 0; - } - std::cout << "Detected " << n_vram_dev << " CUDA devices" << std::endl; -#endif - - - // Allocate UCX engines - for(size_t i = 0; i < THREAD_ON_SIZE; i++) { - for(int j = 0; j < 2; j++) { - std::stringstream s; - s << "Agent" << (j + 1); - ucx[i][j] = createEngine(s.str(), ndevices, thread_on[i]); - } - } - - for(size_t i = 0; i < THREAD_ON_SIZE; i++) { - //Test local memory to local memory transfer - test_agent_transfer(thread_on[i], - ucx[i][0], DRAM_SEG, ndevices, dev_distr_rr, - ucx[i][0], DRAM_SEG, ndevices, dev_distr_blk); -#ifdef HAVE_CUDA - if (n_vram_dev) { - test_agent_transfer(thread_on[i], - ucx[i][0], VRAM_SEG, ndevices, dev_distr_rr, - ucx[i][0], VRAM_SEG, ndevices, dev_distr_blk); - } -#endif - } - - for(size_t i = 0; i < THREAD_ON_SIZE; i++) { - test_agent_transfer(thread_on[i], - ucx[i][0], DRAM_SEG, ndevices, dev_distr_rr, - ucx[i][1], DRAM_SEG, ndevices, dev_distr_blk); - -#ifdef HAVE_CUDA - if (n_vram_dev) { - test_agent_transfer(thread_on[i], - ucx[i][0], VRAM_SEG, n_vram_dev, dev_distr_rr, - ucx[i][1], VRAM_SEG, n_vram_dev, dev_distr_blk); - test_agent_transfer(thread_on[i], - ucx[i][0], VRAM_SEG, n_vram_dev, dev_distr_rr, - ucx[i][1], VRAM_SEG, n_vram_dev, dev_distr_blk); - test_agent_transfer(thread_on[i], - ucx[i][0], VRAM_SEG, n_vram_dev, dev_distr_rr, - ucx[i][1], DRAM_SEG, n_vram_dev, dev_distr_blk); - test_agent_transfer(thread_on[i], - ucx[i][0], DRAM_SEG, n_vram_dev, dev_distr_rr, - ucx[i][1], VRAM_SEG, n_vram_dev, dev_distr_blk); - } -#endif - } - - // Allocate UCX engines - for(int i = 0; i < 2; i++) { - for(int j = 0; j < 2; j++) { - releaseEngine(ucx[i][j]); - } - } -}