diff --git a/src/common/state/ray_config.h b/src/common/state/ray_config.h index 957dcf96f239..c7668c35c66c 100644 --- a/src/common/state/ray_config.h +++ b/src/common/state/ray_config.h @@ -20,6 +20,8 @@ class RayConfig { int64_t get_timeout_milliseconds() const { return get_timeout_milliseconds_; } + uint64_t max_lineage_size() const { return max_lineage_size_; } + int64_t worker_get_request_size() const { return worker_get_request_size_; } int64_t worker_fetch_request_size() const { @@ -112,6 +114,7 @@ class RayConfig { get_timeout_milliseconds_(1000), worker_get_request_size_(10000), worker_fetch_request_size_(10000), + max_lineage_size_(100), actor_max_dummy_objects_(1000), num_connect_attempts_(50), connect_timeout_milliseconds_(100), @@ -160,6 +163,11 @@ class RayConfig { int64_t worker_get_request_size_; int64_t worker_fetch_request_size_; + /// This is used to bound the size of the Raylet's lineage cache. This is + /// the maximum uncommitted lineage size that any remote task in the cache + /// can have before eviction will be attempted. + uint64_t max_lineage_size_; + /// This is a temporary constant used by actors to determine how many dummy /// objects to store. int64_t actor_max_dummy_objects_; diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index a5eb91a6b3b0..b101e374430d 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -123,8 +123,12 @@ flatbuffers::Offset Lineage::ToFlatbuffer( LineageCache::LineageCache(const ClientID &client_id, gcs::TableInterface &task_storage, - gcs::PubsubInterface &task_pubsub) - : client_id_(client_id), task_storage_(task_storage), task_pubsub_(task_pubsub) {} + gcs::PubsubInterface &task_pubsub, + uint64_t max_lineage_size) + : client_id_(client_id), + task_storage_(task_storage), + task_pubsub_(task_pubsub), + max_lineage_size_(max_lineage_size) {} /// A helper function to merge one lineage into another, in DFS order. /// @@ -178,6 +182,14 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l return false; }); + // If the task was previously remote, then we may have been subscribed to + // it. Unsubscribe since we are now responsible for committing the task. + auto entry = lineage_.GetEntry(task_id); + if (entry) { + RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE); + UnsubscribeTask(task_id); + } + // Add the submitted task to the lineage cache as UNCOMMITTED_WAITING. It // should be marked as UNCOMMITTED_READY once the task starts execution. LineageEntry task_entry(task, GcsStatus_UNCOMMITTED_WAITING); @@ -185,9 +197,15 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l } void LineageCache::AddReadyTask(const Task &task) { + const TaskID task_id = task.GetTaskSpecification().TaskId(); + + // Tasks can only become READY if they were in WAITING. + auto entry = lineage_.GetEntry(task_id); + RAY_CHECK(entry); + RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_WAITING); + auto new_entry = LineageEntry(task, GcsStatus_UNCOMMITTED_READY); RAY_CHECK(lineage_.SetEntry(std::move(new_entry))); - const TaskID task_id = task.GetTaskSpecification().TaskId(); // Attempt to flush the task. bool flushed = FlushTask(task_id); if (!flushed) { @@ -207,6 +225,27 @@ void LineageCache::RemoveWaitingTask(const TaskID &task_id) { // one. entry->ResetStatus(GcsStatus_UNCOMMITTED_REMOTE); RAY_CHECK(lineage_.SetEntry(std::move(*entry))); + + // Try to evict a task and its uncommitted lineage if the uncommitted lineage + // exceeds the maximum size. + // NOTE(swang): The number of entries in the uncommitted lineage also + // includes local tasks that haven't been committed yet, not just remote + // tasks, so this is an overestimate. + const auto uncommitted_lineage = GetUncommittedLineage(task_id); + if (uncommitted_lineage.GetEntries().size() > max_lineage_size_) { + // Request a notification for the newly remote task so that the task and + // its uncommitted lineage can be evicted once the commit notification is + // received. Since this task was in state WAITING, check that we were not + // already subscribed to the task. + // NOTE(swang): We may end up requesting notifications for too many tasks + // from the GCS if we do not receive a notification for this task fast + // enough, since every dependent and waiting task that gets removed + // afterwards will also have an uncommitted lineage that's too large. If + // this becomes an issue, we can be smarter about which tasks to request by + // either storing the dependency depth as part of the task specs, or + // storing that information as a data structure in the lineage cache. + RAY_CHECK(SubscribeTask(task_id)); + } } Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const { @@ -216,7 +255,7 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const { MergeLineageHelper(task_id, lineage_, uncommitted_lineage, [](GcsStatus status) { // The stopping condition for recursion is that the entry has been // committed to the GCS. - return status == GcsStatus_COMMITTED; + return false; }); return uncommitted_lineage; } @@ -234,7 +273,7 @@ bool LineageCache::FlushTask(const TaskID &task_id) { // If a parent entry exists in the lineage cache but has not been // committed yet, then as far as we know, it's still in flight to the // GCS. Skip this task for now. - if (parent && parent->GetStatus() != GcsStatus_COMMITTED) { + if (parent) { RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING) << "Children should not become ready to flush before their parents."; // Request notifications about the parent entry's commit in the GCS if @@ -243,13 +282,7 @@ bool LineageCache::FlushTask(const TaskID &task_id) { // notification about the task's commit via HandleEntryCommitted, then // this task will be ready to write on the next call to Flush(). if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) { - auto inserted = subscribed_tasks_.insert(parent_id); - if (inserted.second) { - // Only request notifications about the parent entry if we haven't - // already requested notifications for it. - RAY_CHECK_OK( - task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_)); - } + SubscribeTask(parent_id); } all_arguments_committed = false; // Track the fact that this task is dependent on a parent that hasn't yet @@ -297,42 +330,79 @@ void LineageCache::Flush() { } } -void PopAncestorTasks(const UniqueID &task_id, Lineage &lineage) { - auto entry = lineage.PopEntry(task_id); +bool LineageCache::SubscribeTask(const UniqueID &task_id) { + auto inserted = subscribed_tasks_.insert(task_id); + bool unsubscribed = inserted.second; + if (unsubscribed) { + // Request notifications for the task if we haven't already requested + // notifications for it. + RAY_CHECK_OK(task_pubsub_.RequestNotifications(JobID::nil(), task_id, client_id_)); + } + // Return whether we were previously unsubscribed to this task and are now + // subscribed. + return unsubscribed; +} + +bool LineageCache::UnsubscribeTask(const UniqueID &task_id) { + auto it = subscribed_tasks_.find(task_id); + bool subscribed = (it != subscribed_tasks_.end()); + if (subscribed) { + // Cancel notifications for the task if we previously requested + // notifications for it. + RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_)); + subscribed_tasks_.erase(it); + } + // Return whether we were previously subscribed to this task and are now + // unsubscribed. + return subscribed; +} + +void LineageCache::EvictRemoteLineage(const UniqueID &task_id) { + // Remove the ancestor task. + auto entry = lineage_.PopEntry(task_id); if (!entry) { return; } + // Tasks are committed in data dependency order per node, so the only + // ancestors of a committed task should be other remote tasks. auto status = entry->GetStatus(); - RAY_CHECK(status == GcsStatus_UNCOMMITTED_REMOTE || status == GcsStatus_COMMITTED); + RAY_CHECK(status == GcsStatus_UNCOMMITTED_REMOTE); + // We are evicting the remote ancestors of a task, so there should not be + // any dependent tasks that need to be flushed. + RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0); + // Unsubscribe from the remote ancestor task if we were subscribed to + // notifications. + UnsubscribeTask(task_id); + // Recurse and remove this task's ancestors. for (const auto &parent_id : entry->GetParentTaskIds()) { - PopAncestorTasks(parent_id, lineage); + EvictRemoteLineage(parent_id); } } void LineageCache::HandleEntryCommitted(const UniqueID &task_id) { RAY_LOG(DEBUG) << "task committed: " << task_id; auto entry = lineage_.PopEntry(task_id); - RAY_CHECK(entry); - for (const auto &parent_id : entry->GetParentTaskIds()) { - PopAncestorTasks(parent_id, lineage_); + if (!entry) { + // The committed entry has already been evicted. Check that the committed + // entry does not have any dependent tasks, since we should've already + // attempted to flush these tasks on the first commit notification. + RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0); + // Check that we already unsubscribed from the task when handling the + // first commit notification. + RAY_CHECK(subscribed_tasks_.count(task_id) == 0); + // Do nothing if the committed entry has already been evicted. + return; } - // Mark this task as COMMITTED. Any tasks that were dependent on it and are - // ready to be written may now be flushed to the GCS. - bool committed = entry->SetStatus(GcsStatus_COMMITTED); - if (!committed) { - // If we failed to mark the task as committed, check that it's because it - // was committed before. This means that we already received a notification - // about the commit. - RAY_CHECK(entry->GetStatus() == GcsStatus_COMMITTED); + + // Evict the committed task's uncommitted lineage. Since local tasks are + // written in data dependency order, the uncommitted lineage should only + // include remote tasks, i.e. tasks that were committed by a different node. + for (const auto &parent_id : entry->GetParentTaskIds()) { + EvictRemoteLineage(parent_id); } - RAY_CHECK(lineage_.SetEntry(std::move(*entry))); // Stop listening for notifications about this task. - auto it = subscribed_tasks_.find(task_id); - if (it != subscribed_tasks_.end()) { - RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_)); - subscribed_tasks_.erase(it); - } + UnsubscribeTask(task_id); // Try to flush the children of the committed task. These are the tasks that // have a dependency on the committed task. diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index c4ccf92c289c..b3e9cbe96ae7 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -29,9 +29,6 @@ enum GcsStatus { /// The task has been written to the GCS and we are waiting for an /// acknowledgement of the commit. GcsStatus_COMMITTING, - /// The task has been committed in the GCS. It's safe to remove this entry - /// from the lineage cache. - GcsStatus_COMMITTED, }; /// \class LineageEntry @@ -164,7 +161,7 @@ class LineageCache { /// TODO(swang): Pass in the policy (interface?). LineageCache(const ClientID &client_id, gcs::TableInterface &task_storage, - gcs::PubsubInterface &task_pubsub); + gcs::PubsubInterface &task_pubsub, uint64_t max_lineage_size); /// Add a task that is waiting for execution and its uncommitted lineage. /// These entries will not be written to the GCS until set to ready. @@ -214,6 +211,15 @@ class LineageCache { /// parents that are not committed yet, then the child will be flushed once /// the parents have been committed. bool FlushTask(const TaskID &task_id); + /// Evict a remote task and its lineage. This should only be called if we + /// are sure that the remote task and its lineage are committed. + void EvictRemoteLineage(const UniqueID &task_id); + /// Subscribe to notifications for a task. Returns whether the operation + /// was successful (whether we were not already subscribed). + bool SubscribeTask(const UniqueID &task_id); + /// Unsubscribe from notifications for a task. Returns whether the operation + /// was successful (whether we were subscribed). + bool UnsubscribeTask(const UniqueID &task_id); /// The client ID, used to request notifications for specific tasks. /// TODO(swang): Move the ClientID into the generic Table implementation. @@ -223,6 +229,11 @@ class LineageCache { /// The pubsub storage system for task information. This can be used to /// request notifications for the commit of a task entry. gcs::PubsubInterface &task_pubsub_; + /// The maximum size that a remote task's uncommitted lineage can get to. If + /// a remote task's uncommitted lineage exceeds this size, then a + /// notification will be requested from the pubsub storage system so that + /// the task and its lineage can be evicted from the stash. + uint64_t max_lineage_size_; /// The set of tasks that are in UNCOMMITTED_READY state. This is a cache of /// the tasks that may be flushable. // TODO(swang): As an optimization, we may also want to further distinguish diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 12b24ba35263..74c2e6bce0d9 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -85,7 +85,9 @@ class MockGcs : public gcs::TableInterface, class LineageCacheTest : public ::testing::Test { public: LineageCacheTest() - : mock_gcs_(), lineage_cache_(ClientID::from_random(), mock_gcs_, mock_gcs_) { + : max_lineage_size_(10), + mock_gcs_(), + lineage_cache_(ClientID::from_random(), mock_gcs_, mock_gcs_, max_lineage_size_) { mock_gcs_.Subscribe([this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id, const ray::protocol::TaskT &data) { lineage_cache_.HandleEntryCommitted(task_id); @@ -93,6 +95,7 @@ class LineageCacheTest : public ::testing::Test { } protected: + uint64_t max_lineage_size_; MockGcs mock_gcs_; LineageCache lineage_cache_; }; @@ -277,26 +280,28 @@ TEST_F(LineageCacheTest, TestWritebackPartiallyReady) { ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); } -TEST_F(LineageCacheTest, TestForwardTaskRoundTrip) { +TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) { // Insert a chain of dependent tasks. + uint64_t lineage_size = max_lineage_size_ + 1; std::vector tasks; auto return_values1 = - InsertTaskChain(lineage_cache_, tasks, 3, std::vector(), 1); - - // Simulate removing the task and forwarding it to another node. - auto forwarded_task = tasks[1]; - auto task_id_to_remove = forwarded_task.GetTaskSpecification().TaskId(); - auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id_to_remove); - lineage_cache_.RemoveWaitingTask(task_id_to_remove); - - // Simulate receiving the task again. - flatbuffers::FlatBufferBuilder fbb; - auto uncommitted_lineage_message = - uncommitted_lineage.ToFlatbuffer(fbb, task_id_to_remove); - fbb.Finish(uncommitted_lineage_message); - uncommitted_lineage = Lineage( - *flatbuffers::GetRoot(fbb.GetBufferPointer())); - lineage_cache_.AddWaitingTask(forwarded_task, uncommitted_lineage); + InsertTaskChain(lineage_cache_, tasks, lineage_size, std::vector(), 1); + + // Simulate removing each task, forwarding it to another node, then + // receiving the task back again. + for (auto it = tasks.begin(); it != tasks.end(); it++) { + const auto task_id = it->GetTaskSpecification().TaskId(); + // Simulate removing the task and forwarding it to another node. + auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id); + lineage_cache_.RemoveWaitingTask(task_id); + // Simulate receiving the task again. Make sure we can add the task back. + flatbuffers::FlatBufferBuilder fbb; + auto uncommitted_lineage_message = uncommitted_lineage.ToFlatbuffer(fbb, task_id); + fbb.Finish(uncommitted_lineage_message); + uncommitted_lineage = Lineage( + *flatbuffers::GetRoot(fbb.GetBufferPointer())); + lineage_cache_.AddWaitingTask(*it, uncommitted_lineage); + } } TEST_F(LineageCacheTest, TestForwardTask) { @@ -341,6 +346,104 @@ TEST_F(LineageCacheTest, TestForwardTask) { ASSERT_EQ(mock_gcs_.SubscribedTasks().size(), 0); } +TEST_F(LineageCacheTest, TestEviction) { + // Insert a chain of dependent tasks. + uint64_t lineage_size = max_lineage_size_ + 1; + size_t num_tasks_flushed = 0; + std::vector tasks; + InsertTaskChain(lineage_cache_, tasks, lineage_size, std::vector(), 1); + + // Simulate forwarding the chain of tasks to a remote node. + for (const auto &task : tasks) { + auto task_id = task.GetTaskSpecification().TaskId(); + lineage_cache_.RemoveWaitingTask(task_id); + } + + // Check that the last task in the chain still has all tasks in its + // uncommitted lineage. + const auto last_task_id = tasks.back().GetTaskSpecification().TaskId(); + auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id); + ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); + + // Simulate executing the first task on a remote node and adding it to the + // GCS. + auto task_data = std::make_shared(); + auto it = tasks.begin(); + RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data)); + it++; + // Check that the remote task is flushed. + num_tasks_flushed++; + mock_gcs_.Flush(); + CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + // Check that the last task in the chain still has all tasks in its + // uncommitted lineage. + ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); + + // Simulate executing the rest of the tasks on a remote node and adding them + // to the GCS. + for (; it != tasks.end(); it++) { + RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data)); + // Check that the remote task is flushed. + num_tasks_flushed++; + mock_gcs_.Flush(); + CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + } + // All tasks have now been flushed. Check that enough lineage has been + // evicted that the uncommitted lineage is now less than the maximum size. + uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id); + ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_); +} + +TEST_F(LineageCacheTest, TestOutOfOrderEviction) { + // Insert a chain of dependent tasks that is more than twice as long as the + // maximum lineage size. This will ensure that we request notifications for + // at least 2 remote tasks. + uint64_t lineage_size = (2 * max_lineage_size_) + 1; + size_t num_tasks_flushed = 0; + std::vector tasks; + InsertTaskChain(lineage_cache_, tasks, lineage_size, std::vector(), 1); + + // Simulate forwarding the chain of tasks to a remote node. + for (const auto &task : tasks) { + auto task_id = task.GetTaskSpecification().TaskId(); + lineage_cache_.RemoveWaitingTask(task_id); + } + + // Check that the last task in the chain still has all tasks in its + // uncommitted lineage. + const auto last_task_id = tasks.back().GetTaskSpecification().TaskId(); + auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id); + ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); + + // Simulate executing the tasks at the remote node and receiving the + // notifications from the GCS in reverse order of execution. + auto task_data = std::make_shared(); + auto it = tasks.rbegin(); + RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data)); + it++; + // Check that the remote task is flushed. + num_tasks_flushed++; + mock_gcs_.Flush(); + CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + // Check that the last task in the chain still has all tasks in its + // uncommitted lineage. + ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); + + // Simulate executing the rest of the tasks on a remote node and receiving + // the notifications from the GCS in reverse order of execution. + for (; it != tasks.rend(); it++) { + RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data)); + // Check that the remote task is flushed. + num_tasks_flushed++; + mock_gcs_.Flush(); + CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + } + // All tasks have now been flushed. Check that enough lineage has been + // evicted that the uncommitted lineage is now less than the maximum size. + uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id); + ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 68bd98480ec7..d50c2683baf0 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -45,6 +45,7 @@ int main(int argc, char *argv[]) { node_manager_config.heartbeat_period_ms = RayConfig::instance().heartbeat_timeout_milliseconds(); + node_manager_config.max_lineage_size = RayConfig::instance().max_lineage_size(); // Configuration for the object manager. ray::ObjectManagerConfig object_manager_config; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index d63adb773fea..6000d6ee3c2b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -76,7 +76,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, reconstruction_policy_([this](const TaskID &task_id) { ResubmitTask(task_id); }), task_dependency_manager_(object_manager), lineage_cache_(gcs_client_->client_table().GetLocalClientId(), - gcs_client->raylet_task_table(), gcs_client->raylet_task_table()), + gcs_client->raylet_task_table(), gcs_client->raylet_task_table(), + config.max_lineage_size), remote_clients_(), remote_server_connections_(), actor_registry_() { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index fd7cbfdab30f..fcd48277f9d4 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -24,6 +24,7 @@ struct NodeManagerConfig { int num_initial_workers; std::vector worker_command; uint64_t heartbeat_period_ms; + uint64_t max_lineage_size; }; class NodeManager {