Skip to content

Conversation

@michal-shalev
Copy link
Contributor

@michal-shalev michal-shalev commented Oct 23, 2025

What?

Add blocking wait for endpoint wireup completion in createGpuXferReq().

Why?

Previously, users had to implement workarounds in their applications to wait for wireup completion before calling createGpuXferReq() (as shown in UCX tests).
This PR moves the wireup handling into the library, simplifying the API and removing the burden from application code.

How?

  • Retry ucp_device_mem_list_create() in a loop while it returns UCS_ERR_NOT_CONNECTED
  • Call worker.progress() in each iteration to advance the wireup state machine
  • Document the blocking behavior in the public API

@github-actions
Copy link

👋 Hi michal-shalev! Thank you for contributing to ai-dynamo/nixl.

Your PR reviewers will review your contribution then trigger the CI to test your changes.

🚀

@michal-shalev
Copy link
Contributor Author

/build

2 similar comments
@michal-shalev
Copy link
Contributor Author

/build

@michal-shalev
Copy link
Contributor Author

/build

brminich
brminich previously approved these changes Oct 28, 2025
@brminich
Copy link
Contributor

/build

Signed-off-by: Michal Shalev <[email protected]>
Signed-off-by: Michal Shalev <[email protected]>
params.num_elements = ucp_elements.size();

const auto start = std::chrono::steady_clock::now();
constexpr auto timeout = std::chrono::seconds(5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about making it configurable via environment variable?

if (std::chrono::steady_clock::now() - start > timeout) {
throw std::runtime_error(
"Timeout waiting for endpoint wireup completion has been exceeded");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to swap the time check and the execution of the progress on the workers. Otherwise, we may throw the exception even when the wireup is completed on this iteration.

Optional. I'd prefer to do a time loop. E.g.:

for (const auto start = std::chrono::steady_clock::now();
     std::chrono::steady_clock::now() - start <= timeout;)
{
    status = ucp_device_mem_list_create(ep.getEp(), &params, &ucx_handle);
    if (status != UCS_ERR_NOT_CONNECTED) {
        break;
    }

    for (const auto &w : workers) {
        w->progress();
    }
}

if (status == UCS_ERR_NOT_CONNECTED) {
    throw std::runtime_error("Timeout waiting for endpoint wireup completion has been exceeded");
} else if (status != UCS_OK) {
    throw std::runtime_error(std::string("Failed to create device memory list: ") +
                             ucs_status_string(ucs_status));
}


nixlGpuXferReqH
createGpuXferReq(const nixlUcxEp &ep,
const std::vector<std::unique_ptr<nixlUcxWorker>> &all_workers,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like all_ is redundant for this parameter. workers would be enough.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants