From 1cee4295aa808374148139a8015fae1f192a1cf7 Mon Sep 17 00:00:00 2001 From: jinjiang Date: Mon, 4 Jun 2018 14:49:44 +0800 Subject: [PATCH 1/8] Java in vscode. --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index fa9fe8f69d2b..2cfbadecddf6 100644 --- a/.gitignore +++ b/.gitignore @@ -147,3 +147,7 @@ build # Vscode .vscode/ +java/**/.settings +java/**/.classpath +java/**/.project +java/**/target \ No newline at end of file From 51b24b1486716e30c6d60bf12309e4f47e38381f Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Tue, 5 Jun 2018 21:30:16 +0800 Subject: [PATCH 2/8] Optimize lineage eviction --- src/ray/raylet/lineage_cache.cc | 37 +++++++++++++++++----------- src/ray/raylet/lineage_cache.h | 2 ++ src/ray/raylet/lineage_cache_test.cc | 8 +++++- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index b101e374430d..7a516dc9e61d 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -215,6 +215,21 @@ void LineageCache::AddReadyTask(const Task &task) { } } +uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) { + if (subscribed_tasks_.count(task_id) == 1) { + return 0; + } + auto entry = lineage_.GetEntry(task_id); + uint64_t cnt = 0; + if (!entry) { + return 0; + } + for (const auto& parent_id : entry->GetParentTaskIds()) { + cnt += CountUnsubscribedLineage(parent_id); + } + return cnt; +} + void LineageCache::RemoveWaitingTask(const TaskID &task_id) { auto entry = lineage_.PopEntry(task_id); // It's only okay to remove a task that is waiting for execution. @@ -226,24 +241,18 @@ void LineageCache::RemoveWaitingTask(const TaskID &task_id) { entry->ResetStatus(GcsStatus_UNCOMMITTED_REMOTE); RAY_CHECK(lineage_.SetEntry(std::move(*entry))); - // Try to evict a task and its uncommitted lineage if the uncommitted lineage - // exceeds the maximum size. + // Request a notification for every one out of max_lineage_size_ tasks, + // so that the task and its uncommitted lineage can be evicted + // once the commit notification is received. + // By doing this, we make sure that the unevicted lineage won't be more than + // max_lineage_size_, and the number of subscribed tasks won't be more than + // N / max_lineage_size_, where N is the size of the task chain. // NOTE(swang): The number of entries in the uncommitted lineage also // includes local tasks that haven't been committed yet, not just remote // tasks, so this is an overestimate. - const auto uncommitted_lineage = GetUncommittedLineage(task_id); - if (uncommitted_lineage.GetEntries().size() > max_lineage_size_) { - // Request a notification for the newly remote task so that the task and - // its uncommitted lineage can be evicted once the commit notification is - // received. Since this task was in state WAITING, check that we were not + if (CountUnsubscribedLineage(task_id) > max_lineage_size_) { + // Since this task was in state WAITING, check that we were not // already subscribed to the task. - // NOTE(swang): We may end up requesting notifications for too many tasks - // from the GCS if we do not receive a notification for this task fast - // enough, since every dependent and waiting task that gets removed - // afterwards will also have an uncommitted lineage that's too large. If - // this becomes an issue, we can be smarter about which tasks to request by - // either storing the dependency depth as part of the task specs, or - // storing that information as a data structure in the lineage cache. RAY_CHECK(SubscribeTask(task_id)); } } diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index b3e9cbe96ae7..c24ee90217ac 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -220,6 +220,8 @@ class LineageCache { /// Unsubscribe from notifications for a task. Returns whether the operation /// was successful (whether we were subscribed). bool UnsubscribeTask(const UniqueID &task_id); + /// Count the size of unsubscribed and uncommitted lineage + uint64_t CountUnsubscribedLineage(const UniqueID &task_id); /// The client ID, used to request notifications for specific tasks. /// TODO(swang): Move the ClientID into the generic Table implementation. diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 74c2e6bce0d9..74ace6b7f7df 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -52,6 +52,7 @@ class MockGcs : public gcs::TableInterface, if (task_table_.count(task_id) == 1) { callbacks_.push_back({notification_callback_, task_id}); } + num_requested_notifications_ += 1; return ray::Status::OK(); } @@ -75,11 +76,14 @@ class MockGcs : public gcs::TableInterface, const std::unordered_set &SubscribedTasks() const { return subscribed_tasks_; } + const int NumRequestedNotifications() const { return num_requested_notifications_; } + private: std::unordered_map> task_table_; std::vector> callbacks_; gcs::raylet::TaskTable::WriteCallback notification_callback_; std::unordered_set subscribed_tasks_; + int num_requested_notifications_; }; class LineageCacheTest : public ::testing::Test { @@ -397,7 +401,7 @@ TEST_F(LineageCacheTest, TestEviction) { TEST_F(LineageCacheTest, TestOutOfOrderEviction) { // Insert a chain of dependent tasks that is more than twice as long as the // maximum lineage size. This will ensure that we request notifications for - // at least 2 remote tasks. + // at most 2 remote tasks. uint64_t lineage_size = (2 * max_lineage_size_) + 1; size_t num_tasks_flushed = 0; std::vector tasks; @@ -408,6 +412,8 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) { auto task_id = task.GetTaskSpecification().TaskId(); lineage_cache_.RemoveWaitingTask(task_id); } + // Check that we requested at most 2 notifications + ASSERT_TRUE(mock_gcs_.NumRequestedNotifications() <= 2); // Check that the last task in the chain still has all tasks in its // uncommitted lineage. From 898eae7752f9a2553b43552a4c58005b26700600 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Tue, 5 Jun 2018 22:06:43 +0800 Subject: [PATCH 3/8] minor fix --- src/ray/raylet/lineage_cache.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 7a516dc9e61d..f74932a15b65 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -220,10 +220,10 @@ uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) { return 0; } auto entry = lineage_.GetEntry(task_id); - uint64_t cnt = 0; if (!entry) { return 0; } + uint64_t cnt = 0; for (const auto& parent_id : entry->GetParentTaskIds()) { cnt += CountUnsubscribedLineage(parent_id); } @@ -241,7 +241,7 @@ void LineageCache::RemoveWaitingTask(const TaskID &task_id) { entry->ResetStatus(GcsStatus_UNCOMMITTED_REMOTE); RAY_CHECK(lineage_.SetEntry(std::move(*entry))); - // Request a notification for every one out of max_lineage_size_ tasks, + // Request a notification for every max_lineage_size_ tasks, // so that the task and its uncommitted lineage can be evicted // once the commit notification is received. // By doing this, we make sure that the unevicted lineage won't be more than From 26879cdf7f3a996e624ccd4c39eb66f08c9ac36e Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Wed, 6 Jun 2018 12:10:17 +0800 Subject: [PATCH 4/8] fix ut --- src/ray/raylet/lineage_cache.cc | 2 +- src/ray/raylet/lineage_cache_test.cc | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index f74932a15b65..cab8ed60e9e8 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -223,7 +223,7 @@ uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) { if (!entry) { return 0; } - uint64_t cnt = 0; + uint64_t cnt = 1; for (const auto& parent_id : entry->GetParentTaskIds()) { cnt += CountUnsubscribedLineage(parent_id); } diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 74ace6b7f7df..de26467bf690 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -83,7 +83,7 @@ class MockGcs : public gcs::TableInterface, std::vector> callbacks_; gcs::raylet::TaskTable::WriteCallback notification_callback_; std::unordered_set subscribed_tasks_; - int num_requested_notifications_; + int num_requested_notifications_ = 0; }; class LineageCacheTest : public ::testing::Test { @@ -395,14 +395,14 @@ TEST_F(LineageCacheTest, TestEviction) { // All tasks have now been flushed. Check that enough lineage has been // evicted that the uncommitted lineage is now less than the maximum size. uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id); - ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_); + ASSERT_TRUE(uncommitted_lineage.GetEntries().size() <= max_lineage_size_); } TEST_F(LineageCacheTest, TestOutOfOrderEviction) { // Insert a chain of dependent tasks that is more than twice as long as the // maximum lineage size. This will ensure that we request notifications for // at most 2 remote tasks. - uint64_t lineage_size = (2 * max_lineage_size_) + 1; + uint64_t lineage_size = (2 * max_lineage_size_) + 2; size_t num_tasks_flushed = 0; std::vector tasks; InsertTaskChain(lineage_cache_, tasks, lineage_size, std::vector(), 1); @@ -447,7 +447,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) { // All tasks have now been flushed. Check that enough lineage has been // evicted that the uncommitted lineage is now less than the maximum size. uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id); - ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_); + ASSERT_TRUE(uncommitted_lineage.GetEntries().size() <= max_lineage_size_); } } // namespace raylet From 9b5c52467962d82069ac123e6a069c10e7c17994 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 7 Jun 2018 10:24:20 +0800 Subject: [PATCH 5/8] fix comment and lint --- src/ray/raylet/lineage_cache.cc | 26 +++++++++++++------------- src/ray/raylet/lineage_cache.h | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index cab8ed60e9e8..cfa9d23924f5 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -215,19 +215,19 @@ void LineageCache::AddReadyTask(const Task &task) { } } -uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) { - if (subscribed_tasks_.count(task_id) == 1) { - return 0; - } - auto entry = lineage_.GetEntry(task_id); - if (!entry) { - return 0; - } - uint64_t cnt = 1; - for (const auto& parent_id : entry->GetParentTaskIds()) { - cnt += CountUnsubscribedLineage(parent_id); - } - return cnt; +uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) const{ + if (subscribed_tasks_.count(task_id) == 1) { + return 0; + } + auto entry = lineage_.GetEntry(task_id); + if (!entry) { + return 0; + } + uint64_t cnt = 1; + for (const auto& parent_id : entry->GetParentTaskIds()) { + cnt += CountUnsubscribedLineage(parent_id); + } + return cnt; } void LineageCache::RemoveWaitingTask(const TaskID &task_id) { diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index c24ee90217ac..588fe76d5b92 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -221,7 +221,7 @@ class LineageCache { /// was successful (whether we were subscribed). bool UnsubscribeTask(const UniqueID &task_id); /// Count the size of unsubscribed and uncommitted lineage - uint64_t CountUnsubscribedLineage(const UniqueID &task_id); + uint64_t CountUnsubscribedLineage(const UniqueID &task_id) const; /// The client ID, used to request notifications for specific tasks. /// TODO(swang): Move the ClientID into the generic Table implementation. From c598de2f2058e9af672a655eb62a34c77cba109c Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 7 Jun 2018 10:36:37 +0800 Subject: [PATCH 6/8] format --- src/ray/raylet/lineage_cache.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index cfa9d23924f5..e8e00ea0dbda 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -215,7 +215,7 @@ void LineageCache::AddReadyTask(const Task &task) { } } -uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) const{ +uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) const { if (subscribed_tasks_.count(task_id) == 1) { return 0; } From a97fb8abce482e89d268e18a4c47de41914490de Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 7 Jun 2018 10:45:17 +0800 Subject: [PATCH 7/8] format --- src/ray/raylet/lineage_cache.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index e8e00ea0dbda..06a7be587a53 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -224,7 +224,7 @@ uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) const { return 0; } uint64_t cnt = 1; - for (const auto& parent_id : entry->GetParentTaskIds()) { + for (const auto &parent_id : entry->GetParentTaskIds()) { cnt += CountUnsubscribedLineage(parent_id); } return cnt; From ecb0acb66a753537e7c659d753e5984debbda1df Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 7 Jun 2018 10:55:27 +0800 Subject: [PATCH 8/8] remove unneeded code --- .gitignore | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.gitignore b/.gitignore index 2cfbadecddf6..fa9fe8f69d2b 100644 --- a/.gitignore +++ b/.gitignore @@ -147,7 +147,3 @@ build # Vscode .vscode/ -java/**/.settings -java/**/.classpath -java/**/.project -java/**/target \ No newline at end of file