@@ -23,7 +23,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
2323 connection_pool_() {
2424 RAY_CHECK (config_.max_sends > 0 );
2525 RAY_CHECK (config_.max_receives > 0 );
26- RAY_CHECK (config_.push_timeout_ms > 0 );
2726 main_service_ = &main_service;
2827 store_notification_.SubscribeObjAdded (
2928 [this ](const ObjectInfoT &object_info) { NotifyDirectoryObjectAdd (object_info); });
@@ -47,7 +46,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
4746 connection_pool_() {
4847 RAY_CHECK (config_.max_sends > 0 );
4948 RAY_CHECK (config_.max_receives > 0 );
50- RAY_CHECK (config_.push_timeout_ms > 0 );
5149 // TODO(hme) Client ID is never set with this constructor.
5250 main_service_ = &main_service;
5351 store_notification_.SubscribeObjAdded (
@@ -90,12 +88,16 @@ void ObjectManager::NotifyDirectoryObjectAdd(const ObjectInfoT &object_info) {
9088 local_objects_[object_id] = object_info;
9189 ray::Status status =
9290 object_directory_->ReportObjectAdded (object_id, client_id_, object_info);
91+ // Handle the unfulfilled_push_tasks_ which contains the push request that is not
92+ // completed due to unsatisfied local objects.
9393 auto iter = unfulfilled_push_tasks_.find (object_id);
9494 if (iter != unfulfilled_push_tasks_.end ()) {
9595 for (auto &pair : iter->second ) {
9696 main_service_->post (
9797 [this , object_id, pair]() { RAY_CHECK_OK (Push (object_id, pair.first )); });
98- pair.second ->cancel ();
98+ if (pair.second != nullptr ) {
99+ pair.second ->cancel ();
100+ }
99101 }
100102 unfulfilled_push_tasks_.erase (iter);
101103 }
@@ -220,19 +222,33 @@ void ObjectManager::HandlePushTaskTimeout(const ObjectID &object_id,
220222
221223ray::Status ObjectManager::Push (const ObjectID &object_id, const ClientID &client_id) {
222224 if (local_objects_.count (object_id) == 0 ) {
223- // Put the task into a queue and wait for the notification of Object added.
224- auto timer = std::shared_ptr<boost::asio::deadline_timer>(
225- new boost::asio::deadline_timer (*main_service_));
226- auto clean_push_period = boost::posix_time::milliseconds (config_.push_timeout_ms );
227- timer->expires_from_now (clean_push_period);
228- timer->async_wait (
229- [this , object_id, client_id](const boost::system::error_code &error) {
230- // Only handle the timeout event and skip all other events.
231- if (!error) {
232- HandlePushTaskTimeout (object_id, client_id);
233- }
234- });
235- unfulfilled_push_tasks_[object_id].emplace (client_id, timer);
225+ // Avoid setting duplicated timer for the same object andn clinet pair.
226+ auto &pair = unfulfilled_push_tasks_[object_id];
227+ if (pair.count (client_id) == 0 ) {
228+ // If onfig_.push_timeout_ms < 0, we give an empty timer
229+ // and the task will be kept infinitely.
230+ auto timer = std::shared_ptr<boost::asio::deadline_timer>();
231+ if (config_.push_timeout_ms == 0 ) {
232+ // The Push request fails directly when onfig_.push_timeout_ms == 0.
233+ RAY_LOG (ERROR) << " Invalid Push request ObjectID: " << object_id
234+ << " due to direct failure setting. " ;
235+ return ray::Status::OK ();
236+ } else if (config_.push_timeout_ms > 0 ) {
237+ // Put the task into a queue and wait for the notification of Object added.
238+ timer.reset (new boost::asio::deadline_timer (*main_service_));
239+ auto clean_push_period = boost::posix_time::milliseconds (config_.push_timeout_ms );
240+ timer->expires_from_now (clean_push_period);
241+ timer->async_wait (
242+ [this , object_id, client_id](const boost::system::error_code &error) {
243+ // Timer killing will receive the boost::asio::error::operation_aborted,
244+ // we only handle the timeout event.
245+ if (!error) {
246+ HandlePushTaskTimeout (object_id, client_id);
247+ }
248+ });
249+ }
250+ pair.emplace (client_id, timer);
251+ }
236252 return ray::Status::OK ();
237253 }
238254
0 commit comments