diff --git a/.ci/jenkins/lib/build-matrix.yaml b/.ci/jenkins/lib/build-matrix.yaml index 1ea9b3637..f5733486f 100644 --- a/.ci/jenkins/lib/build-matrix.yaml +++ b/.ci/jenkins/lib/build-matrix.yaml @@ -59,7 +59,7 @@ steps: parallel: false timeout: "${TEST_TIMEOUT}" run: | - .gitlab/test_cpp.sh ${NIXL_INSTALL_DIR} + for i in $(seq 1 30); do .gitlab/test_cpp.sh ${NIXL_INSTALL_DIR} ; done - name: Test Python parallel: false diff --git a/.ci/jenkins/lib/test-matrix.yaml b/.ci/jenkins/lib/test-matrix.yaml index 34836cb2f..935273e86 100644 --- a/.ci/jenkins/lib/test-matrix.yaml +++ b/.ci/jenkins/lib/test-matrix.yaml @@ -25,7 +25,7 @@ timeout_minutes: 240 # label is defined at jenkins slave configuration, we want to run the job on a gpu agent and be able to esaly replace it without having to change this file runs_on_agents: - {nodeLabel: 'H100'} - # - {nodeLabel: 'DGX'} + - {nodeLabel: 'DGX'} matrix: axes: diff --git a/src/core/nixl_listener.cpp b/src/core/nixl_listener.cpp index 16e3d9ca1..f5aaaabaa 100644 --- a/src/core/nixl_listener.cpp +++ b/src/core/nixl_listener.cpp @@ -35,10 +35,12 @@ namespace { static const std::string invalid_label = "invalid"; -int connectToIP(std::string ip_addr, int port) { +int +connectToIP(std::string ip_addr, int port) { - struct sockaddr_in listenerAddr; - listenerAddr.sin_port = htons(port); + struct sockaddr_in listenerAddr {}; + + listenerAddr.sin_port = htons(port); listenerAddr.sin_family = AF_INET; if (inet_pton(AF_INET, ip_addr.c_str(), &listenerAddr.sin_addr) <= 0) { diff --git a/src/infra/nixl_descriptors.cpp b/src/infra/nixl_descriptors.cpp index ecb005389..7ad979952 100644 --- a/src/infra/nixl_descriptors.cpp +++ b/src/infra/nixl_descriptors.cpp @@ -112,19 +112,23 @@ nixlBlobDesc::nixlBlobDesc(const nixlBasicDesc &desc, } nixlBlobDesc::nixlBlobDesc(const nixl_blob_t &blob) { + if (blob.size() < sizeof(nixlBasicDesc)) { + NIXL_ERROR << "Blob size is less than the size of nixlBasicDesc"; + addr = 0; + len = 0; + devId = 0; + metaInfo.resize(0); + return; + } size_t meta_size = blob.size() - sizeof(nixlBasicDesc); if (meta_size > 0) { metaInfo.resize(meta_size); blob.copy(reinterpret_cast(this), sizeof(nixlBasicDesc)); - blob.copy(reinterpret_cast(&metaInfo[0]), - meta_size, sizeof(nixlBasicDesc)); + blob.copy(reinterpret_cast(&metaInfo[0]), meta_size, sizeof(nixlBasicDesc)); } else if (meta_size == 0) { - blob.copy(reinterpret_cast(this), sizeof(nixlBasicDesc)); - } else { // Error - addr = 0; - len = 0; - devId = 0; - metaInfo.resize(0); + blob.copy(reinterpret_cast(this), sizeof(nixlBasicDesc)); + } else { + NIXL_ASSERT(false) << "Negative meta size"; } } diff --git a/src/plugins/ucx/ucx_backend.cpp b/src/plugins/ucx/ucx_backend.cpp index 0456c20b4..2128e5d0c 100644 --- a/src/plugins/ucx/ucx_backend.cpp +++ b/src/plugins/ucx/ucx_backend.cpp @@ -1841,6 +1841,7 @@ nixl_status_t nixlUcxEngine::genNotif(const std::string &remote_agent, const std switch(ret) { case NIXL_IN_PROG: /* do not track the request */ + // TODO: why are we releasing a request that is still in progress? getWorker(getWorkerId())->reqRelease(req); case NIXL_SUCCESS: break; diff --git a/src/utils/stream/metadata_stream.cpp b/src/utils/stream/metadata_stream.cpp index 5bbd73ff7..a38d72252 100644 --- a/src/utils/stream/metadata_stream.cpp +++ b/src/utils/stream/metadata_stream.cpp @@ -132,21 +132,21 @@ std::string nixlMDStreamListener::recvFromClient() { return recvData; } -void nixlMDStreamListener::recvFromClients(int clientSocket) { - char buffer[RECV_BUFFER_SIZE]; - int bytes_read; - - while ((bytes_read = recv(clientSocket, buffer, - sizeof(buffer), 0)) > 0) { - buffer[bytes_read] = '\0'; - // Return ack - std::string ack = "Message received"; - send(clientSocket, ack.c_str(), ack.size(), 0); - std::string recv_message(buffer); - NIXL_DEBUG << "Message Received" << recv_message; - } - close(clientSocket); - NIXL_DEBUG << "Client Disconnected"; +void +nixlMDStreamListener::recvFromClients(int clientSocket) { + char buffer[RECV_BUFFER_SIZE + 1]; + int bytes_read; + + while ((bytes_read = recv(clientSocket, buffer, sizeof(buffer), 0)) > 0) { + buffer[bytes_read] = '\0'; + // Return ack + std::string ack = "Message received"; + send(clientSocket, ack.c_str(), ack.size(), 0); + std::string recv_message(buffer); + NIXL_DEBUG << "Message Received" << recv_message; + } + close(clientSocket); + NIXL_DEBUG << "Client Disconnected"; } void nixlMDStreamListener::startListenerForClient() { @@ -169,12 +169,14 @@ nixlMDStreamClient::~nixlMDStreamClient() { closeStream(); } -bool nixlMDStreamClient::setupClient() { +bool +nixlMDStreamClient::setupClient() { setupStream(); - struct sockaddr_in listenerAddr; + struct sockaddr_in listenerAddr {}; + listenerAddr.sin_family = AF_INET; - listenerAddr.sin_port = htons(port); + listenerAddr.sin_port = htons(port); if (inet_pton(AF_INET, listenerAddress.c_str(), &listenerAddr.sin_addr) <= 0) { diff --git a/src/utils/ucx/gpu_xfer_req_h.cpp b/src/utils/ucx/gpu_xfer_req_h.cpp index 3055219b4..f215b350e 100644 --- a/src/utils/ucx/gpu_xfer_req_h.cpp +++ b/src/utils/ucx/gpu_xfer_req_h.cpp @@ -54,31 +54,33 @@ createGpuXferReq(const nixlUcxEp &ep, ucp_elements.reserve(local_mems.size()); for (size_t i = 0; i < local_mems.size(); i++) { - ucp_device_mem_list_elem_t ucp_elem; - ucp_elem.field_mask = UCP_DEVICE_MEM_LIST_ELEM_FIELD_MEMH | - UCP_DEVICE_MEM_LIST_ELEM_FIELD_RKEY | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LOCAL_ADDR | - UCP_DEVICE_MEM_LIST_ELEM_FIELD_REMOTE_ADDR | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LENGTH; - ucp_elem.memh = local_mems[i].getMemh(); - ucp_elem.rkey = remote_rkeys[i]->get(); - ucp_elem.local_addr = local_mems[i].getBase(); - ucp_elem.remote_addr = remote_addrs[i]; - ucp_elem.length = local_mems[i].getSize(); + ucp_device_mem_list_elem_t ucp_elem = { + .field_mask = UCP_DEVICE_MEM_LIST_ELEM_FIELD_MEMH | + UCP_DEVICE_MEM_LIST_ELEM_FIELD_RKEY | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LOCAL_ADDR | + UCP_DEVICE_MEM_LIST_ELEM_FIELD_REMOTE_ADDR | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LENGTH, + .memh = local_mems[i].getMemh(), + .rkey = remote_rkeys[i]->get(), + .local_addr = local_mems[i].getBase(), + .remote_addr = remote_addrs[i], + .length = local_mems[i].getSize(), + }; ucp_elements.push_back(ucp_elem); } - ucp_device_mem_list_params_t params; - params.field_mask = UCP_DEVICE_MEM_LIST_PARAMS_FIELD_ELEMENTS | +ucp_device_mem_list_params_t params = { + .field_mask = UCP_DEVICE_MEM_LIST_PARAMS_FIELD_ELEMENTS | UCP_DEVICE_MEM_LIST_PARAMS_FIELD_ELEMENT_SIZE | - UCP_DEVICE_MEM_LIST_PARAMS_FIELD_NUM_ELEMENTS; - params.elements = ucp_elements.data(); - params.element_size = sizeof(ucp_device_mem_list_elem_t); - params.num_elements = ucp_elements.size(); - - ucp_device_mem_list_handle_h ucx_handle; - ucs_status_t ucs_status = ucp_device_mem_list_create(ep.getEp(), ¶ms, &ucx_handle); - if (ucs_status != UCS_OK) { - throw std::runtime_error(std::string("Failed to create device memory list: ") + - ucs_status_string(ucs_status)); + UCP_DEVICE_MEM_LIST_PARAMS_FIELD_NUM_ELEMENTS, + .elements = ucp_elements.data(), + .element_size = sizeof(ucp_device_mem_list_elem_t), + .num_elements = ucp_elements.size(), +}; + +ucp_device_mem_list_handle_h ucx_handle; +ucs_status_t ucs_status = ucp_device_mem_list_create(ep.getEp(), ¶ms, &ucx_handle); +if (ucs_status != UCS_OK) { + throw std::runtime_error(std::string("Failed to create device memory list: ") + + ucs_status_string(ucs_status)); } NIXL_DEBUG << "Created device memory list: ep=" << ep.getEp() << " handle=" << ucx_handle diff --git a/src/utils/ucx/ucx_utils.cpp b/src/utils/ucx/ucx_utils.cpp index 1bbac53c8..8240fc9ca 100644 --- a/src/utils/ucx/ucx_utils.cpp +++ b/src/utils/ucx/ucx_utils.cpp @@ -155,13 +155,13 @@ void nixlUcxEp::setState(nixl_ucx_ep_state_t new_state) nixl_status_t nixlUcxEp::closeImpl(ucp_ep_close_flags_t flags) { - ucs_status_ptr_t request = nullptr; + ucs_status_ptr_t request = nullptr; ucp_request_param_t req_param = { .op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS, - .flags = flags + .flags = flags, }; - switch(state) { + switch (state) { case NIXL_UCX_EP_STATE_NULL: case NIXL_UCX_EP_STATE_DISCONNECTED: // The EP has not been connected, or already disconnected. @@ -192,21 +192,19 @@ nixlUcxEp::closeImpl(ucp_ep_close_flags_t flags) std::terminate(); } -nixlUcxEp::nixlUcxEp(ucp_worker_h worker, void* addr, - ucp_err_handling_mode_t err_handling_mode) -{ - ucp_ep_params_t ep_params; - nixl_status_t status; - - ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | - UCP_EP_PARAM_FIELD_ERR_HANDLER | - UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; - ep_params.err_mode = err_handling_mode; - ep_params.err_handler.cb = err_cb_wrapper; - ep_params.err_handler.arg = reinterpret_cast(this); - ep_params.address = reinterpret_cast(addr); - - status = ucx_status_to_nixl(ucp_ep_create(worker, &ep_params, &eph)); +nixlUcxEp::nixlUcxEp(ucp_worker_h worker, void *addr, ucp_err_handling_mode_t err_handling_mode) { + ucp_ep_params_t ep_params = { + .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | UCP_EP_PARAM_FIELD_ERR_HANDLER | + UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE, + .address = reinterpret_cast(addr), + .err_mode = err_handling_mode, + .err_handler = + { + .cb = err_cb_wrapper, + .arg = reinterpret_cast(this), + }, + }; + nixl_status_t status = ucx_status_to_nixl(ucp_ep_create(worker, &ep_params, &eph)); if (status == NIXL_SUCCESS) setState(NIXL_UCX_EP_STATE_CONNECTED); else @@ -246,10 +244,10 @@ nixl_status_t nixlUcxEp::sendAm(unsigned msg_id, return status; } - ucp_request_param_t param = {0}; - - param.op_attr_mask |= UCP_OP_ATTR_FIELD_FLAGS; - param.flags = flags; + ucp_request_param_t param = { + .op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS, + .flags = flags, + }; ucs_status_ptr_t request = ucp_am_send_nbx(eph, msg_id, hdr, hdr_len, buffer, len, ¶m); if (UCS_PTR_IS_PTR(request)) { @@ -345,29 +343,27 @@ nixl_status_t nixlUcxEp::estimateCost(size_t size, return NIXL_SUCCESS; } -nixl_status_t nixlUcxEp::flushEp(nixlUcxReq &req) -{ - ucp_request_param_t param; - ucs_status_ptr_t request; - - param.op_attr_mask = 0; - request = ucp_ep_flush_nbx(eph, ¶m); +nixl_status_t +nixlUcxEp::flushEp(nixlUcxReq &req) { + ucp_request_param_t param{}; + ucs_status_ptr_t request = ucp_ep_flush_nbx(eph, ¶m); if (UCS_PTR_IS_PTR(request)) { - req = (void*)request; + req = (void *)request; return NIXL_IN_PROG; } return ucx_status_to_nixl(UCS_PTR_STATUS(request)); } -bool nixlUcxMtLevelIsSupported(const nixl_ucx_mt_t mt_type) noexcept -{ - ucp_lib_attr_t attr; - attr.field_mask = UCP_LIB_ATTR_FIELD_MAX_THREAD_LEVEL; +bool +nixlUcxMtLevelIsSupported(const nixl_ucx_mt_t mt_type) noexcept { + ucp_lib_attr_t attr = { + .field_mask = UCP_LIB_ATTR_FIELD_MAX_THREAD_LEVEL, + }; ucp_lib_query(&attr); - switch(mt_type) { + switch (mt_type) { case nixl_ucx_mt_t::SINGLE: return attr.max_thread_level >= UCS_THREAD_MODE_SERIALIZED; case nixl_ucx_mt_t::CTX: @@ -384,9 +380,8 @@ nixlUcxContext::nixlUcxContext(std::vector devs, nixlUcxContext::req_cb_t fini_cb, bool prog_thread, unsigned long num_workers, - nixl_thread_sync_t sync_mode) -{ - ucp_params_t ucp_params; + nixl_thread_sync_t sync_mode) { + ucp_params_t ucp_params{}; // With strict synchronization model nixlAgent serializes access to backends, with more // permissive models backends need to account for concurrent access and ensure their internal @@ -478,24 +473,20 @@ namespace std::terminate(); } - struct nixlUcpWorkerParams - : ucp_worker_params_t + static ucp_worker_params_t toUcpWorkerParams(const nixl_ucx_mt_t t) { - explicit nixlUcpWorkerParams(const nixl_ucx_mt_t t) - { - field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE; - thread_mode = toUcsThreadModeChecked(t); - } - }; - - static_assert(sizeof(nixlUcpWorkerParams) == sizeof(ucp_worker_params_t)); + return ucp_worker_params_t{ + .field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE, + .thread_mode = toUcsThreadModeChecked(t), + }; + } } // namespace ucp_worker * nixlUcxWorker::createUcpWorker(const nixlUcxContext &ctx) { ucp_worker* worker = nullptr; - const nixlUcpWorkerParams params(ctx.mt_type); + const ucp_worker_params_t params = toUcpWorkerParams(ctx.mt_type); const ucs_status_t status = ucp_worker_create(ctx.ctx, ¶ms, &worker); if(status != UCS_OK) { throw std::runtime_error(std::string("Failed to create UCX worker: ") + @@ -509,11 +500,12 @@ nixlUcxWorker::nixlUcxWorker(const nixlUcxContext &ctx, ucp_err_handling_mode_t : worker(createUcpWorker(ctx), &ucp_worker_destroy), err_handling_mode_(err_handling_mode) {} -std::string nixlUcxWorker::epAddr() -{ - ucp_worker_attr_t wattr; +std::string +nixlUcxWorker::epAddr() { + ucp_worker_attr_t wattr = { + .field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS, + }; - wattr.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS; const ucs_status_t status = ucp_worker_query(worker.get(), &wattr); if (UCS_OK != status) { throw std::runtime_error(std::string("Unable to query UCX worker address: ") + @@ -560,12 +552,12 @@ int nixlUcxContext::memReg(void *addr, size_t size, nixlUcxMem &mem, nixl_mem_t } if (nixl_mem_type == nixl_mem_t::VRAM_SEG) { - ucp_mem_attr_t attr; - attr.field_mask = UCP_MEM_ATTR_FIELD_MEM_TYPE; + ucp_mem_attr_t attr = { + .field_mask = UCP_MEM_ATTR_FIELD_MEM_TYPE, + }; status = ucp_mem_query(mem.memh, &attr); if (status != UCS_OK) { - NIXL_ERROR << absl::StrFormat("Failed to ucp_mem_query: %s", - ucs_status_string(status)); + NIXL_ERROR << absl::StrFormat("Failed to ucp_mem_query: %s", ucs_status_string(status)); ucp_mem_unmap(ctx, mem.memh); return -1; } @@ -607,12 +599,12 @@ constexpr std::string_view ucxGpuDeviceApiUnsupported{ #endif - size_t nixlUcxContext::getGpuSignalSize() const { #ifdef HAVE_UCX_GPU_DEVICE_API - ucp_context_attr_t attr; - attr.field_mask = UCP_ATTR_FIELD_DEVICE_COUNTER_SIZE; + ucp_context_attr_t attr = { + .field_mask = UCP_ATTR_FIELD_DEVICE_COUNTER_SIZE, + }; ucs_status_t query_status = ucp_context_query(ctx, &attr); if (query_status != UCS_OK) { @@ -631,17 +623,15 @@ nixlUcxContext::getGpuSignalSize() const { * Active message handling * =========================================== */ -int nixlUcxWorker::regAmCallback(unsigned msg_id, ucp_am_recv_callback_t cb, void* arg) -{ - ucp_am_handler_param_t params = {0}; - - params.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID | - UCP_AM_HANDLER_PARAM_FIELD_CB | - UCP_AM_HANDLER_PARAM_FIELD_ARG; - - params.id = msg_id; - params.cb = cb; - params.arg = arg; +int +nixlUcxWorker::regAmCallback(unsigned msg_id, ucp_am_recv_callback_t cb, void *arg) { + ucp_am_handler_param_t params = { + .field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID | UCP_AM_HANDLER_PARAM_FIELD_CB | + UCP_AM_HANDLER_PARAM_FIELD_ARG, + .id = msg_id, + .cb = cb, + .arg = arg, + }; const ucs_status_t status = ucp_worker_set_am_recv_handler(worker.get(), ¶ms); @@ -706,9 +696,10 @@ nixlUcxWorker::prepGpuSignal([[maybe_unused]] const nixlUcxMem &mem, throw std::invalid_argument("Signal pointer cannot be null"); } - ucp_device_counter_params_t params; - params.field_mask = UCP_DEVICE_COUNTER_PARAMS_FIELD_MEMH; - params.memh = mem.memh; + ucp_device_counter_params_t params = { + .field_mask = UCP_DEVICE_COUNTER_PARAMS_FIELD_MEMH, + .memh = mem.memh, + }; ucs_status_t status = ucp_device_counter_init(worker.get(), ¶ms, signal); diff --git a/test/unit/plugins/ucx/ucx_backend_test.cpp b/test/unit/plugins/ucx/ucx_backend_test.cpp index 30b207c0b..429a0d7c7 100644 --- a/test/unit/plugins/ucx/ucx_backend_test.cpp +++ b/test/unit/plugins/ucx/ucx_backend_test.cpp @@ -393,7 +393,7 @@ performTransfer(nixlUcxEngine *ucx1, std::string test_str("test"); std::cout << "\t" << op2string(op, use_notif) << " from " << addr1 << " to " << addr2 << "\n"; - nixl_opt_b_args_t opt_args; + nixl_opt_b_args_t opt_args{}; opt_args.notifMsg = test_str; opt_args.hasNotif = use_notif;