From affa65909c65388d0afd9b1ed0f3c111c62881e4 Mon Sep 17 00:00:00 2001 From: jinjiang Date: Wed, 16 May 2018 16:00:20 +0800 Subject: [PATCH] Add lock for pull_requests_ to avoid undefined behavior. --- src/ray/object_manager/object_manager.cc | 22 +++++++++++++++------- src/ray/object_manager/object_manager.h | 4 ++++ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 42ef8164aee7..d291b9f8d5a5 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -110,13 +110,21 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) { } void ObjectManager::SchedulePull(const ObjectID &object_id, int wait_ms) { - pull_requests_[object_id] = std::make_shared( - *main_service_, boost::posix_time::milliseconds(wait_ms)); - pull_requests_[object_id]->async_wait( - [this, object_id](const boost::system::error_code &error_code) { - pull_requests_.erase(object_id); - RAY_CHECK_OK(PullGetLocations(object_id)); - }); + auto timer = std::make_shared( + *main_service_, boost::posix_time::milliseconds(wait_ms)); + + { + std::lock_guard lock(pull_requests_lock_); + pull_requests_[object_id] = timer; + } + + timer->async_wait([this, object_id](const boost::system::error_code &error_code) { + { + std::lock_guard lock(pull_requests_lock_); + pull_requests_.erase(object_id); + } + RAY_CHECK_OK(PullGetLocations(object_id)); + }); } ray::Status ObjectManager::PullGetLocations(const ObjectID &object_id) { diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index d34d50762f24..26cfe7722d2e 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -178,6 +179,9 @@ class ObjectManager { /// Connection pool for reusing outgoing connections to remote object managers. ConnectionPool connection_pool_; + /// Lock for pull_requests_ + std::mutex pull_requests_lock_; + /// Timeout for failed pull requests. std::unordered_map> pull_requests_;