Skip to content

Commit

Permalink
UCX: Remove EPs from storages in error handling callback (#43)
Browse files: browse the repository at this point in the history.
* UCX: Remove EPs from storages in error handling callback

* UCX: Assert that EP was found in server EPs hash
  • Loading branch information
dmitrygx authored Oct 29, 2020
1 parent c62cca0 commit 03fe959
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions src/ucx_van.h
Original file line number | Diff line number | Diff line change
Expand Up @@ -219,21 +219,28 @@ class UCXEndpointsPool {
void ErrorHandler(ucp_ep_h ep)
{
mu_.lock();
auto check = [ep](const auto &mo) {return mo.second->ep == ep;};
auto result = std::find_if(client_eps_.begin(), client_eps_.end(), check);
if (result != client_eps_.end()) {
UCXEp *uep = result->second.get();
if (!uep->connected) {
uep->ep = nullptr;
std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_tmo_));
Create(uep);
UCX_LOGE(1, "ep close errh: " << ep << "|" << result->first
<< " Reconnect, close reqs " << close_ep_reqs_.size());
} else {
UCX_LOGE(1, "ep close errh: " << ep << "|" << result->first
<< " peer failure");
}
auto client_check = [ep](const auto &mo) {return mo.second->ep == ep;};
auto client_it = std::find_if(client_eps_.begin(), client_eps_.end(),
client_check);
if (client_it != client_eps_.end()) {
UCXEp *uep = client_it->second.get();
if (!uep->connected) {
uep->ep = nullptr;
std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_tmo_));
Create(uep);
UCX_LOGE(1, "ep close errh: " << ep << "|" << client_it->first
<< " Reconnect, close reqs " << close_ep_reqs_.size());
} else {
UCX_LOGE(1, "ep close errh: " << ep << "|" << client_it->first
<< " peer failure");
client_eps_.erase(client_it);
}
} else {
auto server_check = [ep](const auto &mo) {return mo->ep == ep;};
auto server_it = std::find_if(server_eps_.begin(), server_eps_.end(),
server_check);
assert(server_it != server_eps_.end());
server_eps_.erase(server_it);
UCX_LOGE(1, "ep close errh: " << ep);
}
mu_.unlock();
Expand Down

0 comments on commit 03fe959

Please sign in to comment.