diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc
index c6c12aa53069..85f591373661 100644
--- a/src/ray/gcs/tables.cc
+++ b/src/ray/gcs/tables.cc
@@ -368,10 +368,13 @@ void ClientTable::HandleNotification(AsyncGcsClient *client,
       }
       RAY_CHECK(removed_clients_.find(client_id) == removed_clients_.end());
     } else {
+      // NOTE(swang): The client should be added to this data structure before
+      // the callback gets called, in case the callback depends on the data
+      // structure getting updated.
+      removed_clients_.insert(client_id);
       if (client_removed_callback_ != nullptr) {
        client_removed_callback_(client, client_id, data);
       }
-      removed_clients_.insert(client_id);
     }
   }
 }
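Aside: the reordering in ClientTable::HandleNotification above matters because a removal callback may itself query the table (for example, the object directory filters locations through ClientTable::IsRemoved). Below is a minimal standalone C++ sketch of the hazard, not part of the patch; MiniClientTable and its members are hypothetical stand-ins:

#include <cassert>
#include <functional>
#include <string>
#include <unordered_set>

// Hypothetical miniature of ClientTable: tracks removed clients and fires a
// user callback on removal.
struct MiniClientTable {
  std::unordered_set<std::string> removed;
  std::function<void(const std::string &)> on_removed;

  void HandleRemoved(const std::string &id) {
    // Update the data structure *before* invoking the callback, so the
    // callback observes the post-removal state (the fix in tables.cc).
    removed.insert(id);
    if (on_removed) on_removed(id);
  }
  bool IsRemoved(const std::string &id) const { return removed.count(id) > 0; }
};

int main() {
  MiniClientTable table;
  table.on_removed = [&table](const std::string &id) {
    // A callback that consults the table, as the object directory does via
    // ClientTable::IsRemoved(). With the old ordering this would fail.
    assert(table.IsRemoved(id));
  };
  table.HandleRemoved("client1");
  return 0;
}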
diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc
index ab20d27b66c6..19f61db41b80 100644
--- a/src/ray/object_manager/object_directory.cc
+++ b/src/ray/object_manager/object_directory.cc
@@ -8,10 +8,15 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,
 
 namespace {
 
-std::vector<ClientID> UpdateObjectLocations(
-    std::unordered_set<ClientID> &client_ids,
-    const std::vector<ObjectTableDataT> &location_history,
-    const ray::gcs::ClientTable &client_table) {
+/// Process a suffix of the object table log and store the result in
+/// client_ids. This assumes that client_ids already contains the result of the
+/// object table log up to but not including this suffix. This also stores a
+/// bool in has_been_created indicating whether the object has ever been
+/// created before.
+void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
+                           const ray::gcs::ClientTable &client_table,
+                           std::unordered_set<ClientID> *client_ids,
+                           bool *has_been_created) {
   // location_history contains the history of locations of the object (it is a log),
   // which might look like the following:
   //   client1.is_eviction = false
@@ -19,23 +24,27 @@ std::vector<ClientID> UpdateObjectLocations(
   //   client2.is_eviction = false
   // In such a scenario, we want to indicate client2 is the only client that contains
   // the object, which the following code achieves.
+  if (!location_history.empty()) {
+    // If there are entries, then the object has been created. Once this flag
+    // is set to true, it should never go back to false.
+    *has_been_created = true;
+  }
   for (const auto &object_table_data : location_history) {
     ClientID client_id = ClientID::from_binary(object_table_data.manager);
     if (!object_table_data.is_eviction) {
-      client_ids.insert(client_id);
+      client_ids->insert(client_id);
     } else {
-      client_ids.erase(client_id);
+      client_ids->erase(client_id);
     }
   }
   // Filter out the removed clients from the object locations.
-  for (auto it = client_ids.begin(); it != client_ids.end();) {
+  for (auto it = client_ids->begin(); it != client_ids->end();) {
     if (client_table.IsRemoved(*it)) {
-      it = client_ids.erase(it);
+      it = client_ids->erase(it);
     } else {
       it++;
     }
   }
-  return std::vector<ClientID>(client_ids.begin(), client_ids.end());
 }
 
 }  // namespace
@@ -45,18 +54,18 @@ void ObjectDirectory::RegisterBackend() {
       gcs::AsyncGcsClient *client, const ObjectID &object_id,
       const std::vector<ObjectTableDataT> &location_history) {
     // Objects are added to this map in SubscribeObjectLocations.
-    auto object_id_listener_pair = listeners_.find(object_id);
+    auto it = listeners_.find(object_id);
     // Do nothing for objects we are not listening for.
-    if (object_id_listener_pair == listeners_.end()) {
+    if (it == listeners_.end()) {
       return;
     }
     // Update entries for this object.
-    std::vector<ClientID> client_id_vec =
-        UpdateObjectLocations(object_id_listener_pair->second.current_object_locations,
-                              location_history, gcs_client_->client_table());
+    UpdateObjectLocations(location_history, gcs_client_->client_table(),
+                          &it->second.current_object_locations,
+                          &it->second.has_been_created);
     // Copy the callbacks so that the callbacks can unsubscribe without interrupting
     // looping over the callbacks.
-    auto callbacks = object_id_listener_pair->second.callbacks;
+    auto callbacks = it->second.callbacks;
     // Call all callbacks associated with the object id locations we have
     // received. This notifies the client even if the list of locations is
     // empty, since this may indicate that the objects have been evicted from
@@ -64,7 +73,8 @@ void ObjectDirectory::RegisterBackend() {
     for (const auto &callback_pair : callbacks) {
       // It is safe to call the callback directly since this is already running
       // in the subscription callback stack.
-      callback_pair.second(client_id_vec, object_id);
+      callback_pair.second(object_id, it->second.current_object_locations,
+                           it->second.has_been_created);
     }
   };
   RAY_CHECK_OK(gcs_client_->object_table().Subscribe(
@@ -133,28 +143,51 @@ std::vector<RemoteConnectionInfo> ObjectDirectory::LookupAllRemoteConnections()
   return remote_connections;
 }
 
+void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) {
+  for (auto &listener : listeners_) {
+    const ObjectID &object_id = listener.first;
+    if (listener.second.current_object_locations.count(client_id) > 0) {
+      // If the subscribed object has the removed client as a location, update
+      // its locations with an empty log so that the location will be removed.
+      UpdateObjectLocations({}, gcs_client_->client_table(),
+                            &listener.second.current_object_locations,
+                            &listener.second.has_been_created);
+      // Re-call all the subscribed callbacks for the object, since its
+      // locations have changed.
+      for (const auto &callback_pair : listener.second.callbacks) {
+        // It is safe to call the callback directly since this is already running
+        // in the subscription callback stack.
+        callback_pair.second(object_id, listener.second.current_object_locations,
+                             listener.second.has_been_created);
+      }
+    }
+  }
+}
+
 ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_id,
                                                       const ObjectID &object_id,
                                                       const OnLocationsFound &callback) {
   ray::Status status = ray::Status::OK();
-  if (listeners_.find(object_id) == listeners_.end()) {
-    listeners_.emplace(object_id, LocationListenerState());
+  auto it = listeners_.find(object_id);
+  if (it == listeners_.end()) {
+    it = listeners_.emplace(object_id, LocationListenerState()).first;
     status = gcs_client_->object_table().RequestNotifications(
         JobID::nil(), object_id, gcs_client_->client_table().GetLocalClientId());
   }
-  auto &listener_state = listeners_.find(object_id)->second;
+  auto &listener_state = it->second;
   // TODO(hme): Make this fatal after implementing Pull suppression.
   if (listener_state.callbacks.count(callback_id) > 0) {
     return ray::Status::OK();
   }
   listener_state.callbacks.emplace(callback_id, callback);
-  // Immediately notify of object locations. This notifies the client even if
-  // the list of locations is empty, since this may indicate that the objects
-  // have been evicted from all nodes.
-  std::vector<ClientID> client_id_vec(listener_state.current_object_locations.begin(),
-                                      listener_state.current_object_locations.end());
-  io_service_.post(
-      [callback, client_id_vec, object_id]() { callback(client_id_vec, object_id); });
+  // If we previously received some notifications about the object's locations,
+  // immediately notify the caller of the current known locations.
+  if (listener_state.has_been_created) {
+    auto &locations = listener_state.current_object_locations;
+    io_service_.post([callback, locations, object_id]() {
+      callback(object_id, locations, /*has_been_created=*/true);
+    });
+  }
   return status;
 }
 
@@ -176,19 +209,32 @@ ray::Status ObjectDirectory::UnsubscribeObjectLocations(const UniqueID &callback
 
 ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
                                              const OnLocationsFound &callback) {
-  JobID job_id = JobID::nil();
-  ray::Status status = gcs_client_->object_table().Lookup(
-      job_id, object_id,
-      [this, callback](gcs::AsyncGcsClient *client, const ObjectID &object_id,
-                       const std::vector<ObjectTableDataT> &location_history) {
-        // Build the set of current locations based on the entries in the log.
-        std::unordered_set<ClientID> client_ids;
-        std::vector<ClientID> locations_vector = UpdateObjectLocations(
-            client_ids, location_history, gcs_client_->client_table());
-        // It is safe to call the callback directly since this is already running
-        // in the GCS client's lookup callback stack.
-        callback(locations_vector, object_id);
-      });
+  ray::Status status;
+  auto it = listeners_.find(object_id);
+  if (it == listeners_.end()) {
+    JobID job_id = JobID::nil();
+    status = gcs_client_->object_table().Lookup(
+        job_id, object_id,
+        [this, callback](gcs::AsyncGcsClient *client, const ObjectID &object_id,
+                         const std::vector<ObjectTableDataT> &location_history) {
+          // Build the set of current locations based on the entries in the log.
+          std::unordered_set<ClientID> client_ids;
+          bool has_been_created = false;
+          UpdateObjectLocations(location_history, gcs_client_->client_table(),
+                                &client_ids, &has_been_created);
+          // It is safe to call the callback directly since this is already running
+          // in the GCS client's lookup callback stack.
+          callback(object_id, client_ids, has_been_created);
+        });
+  } else {
+    // If we have locations cached due to a concurrent SubscribeObjectLocations
+    // call, call the callback immediately with the cached locations.
+    auto &locations = it->second.current_object_locations;
+    bool has_been_created = it->second.has_been_created;
+    io_service_.post([callback, object_id, locations, has_been_created]() {
+      callback(object_id, locations, has_been_created);
+    });
+  }
   return status;
 }
 
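Aside: UpdateObjectLocations treats the object table as an append-only log and replays each new suffix into the current location set, with has_been_created recording whether any entry has ever been seen. A minimal standalone sketch of that replay logic, not the PR's code (Entry and Replay are hypothetical, simplified stand-ins):

#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical, simplified log entry: a client either added or evicted a copy.
struct Entry {
  std::string client_id;
  bool is_eviction;
};

// Replay a suffix of the log into the current location set, mirroring
// UpdateObjectLocations: insertions add a location, evictions remove one,
// and any entry at all proves the object was created at some point.
void Replay(const std::vector<Entry> &log, std::unordered_set<std::string> *locations,
            bool *has_been_created) {
  if (!log.empty()) {
    *has_been_created = true;  // Sticky: never reset to false.
  }
  for (const auto &entry : log) {
    if (!entry.is_eviction) {
      locations->insert(entry.client_id);
    } else {
      locations->erase(entry.client_id);
    }
  }
}

int main() {
  std::unordered_set<std::string> locations;
  bool created = false;
  // client1 adds the object, client1 evicts it, client2 adds it.
  Replay({{"client1", false}, {"client1", true}, {"client2", false}}, &locations,
         &created);
  assert(created && locations.count("client2") == 1 && locations.size() == 1);
  // An empty set with created == true now means "lost", not "never existed".
  Replay({{"client2", true}}, &locations, &created);
  assert(created && locations.empty());
  return 0;
}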
diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h
index e36c4c41604e..b44197b639ef 100644
--- a/src/ray/object_manager/object_directory.h
+++ b/src/ray/object_manager/object_directory.h
@@ -48,8 +48,9 @@ class ObjectDirectoryInterface {
   virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;
 
   /// Callback for object location notifications.
-  using OnLocationsFound = std::function<void(const std::vector<ray::ClientID> &,
-                                              const ray::ObjectID &object_id)>;
+  using OnLocationsFound = std::function<void(const ray::ObjectID &object_id,
+                                              const std::unordered_set<ray::ClientID> &,
+                                              bool has_been_created)>;
 
   /// Lookup object locations. Callback may be invoked with empty list of client ids.
   ///
@@ -59,6 +60,13 @@ class ObjectDirectoryInterface {
   virtual ray::Status LookupLocations(const ObjectID &object_id,
                                       const OnLocationsFound &callback) = 0;
 
+  /// Handle the removal of an object manager client. This updates the
+  /// locations of all subscribed objects that have the removed client as a
+  /// location, and fires the subscribed callbacks for those objects.
+  ///
+  /// \param client_id The object manager client that was removed.
+  virtual void HandleClientRemoved(const ClientID &client_id) = 0;
+
   /// Subscribe to be notified of locations (ClientID) of the given object.
   /// The callback will be invoked with the complete list of known locations
   /// whenever the set of locations changes. The callback will also be fired if
@@ -138,6 +146,8 @@ class ObjectDirectory : public ObjectDirectoryInterface {
   ray::Status LookupLocations(const ObjectID &object_id,
                               const OnLocationsFound &callback) override;
 
+  void HandleClientRemoved(const ClientID &client_id) override;
+
   ray::Status SubscribeObjectLocations(const UniqueID &callback_id,
                                        const ObjectID &object_id,
                                        const OnLocationsFound &callback) override;
@@ -164,6 +174,12 @@ class ObjectDirectory : public ObjectDirectoryInterface {
     std::unordered_map<UniqueID, OnLocationsFound> callbacks;
     /// The current set of known locations of this object.
     std::unordered_set<ClientID> current_object_locations;
+    /// This flag will get set to true if the object has ever been created. It
+    /// should never go back to false once set to true. If this is true, and
+    /// the current_object_locations is empty, then this means that the object
+    /// does not exist on any nodes due to eviction (rather than due to the
+    /// object never getting created, for instance).
+    bool has_been_created;
   };
 
   /// Reference to the event loop.
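Aside: the new three-argument OnLocationsFound signature lets a caller distinguish object states that the old vector-only callback conflated. A standalone sketch of how a subscriber can interpret the (locations, has_been_created) pair; DescribeObjectState is a hypothetical helper, not part of the patch:

#include <iostream>
#include <string>
#include <unordered_set>

// Hypothetical helper showing how a caller can interpret the new callback
// arguments: an empty location set is ambiguous without has_been_created.
std::string DescribeObjectState(const std::unordered_set<std::string> &locations,
                                bool has_been_created) {
  if (!locations.empty()) {
    return "available";                     // At least one live copy.
  }
  if (has_been_created) {
    return "lost (created, then evicted)";  // Reconstruction or failure needed.
  }
  return "not yet created";                 // Keep waiting.
}

int main() {
  std::cout << DescribeObjectState({"client2"}, true) << "\n";  // available
  std::cout << DescribeObjectState({}, true) << "\n";           // lost
  std::cout << DescribeObjectState({}, false) << "\n";          // not yet created
  return 0;
}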
diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc
index a3cc87c7f17c..5f6a503b6f07 100644
--- a/src/ray/object_manager/object_manager.cc
+++ b/src/ray/object_manager/object_manager.cc
@@ -10,37 +10,9 @@ namespace ray {
 
 ObjectManager::ObjectManager(asio::io_service &main_service,
                              const ObjectManagerConfig &config,
-                             std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
-    // TODO(hme): Eliminate knowledge of GCS.
-    : client_id_(gcs_client->client_table().GetLocalClientId()),
-      config_(config),
-      object_directory_(new ObjectDirectory(main_service, gcs_client)),
-      store_notification_(main_service, config_.store_socket_name),
-      // release_delay of 2 * config_.max_sends is to ensure the pool does not release
-      // an object prematurely whenever we reach the maximum number of sends.
-      buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
-                   /*release_delay=*/2 * config_.max_sends),
-      send_work_(send_service_),
-      receive_work_(receive_service_),
-      connection_pool_(),
-      gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {
-  RAY_CHECK(config_.max_sends > 0);
-  RAY_CHECK(config_.max_receives > 0);
-  main_service_ = &main_service;
-  store_notification_.SubscribeObjAdded(
-      [this](const object_manager::protocol::ObjectInfoT &object_info) {
-        HandleObjectAdded(object_info);
-      });
-  store_notification_.SubscribeObjDeleted(
-      [this](const ObjectID &oid) { NotifyDirectoryObjectDeleted(oid); });
-  StartIOService();
-}
-
-ObjectManager::ObjectManager(asio::io_service &main_service,
-                             const ObjectManagerConfig &config,
-                             std::unique_ptr<ObjectDirectoryInterface> od)
+                             std::shared_ptr<ObjectDirectoryInterface> object_directory)
     : config_(config),
-      object_directory_(std::move(od)),
+      object_directory_(std::move(object_directory)),
       store_notification_(main_service, config_.store_socket_name),
       // release_delay of 2 * config_.max_sends is to ensure the pool does not release
       // an object prematurely whenever we reach the maximum number of sends.
@@ -156,7 +128,8 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
   // no ordering guarantee between notifications.
   return object_directory_->SubscribeObjectLocations(
       object_directory_pull_callback_id_, object_id,
-      [this](const std::vector<ClientID> &client_ids, const ObjectID &object_id) {
+      [this](const ObjectID &object_id, const std::unordered_set<ClientID> &client_ids,
+             bool created) {
         // Exit if the Pull request has already been fulfilled or canceled.
         auto it = pull_requests_.find(object_id);
         if (it == pull_requests_.end()) {
@@ -166,7 +139,8 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
         // NOTE(swang): Since we are overwriting the previous list of clients,
         // we may end up sending a duplicate request to the same client as
         // before.
-        it->second.client_locations = client_ids;
+        it->second.client_locations =
+            std::vector<ClientID>(client_ids.begin(), client_ids.end());
         if (it->second.client_locations.empty()) {
           // The object locations are now empty, so we should wait for the next
           // notification about a new object location.  Cancel the timer until
@@ -591,8 +565,9 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) {
       // Lookup remaining objects.
       wait_state.requested_objects.insert(object_id);
       RAY_RETURN_NOT_OK(object_directory_->LookupLocations(
-          object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
-                                     const ObjectID &lookup_object_id) {
+          object_id,
+          [this, wait_id](const ObjectID &lookup_object_id,
+                          const std::unordered_set<ClientID> &client_ids, bool created) {
            auto &wait_state = active_wait_requests_.find(wait_id)->second;
            if (!client_ids.empty()) {
              wait_state.remaining.erase(lookup_object_id);
@@ -624,8 +599,9 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
         wait_state.requested_objects.insert(object_id);
         // Subscribe to object notifications.
         RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
-            wait_id, object_id, [this, wait_id](const std::vector<ClientID> &client_ids,
-                                                const ObjectID &subscribe_object_id) {
+            wait_id, object_id,
+            [this, wait_id](const ObjectID &subscribe_object_id,
+                            const std::unordered_set<ClientID> &client_ids, bool created) {
              if (!client_ids.empty()) {
                auto object_id_wait_state = active_wait_requests_.find(wait_id);
                if (object_id_wait_state == active_wait_requests_.end()) {
diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h
index b2583376701e..57cf27e8b77b 100644
--- a/src/ray/object_manager/object_manager.h
+++ b/src/ray/object_manager/object_manager.h
@@ -70,25 +70,16 @@ class ObjectManagerInterface {
 
 // TODO(hme): Add success/failure callbacks for push and pull.
 class ObjectManager : public ObjectManagerInterface {
  public:
-  /// Implicitly instantiates Ray implementation of ObjectDirectory.
-  ///
-  /// \param main_service The main asio io_service.
-  /// \param config ObjectManager configuration.
-  /// \param gcs_client A client connection to the Ray GCS.
-  explicit ObjectManager(boost::asio::io_service &main_service,
-                         const ObjectManagerConfig &config,
-                         std::shared_ptr<gcs::AsyncGcsClient> gcs_client);
-
   /// Takes user-defined ObjectDirectoryInterface implementation.
   /// When this constructor is used, the ObjectManager assumes ownership of
   /// the given ObjectDirectory instance.
   ///
   /// \param main_service The main asio io_service.
   /// \param config ObjectManager configuration.
-  /// \param od An object implementing the object directory interface.
+  /// \param object_directory An object implementing the object directory interface.
   explicit ObjectManager(boost::asio::io_service &main_service,
                          const ObjectManagerConfig &config,
-                         std::unique_ptr<ObjectDirectoryInterface> od);
+                         std::shared_ptr<ObjectDirectoryInterface> object_directory);
 
   ~ObjectManager();
 
@@ -363,7 +354,7 @@ class ObjectManager : public ObjectManagerInterface {
   ClientID client_id_;
   const ObjectManagerConfig config_;
-  std::unique_ptr<ObjectDirectoryInterface> object_directory_;
+  std::shared_ptr<ObjectDirectoryInterface> object_directory_;
   ObjectStoreNotificationManager store_notification_;
   ObjectBufferPool buffer_pool_;
diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc
index c7f38b0c7f1d..73121ada867a 100644
--- a/src/ray/object_manager/test/object_manager_stress_test.cc
+++ b/src/ray/object_manager/test/object_manager_stress_test.cc
@@ -33,7 +33,8 @@ class MockServer {
             main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
         object_manager_socket_(main_service),
         gcs_client_(gcs_client),
-        object_manager_(main_service, object_manager_config, gcs_client) {
+        object_manager_(main_service, object_manager_config,
+                        std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
     RAY_CHECK_OK(RegisterGcs(main_service));
     // Start listening for clients.
     DoAcceptObjectManager();
diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc
index cb98706753d9..4184bca2b8aa 100644
--- a/src/ray/object_manager/test/object_manager_test.cc
+++ b/src/ray/object_manager/test/object_manager_test.cc
@@ -24,7 +24,8 @@ class MockServer {
             main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
         object_manager_socket_(main_service),
         gcs_client_(gcs_client),
-        object_manager_(main_service, object_manager_config, gcs_client) {
+        object_manager_(main_service, object_manager_config,
+                        std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
     RAY_CHECK_OK(RegisterGcs(main_service));
     // Start listening for clients.
     DoAcceptObjectManager();
@@ -285,8 +286,9 @@ class TestObjectManager : public TestObjectManagerBase {
     RAY_CHECK_OK(server1->object_manager_.object_directory_->SubscribeObjectLocations(
         sub_id, object_1,
-        [this, sub_id, object_1, object_2](const std::vector<ClientID> &clients,
-                                           const ray::ObjectID &object_id) {
+        [this, sub_id, object_1, object_2](
+            const ray::ObjectID &object_id,
+            const std::unordered_set<ray::ClientID> &clients, bool created) {
           if (!clients.empty()) {
             TestWaitWhileSubscribed(sub_id, object_1, object_2);
           }
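Aside: the constructor changes above replace per-component construction of an ObjectDirectory with a single std::shared_ptr<ObjectDirectoryInterface> that the ObjectManager, NodeManager, and ReconstructionPolicy all alias. A minimal standalone sketch of this injection shape, not the PR's code (all types here are hypothetical stand-ins):

#include <memory>

// Hypothetical stand-ins for the interface and its consumers.
struct Directory {
  virtual ~Directory() = default;
};
struct ConcreteDirectory : Directory {};

struct Manager {
  explicit Manager(std::shared_ptr<Directory> directory)
      : directory_(std::move(directory)) {}
  std::shared_ptr<Directory> directory_;
};

struct Node {
  explicit Node(std::shared_ptr<Directory> directory)
      : directory_(std::move(directory)) {}
  std::shared_ptr<Directory> directory_;
};

int main() {
  // One directory instance shared by both consumers, as Raylet now does with
  // object_directory_ for its ObjectManager and NodeManager members.
  auto directory = std::make_shared<ConcreteDirectory>();
  Manager manager(directory);
  Node node(directory);
  return directory.use_count() > 1 ? 0 : 1;
}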
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index e5cd2a7df8bb..e15c41946fce 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -46,10 +46,12 @@ namespace raylet {
 
 NodeManager::NodeManager(boost::asio::io_service &io_service,
                          const NodeManagerConfig &config, ObjectManager &object_manager,
-                         std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
+                         std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
+                         std::shared_ptr<ObjectDirectoryInterface> object_directory)
     : io_service_(io_service),
       object_manager_(object_manager),
-      gcs_client_(gcs_client),
+      gcs_client_(std::move(gcs_client)),
+      object_directory_(std::move(object_directory)),
       heartbeat_timer_(io_service),
       heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)),
       debug_dump_period_(config.debug_dump_period_ms),
@@ -62,18 +64,19 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
       scheduling_policy_(local_queues_),
       reconstruction_policy_(
           io_service_,
-          [this](const TaskID &task_id) { HandleTaskReconstruction(task_id); },
+          [this](const TaskID &task_id, bool return_values_lost) {
+            HandleTaskReconstruction(task_id);
+          },
           RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
-          gcs_client_->client_table().GetLocalClientId(), gcs_client->task_lease_table(),
-          std::make_shared<ObjectDirectory>(io_service, gcs_client),
-          gcs_client_->task_reconstruction_log()),
+          gcs_client_->client_table().GetLocalClientId(), gcs_client_->task_lease_table(),
+          object_directory_, gcs_client_->task_reconstruction_log()),
       task_dependency_manager_(
           object_manager, reconstruction_policy_, io_service,
           gcs_client_->client_table().GetLocalClientId(),
           RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
-          gcs_client->task_lease_table()),
+          gcs_client_->task_lease_table()),
       lineage_cache_(gcs_client_->client_table().GetLocalClientId(),
-                     gcs_client->raylet_task_table(), gcs_client->raylet_task_table(),
+                     gcs_client_->raylet_task_table(), gcs_client_->raylet_task_table(),
                      config.max_lineage_size),
       remote_clients_(),
       remote_server_connections_(),
@@ -417,6 +420,9 @@ void NodeManager::ClientRemoved(const ClientTableDataT &client_data) {
                                /*intentional_disconnect=*/false);
     }
   }
+  // Notify the object directory that the client has been removed so that it
+  // can remove it from any cached locations.
+  object_directory_->HandleClientRemoved(client_id);
 }
 
 void NodeManager::HeartbeatAdded(const ClientID &client_id,
@@ -1157,6 +1163,44 @@ void NodeManager::TreatTaskAsFailed(const Task &task) {
   task_dependency_manager_.UnsubscribeDependencies(spec.TaskId());
 }
 
+void NodeManager::TreatTaskAsFailedIfLost(const Task &task) {
+  const TaskSpecification &spec = task.GetTaskSpecification();
+  RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed.";
+  // Loop over the return IDs (except the dummy ID) and check whether a
+  // location for the return ID exists.
+  int64_t num_returns = spec.NumReturns();
+  if (spec.IsActorCreationTask() || spec.IsActorTask()) {
+    // TODO(rkn): We subtract 1 to avoid the dummy ID. However, this leaks
+    // information about the TaskSpecification implementation.
+    num_returns -= 1;
+  }
+  // Use a shared flag to make sure that we only treat the task as failed at
+  // most once. This flag will get deallocated once all of the object table
+  // lookup callbacks are fired.
+  auto task_marked_as_failed = std::make_shared<bool>(false);
+  for (int64_t i = 0; i < num_returns; i++) {
+    const ObjectID object_id = spec.ReturnId(i);
+    // Lookup the return value's locations.
+    RAY_CHECK_OK(object_directory_->LookupLocations(
+        object_id,
+        [this, task_marked_as_failed, task](
+            const ray::ObjectID &object_id,
+            const std::unordered_set<ray::ClientID> &clients, bool has_been_created) {
+          if (!*task_marked_as_failed) {
+            // Only process the object locations if we haven't already marked the
+            // task as failed.
+            if (clients.empty() && has_been_created) {
+              // The object does not exist on any nodes but has been created
+              // before, so the object has been lost. Mark the task as failed to
+              // prevent any tasks that depend on this object from hanging.
+              TreatTaskAsFailed(task);
+              *task_marked_as_failed = true;
+            }
+          }
+        }));
+  }
+}
+
 void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage,
                              bool forwarded) {
   const TaskSpecification &spec = task.GetTaskSpecification();
@@ -1401,7 +1445,11 @@ bool NodeManager::AssignTask(const Task &task) {
   // If this is an actor task, check that the new task has the correct counter.
   if (spec.IsActorTask()) {
     if (CheckDuplicateActorTask(actor_registry_, spec)) {
-      // This actor has been already assigned, so ignore it.
+      // The actor is alive, and a task that has already been executed before
+      // has been found. The task will be treated as failed if at least one of
+      // the task's return values has been evicted, to prevent the application
+      // from hanging.
+      TreatTaskAsFailedIfLost(task);
       return true;
     }
   }
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index 951fc2fa7d92..d8502e7d8005 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -58,7 +58,8 @@ class NodeManager {
   /// \param object_manager A reference to the local object manager.
   NodeManager(boost::asio::io_service &io_service, const NodeManagerConfig &config,
               ObjectManager &object_manager,
-              std::shared_ptr<gcs::AsyncGcsClient> gcs_client);
+              std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
+              std::shared_ptr<ObjectDirectoryInterface> object_directory_);
 
   /// Process a new client connection.
   ///
@@ -153,9 +154,17 @@ class NodeManager {
   /// it were any other task that had been assigned, executed, and removed from
   /// the local queue.
   ///
-  /// \param spec The specification of the task.
+  /// \param task The task to fail.
   /// \return Void.
   void TreatTaskAsFailed(const Task &task);
+  /// This is similar to TreatTaskAsFailed, but it will only mark the task as
+  /// failed if at least one of the task's return values is lost. A return
+  /// value is lost if it has been created before, but no longer exists on any
+  /// nodes, due to either node failure or eviction.
+  ///
+  /// \param task The task to potentially fail.
+  /// \return Void.
+  void TreatTaskAsFailedIfLost(const Task &task);
   /// Handle specified task's submission to the local node manager.
   ///
   /// \param task The task being submitted.
@@ -389,6 +398,8 @@ class NodeManager {
   plasma::PlasmaClient store_client_;
   /// A client connection to the GCS.
   std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
+  /// The object table. This is shared with the object manager.
+  std::shared_ptr<ObjectDirectoryInterface> object_directory_;
   /// The timer used to send heartbeats.
   boost::asio::steady_timer heartbeat_timer_;
   /// The period used for the heartbeat timer.
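Aside: TreatTaskAsFailedIfLost issues one asynchronous location lookup per return value, but must call TreatTaskAsFailed at most once. It does this with a std::shared_ptr<bool> flag captured by every callback; the flag outlives the enclosing scope and is freed when the last callback is destroyed. A standalone sketch of the idiom, with simulated callbacks instead of the GCS lookup API:

#include <cassert>
#include <functional>
#include <memory>
#include <vector>

int main() {
  int failures = 0;
  // One flag shared by every pending callback; it stays alive until the last
  // callback (and its captured copy of the shared_ptr) is destroyed.
  auto marked = std::make_shared<bool>(false);

  std::vector<std::function<void(bool lost)>> pending;
  for (int i = 0; i < 3; i++) {
    pending.push_back([marked, &failures](bool lost) {
      if (!*marked && lost) {
        failures++;      // Stands in for TreatTaskAsFailed(task).
        *marked = true;  // Later callbacks become no-ops.
      }
    });
  }
  // Simulate the object-table lookups resolving: two return values are lost.
  pending[0](false);
  pending[1](true);
  pending[2](true);
  assert(failures == 1);  // Failed at most once despite two "lost" results.
  return 0;
}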
diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc
index 679b18052920..39028ce7f96b 100644
--- a/src/ray/raylet/raylet.cc
+++ b/src/ray/raylet/raylet.cc
@@ -18,8 +18,10 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
                const ObjectManagerConfig &object_manager_config,
                std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
     : gcs_client_(gcs_client),
-      object_manager_(main_service, object_manager_config, gcs_client),
-      node_manager_(main_service, node_manager_config, object_manager_, gcs_client_),
+      object_directory_(std::make_shared<ObjectDirectory>(main_service, gcs_client_)),
+      object_manager_(main_service, object_manager_config, object_directory_),
+      node_manager_(main_service, node_manager_config, object_manager_, gcs_client_,
+                    object_directory_),
       socket_name_(socket_name),
       acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)),
       socket_(main_service),
diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h
index 9b424781af17..84274ea6ecfe 100644
--- a/src/ray/raylet/raylet.h
+++ b/src/ray/raylet/raylet.h
@@ -70,6 +70,9 @@ class Raylet {
 
   /// A client connection to the GCS.
   std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
+  /// The object table. This is shared between the object manager and node
+  /// manager.
+  std::shared_ptr<ObjectDirectoryInterface> object_directory_;
   /// Manages client requests for object transfers and availability.
   ObjectManager object_manager_;
   /// Manages client requests for task submission and execution.
diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc
index 6abf5b53d824..d698402994a4 100644
--- a/src/ray/raylet/reconstruction_policy.cc
+++ b/src/ray/raylet/reconstruction_policy.cc
@@ -6,7 +6,7 @@ namespace raylet {
 
 ReconstructionPolicy::ReconstructionPolicy(
     boost::asio::io_service &io_service,
-    std::function<void(const TaskID &)> reconstruction_handler,
+    std::function<void(const TaskID &, bool)> reconstruction_handler,
     int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
     gcs::PubsubInterface<TaskID> &task_lease_pubsub,
     std::shared_ptr<ObjectDirectoryInterface> object_directory,
@@ -74,13 +74,14 @@ void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id,
   SetTaskTimeout(it, initial_reconstruction_timeout_ms_);
 
   if (success) {
-    reconstruction_handler_(task_id);
+    reconstruction_handler_(task_id, it->second.return_values_lost);
   }
 }
 
 void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id,
                                                  const ObjectID &required_object_id,
-                                                 int reconstruction_attempt) {
+                                                 int reconstruction_attempt,
+                                                 bool created) {
   // If we are no longer listening for objects created by this task, give up.
   auto it = listening_tasks_.find(task_id);
   if (it == listening_tasks_.end()) {
@@ -92,6 +93,10 @@ void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id,
     return;
   }
 
+  if (created) {
+    it->second.return_values_lost = true;
+  }
+
   // Suppress duplicate reconstructions of the same task. This can happen if,
   // for example, a task creates two different objects that both require
   // reconstruction.
@@ -138,12 +143,13 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) {
   for (const auto &created_object_id : it->second.created_objects) {
     RAY_CHECK_OK(object_directory_->LookupLocations(
         created_object_id,
-        [this, task_id, reconstruction_attempt](const std::vector<ClientID> &clients,
-                                                const ray::ObjectID &object_id) {
+        [this, task_id, reconstruction_attempt](
+            const ray::ObjectID &object_id,
+            const std::unordered_set<ClientID> &clients, bool created) {
          if (clients.empty()) {
            // The required object no longer exists on any live nodes. Attempt
            // reconstruction.
-            AttemptReconstruction(task_id, object_id, reconstruction_attempt);
+            AttemptReconstruction(task_id, object_id, reconstruction_attempt, created);
          }
        }));
   }
diff --git a/src/ray/raylet/reconstruction_policy.h b/src/ray/raylet/reconstruction_policy.h
index f18290aa3725..d936a632e1f1 100644
--- a/src/ray/raylet/reconstruction_policy.h
+++ b/src/ray/raylet/reconstruction_policy.h
@@ -40,7 +40,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
   /// lease notifications from.
   ReconstructionPolicy(
       boost::asio::io_service &io_service,
-      std::function<void(const TaskID &)> reconstruction_handler,
+      std::function<void(const TaskID &, bool)> reconstruction_handler,
       int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
       gcs::PubsubInterface<TaskID> &task_lease_pubsub,
       std::shared_ptr<ObjectDirectoryInterface> object_directory,
@@ -93,6 +93,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
     bool subscribed;
     // The number of times we've attempted reconstructing this task so far.
     int reconstruction_attempt;
+    bool return_values_lost;
     // The task's reconstruction timer. If this expires before a lease
     // notification is received, then the task will be reconstructed.
     std::unique_ptr<boost::asio::deadline_timer> reconstruction_timer;
@@ -115,7 +116,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
   /// reconstructions of the same task (e.g., if a task creates two objects
   /// that both require reconstruction).
   void AttemptReconstruction(const TaskID &task_id, const ObjectID &required_object_id,
-                             int reconstruction_attempt);
+                             int reconstruction_attempt, bool created);
 
   /// Handle expiration of a task lease.
   void HandleTaskLeaseExpired(const TaskID &task_id);
@@ -127,7 +128,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
   /// The event loop.
   boost::asio::io_service &io_service_;
   /// The handler to call for tasks that require reconstruction.
-  const std::function<void(const TaskID &)> reconstruction_handler_;
+  const std::function<void(const TaskID &, bool)> reconstruction_handler_;
   /// The initial timeout within which a task lease notification must be
   /// received. Otherwise, reconstruction will be triggered.
   const int64_t initial_reconstruction_timeout_ms_;
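Aside: the test changes below extend MockObjectDirectory so that lookups are recorded and answered later from a test-controlled table, which keeps the asynchronous callback order deterministic. A standalone sketch of that record-then-flush pattern; MockDirectory and its members are hypothetical simplifications of the mock in the diff:

#include <cassert>
#include <functional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

using Locations = std::unordered_set<std::string>;
using Callback =
    std::function<void(const std::string &id, const Locations &, bool created)>;

// Hypothetical mock: Lookup() only records the request; FlushCallbacks()
// answers every recorded request from a table the test controls.
struct MockDirectory {
  std::vector<std::pair<std::string, Callback>> callbacks;
  std::unordered_map<std::string, Locations> locations;

  void Lookup(const std::string &id, const Callback &callback) {
    callbacks.push_back({id, callback});
  }
  void FlushCallbacks() {
    for (const auto &pending : callbacks) {
      auto it = locations.find(pending.first);
      if (it == locations.end()) {
        pending.second(pending.first, {}, /*created=*/false);
      } else {
        pending.second(pending.first, it->second, /*created=*/true);
      }
    }
    callbacks.clear();
  }
};

int main() {
  MockDirectory directory;
  directory.locations["obj1"] = {"client1"};
  bool lost = false;
  auto record = [&lost](const std::string &, const Locations &locations, bool created) {
    lost = locations.empty() && created;
  };
  directory.Lookup("obj1", record);
  directory.FlushCallbacks();
  assert(!lost);  // Object is available on client1.
  // Remove the only holder, as HandleClientRemoved would.
  directory.locations["obj1"].erase("client1");
  directory.Lookup("obj1", record);
  directory.FlushCallbacks();
  assert(lost);  // Created before, but no locations remain.
  return 0;
}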
diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc
index 9f1499c31664..ba1e4c23c190 100644
--- a/src/ray/raylet/reconstruction_policy_test.cc
+++ b/src/ray/raylet/reconstruction_policy_test.cc
@@ -19,7 +19,7 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
   MockObjectDirectory() {}
 
   ray::Status LookupLocations(const ObjectID &object_id,
-                              const OnLocationsFound &callback) {
+                              const OnLocationsFound &callback) override {
     callbacks_.push_back({object_id, callback});
     return ray::Status::OK();
   }
@@ -27,16 +27,28 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
   void FlushCallbacks() {
     for (const auto &callback : callbacks_) {
       const ObjectID object_id = callback.first;
-      callback.second(locations_[object_id], object_id);
+      auto it = locations_.find(object_id);
+      if (it == locations_.end()) {
+        callback.second(object_id, {}, /*created=*/false);
+      } else {
+        callback.second(object_id, it->second, /*created=*/true);
+      }
     }
     callbacks_.clear();
   }
 
-  void SetObjectLocations(const ObjectID &object_id, std::vector<ClientID> locations) {
+  void SetObjectLocations(const ObjectID &object_id,
+                          const std::unordered_set<ClientID> &locations) {
     locations_[object_id] = locations;
   }
 
-  std::string DebugString() const { return ""; }
+  void HandleClientRemoved(const ClientID &client_id) override {
+    for (auto &locations : locations_) {
+      locations.second.erase(client_id);
+    }
+  }
+
+  std::string DebugString() const override { return ""; }
 
   MOCK_METHOD0(RegisterBackend, void(void));
   MOCK_METHOD0(GetLocalClientID, ray::ClientID());
@@ -54,7 +66,7 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
  private:
   std::vector<std::pair<ObjectID, OnLocationsFound>> callbacks_;
-  std::unordered_map<ObjectID, std::vector<ClientID>> locations_;
+  std::unordered_map<ObjectID, std::unordered_set<ClientID>> locations_;
 };
 
 class MockGcs : public gcs::PubsubInterface<TaskID>,
@@ -138,8 +150,8 @@ class ReconstructionPolicyTest : public ::testing::Test {
         mock_object_directory_(std::make_shared<MockObjectDirectory>()),
         reconstruction_timeout_ms_(50),
         reconstruction_policy_(std::make_shared<ReconstructionPolicy>(
-            io_service_,
-            [this](const TaskID &task_id) { TriggerReconstruction(task_id); },
+            io_service_, [this](const TaskID &task_id,
+                                bool created) { TriggerReconstruction(task_id); },
             reconstruction_timeout_ms_, ClientID::from_random(), mock_gcs_,
             mock_object_directory_, mock_gcs_)),
         timer_canceled_(false) {
@@ -250,6 +262,30 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionEvicted) {
   ASSERT_EQ(reconstructed_tasks_[task_id], 1);
 }
 
+TEST_F(ReconstructionPolicyTest, TestReconstructionObjectLost) {
+  TaskID task_id = TaskID::from_random();
+  task_id = FinishTaskId(task_id);
+  ObjectID object_id = ComputeReturnId(task_id, 1);
+  ClientID client_id = ClientID::from_random();
+  mock_object_directory_->SetObjectLocations(object_id, {client_id});
+
+  // Listen for the object.
+  reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
+  // Run the test for longer than the reconstruction timeout.
+  Run(reconstruction_timeout_ms_ * 1.1);
+  // Check that reconstruction was not triggered, since the object still
+  // exists on a live node.
+  ASSERT_EQ(reconstructed_tasks_[task_id], 0);
+
+  // Simulate the object's only node being removed.
+  mock_object_directory_->HandleClientRemoved(client_id);
+  // Run the test again.
+  Run(reconstruction_timeout_ms_ * 1.1);
+  // Check that reconstruction was triggered, since the object's only copy
+  // was lost.
+  ASSERT_EQ(reconstructed_tasks_[task_id], 1);
+}
+
 TEST_F(ReconstructionPolicyTest, TestDuplicateReconstruction) {
   // Create two object IDs produced by the same task.
   TaskID task_id = TaskID::from_random();
diff --git a/test/actor_test.py b/test/actor_test.py
index b65ba6426156..9a42842d68ed 100644
--- a/test/actor_test.py
+++ b/test/actor_test.py
@@ -21,7 +21,12 @@
 @pytest.fixture
 def ray_start_regular():
     # Start the Ray processes.
-    ray.init(num_cpus=1)
+    ray.init(
+        num_cpus=1,
+        _internal_config=json.dumps({
+            "initial_reconstruction_timeout_milliseconds": 200,
+            "num_heartbeats_timeout": 10,
+        }))
     yield None
     # The code after the yield will run as teardown code.
     ray.shutdown()
@@ -2094,6 +2099,48 @@ def method(self):
     ray.get(results)
 
 
+def test_actor_eviction(shutdown_only):
+    @ray.remote
+    class Actor(object):
+        def __init__(self):
+            pass
+
+        def create_object(self, size):
+            return np.random.rand(size)
+
+    object_store_memory = 10**8
+    ray.init(
+        object_store_memory=object_store_memory,
+        _internal_config=json.dumps({
+            "initial_reconstruction_timeout_milliseconds": 200
+        }))
+
+    a = Actor.remote()
+    # Submit enough methods on the actor so that they exceed the size of the
+    # object store.
+    objects = []
+    num_objects = 20
+    for _ in range(num_objects):
+        obj = a.create_object.remote(object_store_memory // num_objects)
+        objects.append(obj)
+        # Get each object once to make sure each object gets created.
+        ray.get(obj)
+
+    # Get each object again. At this point, the earlier objects should have
+    # been evicted.
+    num_evicted, num_success = 0, 0
+    for obj in objects:
+        try:
+            ray.get(obj)
+            num_success += 1
+        except ray.worker.RayTaskError:
+            num_evicted += 1
+    # Some objects should have been evicted, and some should still be in the
+    # object store.
+    assert num_evicted > 0
+    assert num_success > 0
+
+
 def test_actor_reconstruction(ray_start_regular):
     """Test actor reconstruction when actor process is killed."""