diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc
index b101e374430d..06a7be587a53 100644
--- a/src/ray/raylet/lineage_cache.cc
+++ b/src/ray/raylet/lineage_cache.cc
@@ -215,6 +215,29 @@ void LineageCache::AddReadyTask(const Task &task) {
   }
 }
 
+uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) const {
+  // A task that we are already subscribed to cuts the count off: neither it
+  // nor anything behind it is counted.
+  if (subscribed_tasks_.count(task_id) == 1) {
+    return 0;
+  }
+  // Tasks without an entry in the lineage cache are not counted either.
+  auto entry = lineage_.GetEntry(task_id);
+  if (!entry) {
+    return 0;
+  }
+  // Count this task plus, recursively, the lineage of each of its parents.
+  // NOTE(review): no visited set is kept, so an ancestor that is reachable
+  // through several parents is traversed and counted once per path. That only
+  // enlarges the overestimate, but the cost can blow up on DAGs with heavily
+  // shared ancestors; consider tracking the task IDs already visited.
+  uint64_t cnt = 1;
+  for (const auto &parent_id : entry->GetParentTaskIds()) {
+    cnt += CountUnsubscribedLineage(parent_id);
+  }
+  return cnt;
+}
+
 void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
   auto entry = lineage_.PopEntry(task_id);
   // It's only okay to remove a task that is waiting for execution.
@@ -226,24 +249,18 @@ void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
   entry->ResetStatus(GcsStatus_UNCOMMITTED_REMOTE);
   RAY_CHECK(lineage_.SetEntry(std::move(*entry)));
 
-  // Try to evict a task and its uncommitted lineage if the uncommitted lineage
-  // exceeds the maximum size.
+  // Request a notification only once the unsubscribed part of this task's
+  // lineage exceeds max_lineage_size_, i.e. roughly once every
+  // max_lineage_size_ tasks in a chain. The task and its uncommitted lineage
+  // can then be evicted when the commit notification is received. This keeps
+  // the unevicted lineage at about max_lineage_size_ and the number of
+  // subscribed tasks at about N / max_lineage_size_ for a chain of N tasks.
   // NOTE(swang): The number of entries in the uncommitted lineage also
   // includes local tasks that haven't been committed yet, not just remote
   // tasks, so this is an overestimate.
-  const auto uncommitted_lineage = GetUncommittedLineage(task_id);
-  if (uncommitted_lineage.GetEntries().size() > max_lineage_size_) {
-    // Request a notification for the newly remote task so that the task and
-    // its uncommitted lineage can be evicted once the commit notification is
-    // received. Since this task was in state WAITING, check that we were not
+  if (CountUnsubscribedLineage(task_id) > max_lineage_size_) {
+    // Since this task was in state WAITING, check that we were not
     // already subscribed to the task.
-    // NOTE(swang): We may end up requesting notifications for too many tasks
-    // from the GCS if we do not receive a notification for this task fast
-    // enough, since every dependent and waiting task that gets removed
-    // afterwards will also have an uncommitted lineage that's too large. If
-    // this becomes an issue, we can be smarter about which tasks to request by
-    // either storing the dependency depth as part of the task specs, or
-    // storing that information as a data structure in the lineage cache.
     RAY_CHECK(SubscribeTask(task_id));
   }
 }
diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h
index b3e9cbe96ae7..588fe76d5b92 100644
--- a/src/ray/raylet/lineage_cache.h
+++ b/src/ray/raylet/lineage_cache.h
@@ -220,6 +220,13 @@ class LineageCache {
   /// Unsubscribe from notifications for a task. Returns whether the operation
   /// was successful (whether we were subscribed).
   bool UnsubscribeTask(const UniqueID &task_id);
+  /// Count the tasks in the lineage of the given task, including the task
+  /// itself, that have an entry in the lineage cache and are not cut off by a
+  /// task that we are subscribed to. The traversal stops at subscribed tasks
+  /// and at tasks with no cache entry. An ancestor reachable through several
+  /// parents is counted once per path, so the result is an overestimate.
+  /// Returns 0 if the task itself is subscribed or has no cache entry.
+  uint64_t CountUnsubscribedLineage(const UniqueID &task_id) const;
 
   /// The client ID, used to request notifications for specific tasks.
   /// TODO(swang): Move the ClientID into the generic Table implementation.
diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc
index 74c2e6bce0d9..de26467bf690 100644
--- a/src/ray/raylet/lineage_cache_test.cc
+++ b/src/ray/raylet/lineage_cache_test.cc
@@ -52,6 +52,7 @@ class MockGcs : public gcs::TableInterface,
     if (task_table_.count(task_id) == 1) {
       callbacks_.push_back({notification_callback_, task_id});
     }
+    num_requested_notifications_ += 1;
     return ray::Status::OK();
   }
 
@@ -75,11 +76,14 @@ class MockGcs : public gcs::TableInterface,
 
   const std::unordered_set &SubscribedTasks() const { return subscribed_tasks_; }
 
+  int NumRequestedNotifications() const { return num_requested_notifications_; }
+
  private:
   std::unordered_map> task_table_;
   std::vector> callbacks_;
   gcs::raylet::TaskTable::WriteCallback notification_callback_;
   std::unordered_set subscribed_tasks_;
+  int num_requested_notifications_ = 0;
 };
 
 class LineageCacheTest : public ::testing::Test {
@@ -391,14 +395,14 @@ TEST_F(LineageCacheTest, TestEviction) {
   // All tasks have now been flushed. Check that enough lineage has been
-  // evicted that the uncommitted lineage is now less than the maximum size.
+  // evicted that the uncommitted lineage is now at most the maximum size.
   uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id);
-  ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_);
+  ASSERT_LE(uncommitted_lineage.GetEntries().size(), max_lineage_size_);
 }
 
 TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
   // Insert a chain of dependent tasks that is more than twice as long as the
   // maximum lineage size. This will ensure that we request notifications for
-  // at least 2 remote tasks.
-  uint64_t lineage_size = (2 * max_lineage_size_) + 1;
+  // at most 2 remote tasks.
+  uint64_t lineage_size = (2 * max_lineage_size_) + 2;
   size_t num_tasks_flushed = 0;
   std::vector tasks;
   InsertTaskChain(lineage_cache_, tasks, lineage_size, std::vector(), 1);
@@ -408,6 +412,8 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
     auto task_id = task.GetTaskSpecification().TaskId();
     lineage_cache_.RemoveWaitingTask(task_id);
   }
+  // Check that we requested at most 2 notifications.
+  ASSERT_LE(mock_gcs_.NumRequestedNotifications(), 2);
 
   // Check that the last task in the chain still has all tasks in its
   // uncommitted lineage.
@@ -441,7 +447,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
   // All tasks have now been flushed. Check that enough lineage has been
-  // evicted that the uncommitted lineage is now less than the maximum size.
+  // evicted that the uncommitted lineage is now at most the maximum size.
   uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id);
-  ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_);
+  ASSERT_LE(uncommitted_lineage.GetEntries().size(), max_lineage_size_);
 }
 
 }  // namespace raylet