From 2f0baa619dc30d9b06fadf64fb2323f015ea8616 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 25 Jan 2019 14:35:39 -0800 Subject: [PATCH 1/7] fix linea --- src/ray/raylet/lineage_cache.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 8180319ebf00..c3bb4d439e15 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -335,8 +335,11 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id, // already explicitly forwarded to this node before. if (uncommitted_lineage.GetEntries().empty()) { auto entry = lineage_.GetEntry(task_id); - RAY_CHECK(entry); - RAY_CHECK(uncommitted_lineage.SetEntry(entry->TaskData(), entry->GetStatus())); + if (entry) { + RAY_CHECK(uncommitted_lineage.SetEntry(entry->TaskData(), entry->GetStatus())); + } else { + RAY_LOG(ERROR) << "No lineage cache entry found for task " << task_id; + } } return uncommitted_lineage; } From a5c66fdb5d0cd1439e4871c4ccfddced0870d2da Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 25 Jan 2019 20:59:04 -0800 Subject: [PATCH 2/7] update --- src/ray/raylet/lineage_cache.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index c3bb4d439e15..eaac6fb0e9d9 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -338,6 +338,8 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id, if (entry) { RAY_CHECK(uncommitted_lineage.SetEntry(entry->TaskData(), entry->GetStatus())); } else { + // TODO: We expected the lineage to be in cache, but it was evicted (#3813). + // This is a bug but is not fatal to the application. RAY_LOG(ERROR) << "No lineage cache entry found for task " << task_id; } } From 58904eed8d850ea9e5b3145acf768878f24ea330 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sat, 26 Jan 2019 00:18:16 -0800 Subject: [PATCH 3/7] Update lineage_cache.cc --- src/ray/raylet/lineage_cache.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index eaac6fb0e9d9..dd6262e1cf48 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -341,6 +341,7 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id, // TODO: We expected the lineage to be in cache, but it was evicted (#3813). // This is a bug but is not fatal to the application. RAY_LOG(ERROR) << "No lineage cache entry found for task " << task_id; + RAY_DCHECK(false); } } return uncommitted_lineage; From eb3cbff5cd509efc1e0aa9329c672cafd5e71b15 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sun, 27 Jan 2019 16:06:03 -0800 Subject: [PATCH 4/7] wip --- src/ray/raylet/lineage_cache.cc | 2 +- src/ray/raylet/node_manager.cc | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index dd6262e1cf48..9b193a8061c0 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -167,7 +167,7 @@ const std::unordered_map &Lineage::GetEntries() cons flatbuffers::Offset Lineage::ToFlatbuffer( flatbuffers::FlatBufferBuilder &fbb, const TaskID &task_id) const { - RAY_CHECK(GetEntry(task_id)); + RAY_DCHECK(GetEntry(task_id)); // Serialize the task and object entries. std::vector> uncommitted_tasks; for (const auto &entry : entries_) { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 952fadabe1dc..79909f4980ea 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1903,9 +1903,14 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id, auto task_id = spec.TaskId(); // Get and serialize the task's unforwarded, uncommitted lineage. - auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id, node_id); - Task &lineage_cache_entry_task = - uncommitted_lineage.GetEntryMutable(task_id)->TaskDataMutable(); + Lineage uncommitted_lineage; + if (lineage_cache_.ContainsTask(task_id)) { + uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id, node_id); + } else { + uncommitted_lineage.SetEntry(task, GcsStatus::NONE); + } + auto entry = uncommitted_lineage.GetEntryMutable(task_id) + Task &lineage_cache_entry_task = entry->TaskDataMutable(); // Increment forward count for the forwarded task. lineage_cache_entry_task.IncrementNumForwards(); From 2a3dc9d0b7e17b60068811c4c60746c741d8d3f5 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sun, 27 Jan 2019 16:16:42 -0800 Subject: [PATCH 5/7] fix --- src/ray/raylet/node_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 79909f4980ea..de175a6454c4 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1909,7 +1909,7 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id, } else { uncommitted_lineage.SetEntry(task, GcsStatus::NONE); } - auto entry = uncommitted_lineage.GetEntryMutable(task_id) + auto entry = uncommitted_lineage.GetEntryMutable(task_id); Task &lineage_cache_entry_task = entry->TaskDataMutable(); // Increment forward count for the forwarded task. From c60def38d04505907ceac70e31ce20c3c177d9f1 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sun, 27 Jan 2019 16:20:02 -0800 Subject: [PATCH 6/7] fix --- src/ray/raylet/lineage_cache.cc | 12 +++--------- src/ray/raylet/node_manager.cc | 8 ++++++-- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 9b193a8061c0..8180319ebf00 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -167,7 +167,7 @@ const std::unordered_map &Lineage::GetEntries() cons flatbuffers::Offset Lineage::ToFlatbuffer( flatbuffers::FlatBufferBuilder &fbb, const TaskID &task_id) const { - RAY_DCHECK(GetEntry(task_id)); + RAY_CHECK(GetEntry(task_id)); // Serialize the task and object entries. std::vector> uncommitted_tasks; for (const auto &entry : entries_) { @@ -335,14 +335,8 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id, // already explicitly forwarded to this node before. if (uncommitted_lineage.GetEntries().empty()) { auto entry = lineage_.GetEntry(task_id); - if (entry) { - RAY_CHECK(uncommitted_lineage.SetEntry(entry->TaskData(), entry->GetStatus())); - } else { - // TODO: We expected the lineage to be in cache, but it was evicted (#3813). - // This is a bug but is not fatal to the application. - RAY_LOG(ERROR) << "No lineage cache entry found for task " << task_id; - RAY_DCHECK(false); - } + RAY_CHECK(entry); + RAY_CHECK(uncommitted_lineage.SetEntry(entry->TaskData(), entry->GetStatus())); } return uncommitted_lineage; } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index de175a6454c4..6fa154bc87f2 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1905,9 +1905,13 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id, // Get and serialize the task's unforwarded, uncommitted lineage. Lineage uncommitted_lineage; if (lineage_cache_.ContainsTask(task_id)) { - uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id, node_id); + uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id, node_id); } else { - uncommitted_lineage.SetEntry(task, GcsStatus::NONE); + // TODO: We expected the lineage to be in cache, but it was evicted (#3813). + // This is a bug but is not fatal to the application. + RAY_LOG(ERROR) << "No lineage cache entry found for task " << task_id; + RAY_DCHECK(false); + uncommitted_lineage.SetEntry(task, GcsStatus::NONE); } auto entry = uncommitted_lineage.GetEntryMutable(task_id); Task &lineage_cache_entry_task = entry->TaskDataMutable(); From 798e724a1363b85f8040162a15d657a52151ddb8 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sun, 27 Jan 2019 16:31:32 -0800 Subject: [PATCH 7/7] rename fatal --- src/ray/raylet/lineage_cache.cc | 6 ++--- src/ray/raylet/lineage_cache.h | 8 +++--- src/ray/raylet/lineage_cache_test.cc | 40 ++++++++++++++-------------- src/ray/raylet/node_manager.cc | 7 +++-- src/ray/util/logging.h | 9 ++++--- 5 files changed, 36 insertions(+), 34 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 8180319ebf00..dd3c4cbad7f4 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -324,8 +324,8 @@ void GetUncommittedLineageHelper(const TaskID &task_id, const Lineage &lineage_f } } -Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id, - const ClientID &node_id) const { +Lineage LineageCache::GetUncommittedLineageOrDie(const TaskID &task_id, + const ClientID &node_id) const { Lineage uncommitted_lineage; // Add all uncommitted ancestors from the lineage cache to the uncommitted // lineage of the requested task. @@ -445,7 +445,7 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) { UnsubscribeTask(task_id); } -const Task &LineageCache::GetTask(const TaskID &task_id) const { +const Task &LineageCache::GetTaskOrDie(const TaskID &task_id) const { const auto &entries = lineage_.GetEntries(); auto it = entries.find(task_id); RAY_CHECK(it != entries.end()); diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 89ff84d6c278..1816d97f7cf2 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -263,11 +263,13 @@ class LineageCache { /// The uncommitted lineage consists of all tasks in the given task's lineage /// that have not been committed in the GCS, as far as we know. /// - /// \param task_id The ID of the task to get the uncommitted lineage for. + /// \param task_id The ID of the task to get the uncommitted lineage for. It is + /// a fatal error if the task is not found. /// \param node_id The ID of the receiving node. /// \return The uncommitted, unforwarded lineage of the task. The returned lineage /// includes the entry for the requested entry_id. - Lineage GetUncommittedLineage(const TaskID &task_id, const ClientID &node_id) const; + Lineage GetUncommittedLineageOrDie(const TaskID &task_id, + const ClientID &node_id) const; /// Handle the commit of a task entry in the GCS. This attempts to evict the /// task if possible. @@ -279,7 +281,7 @@ class LineageCache { /// /// \param task_id The ID of the task to get. /// \return A const reference to the task data. - const Task &GetTask(const TaskID &task_id) const; + const Task &GetTaskOrDie(const TaskID &task_id) const; /// Get whether the lineage cache contains the task. /// diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 32a0e593268b..973483759e4b 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -140,7 +140,7 @@ std::vector InsertTaskChain(LineageCache &lineage_cache, return arguments; } -TEST_F(LineageCacheTest, TestGetUncommittedLineage) { +TEST_F(LineageCacheTest, TestGetUncommittedLineageOrDie) { // Insert two independent chains of tasks. std::vector tasks1; auto return_values1 = @@ -160,7 +160,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) { // Get the uncommitted lineage for the last task (the leaf) of one of the chains. auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(task_ids1.back(), ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(task_ids1.back(), ClientID::nil()); // Check that the uncommitted lineage is exactly equal to the first chain of tasks. ASSERT_EQ(task_ids1.size(), uncommitted_lineage.GetEntries().size()); for (auto &task_id : task_ids1) { @@ -180,8 +180,8 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) { } // Get the uncommitted lineage for the inserted task. - uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(combined_task_ids.back(), ClientID::nil()); + uncommitted_lineage = lineage_cache_.GetUncommittedLineageOrDie( + combined_task_ids.back(), ClientID::nil()); // Check that the uncommitted lineage is exactly equal to the entire set of // tasks inserted so far. ASSERT_EQ(combined_task_ids.size(), uncommitted_lineage.GetEntries().size()); @@ -207,9 +207,9 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) { lineage_cache_.MarkTaskAsForwarded(forwarded_task_id, node_id); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(remaining_task_id, node_id); + lineage_cache_.GetUncommittedLineageOrDie(remaining_task_id, node_id); auto uncommitted_lineage_all = - lineage_cache_.GetUncommittedLineage(remaining_task_id, node_id2); + lineage_cache_.GetUncommittedLineageOrDie(remaining_task_id, node_id2); ASSERT_EQ(1, uncommitted_lineage.GetEntries().size()); ASSERT_EQ(4, uncommitted_lineage_all.GetEntries().size()); @@ -218,7 +218,7 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) { // Check that lineage of requested task includes itself, regardless of whether // it has been forwarded before. auto uncommitted_lineage_forwarded = - lineage_cache_.GetUncommittedLineage(forwarded_task_id, node_id); + lineage_cache_.GetUncommittedLineageOrDie(forwarded_task_id, node_id); ASSERT_EQ(1, uncommitted_lineage_forwarded.GetEntries().size()); } @@ -284,8 +284,8 @@ TEST_F(LineageCacheTest, TestEvictChain) { // the flushed task, but its lineage should not be evicted yet. mock_gcs_.Flush(); ASSERT_EQ(lineage_cache_ - .GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(), - ClientID::nil()) + .GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(), + ClientID::nil()) .GetEntries() .size(), tasks.size()); @@ -297,8 +297,8 @@ TEST_F(LineageCacheTest, TestEvictChain) { mock_gcs_.RemoteAdd(tasks.at(1).GetTaskSpecification().TaskId(), task_data)); mock_gcs_.Flush(); ASSERT_EQ(lineage_cache_ - .GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(), - ClientID::nil()) + .GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(), + ClientID::nil()) .GetEntries() .size(), tasks.size()); @@ -334,8 +334,8 @@ TEST_F(LineageCacheTest, TestEvictManyParents) { mock_gcs_.Flush(); ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks); ASSERT_EQ(lineage_cache_ - .GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(), - ClientID::nil()) + .GetUncommittedLineageOrDie(child_task.GetTaskSpecification().TaskId(), + ClientID::nil()) .GetEntries() .size(), total_tasks); @@ -350,8 +350,8 @@ TEST_F(LineageCacheTest, TestEvictManyParents) { // since the parent tasks have no dependencies. ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks); ASSERT_EQ(lineage_cache_ - .GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(), - ClientID::nil()) + .GetUncommittedLineageOrDie( + child_task.GetTaskSpecification().TaskId(), ClientID::nil()) .GetEntries() .size(), total_tasks); @@ -376,7 +376,7 @@ TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) { const auto task_id = it->GetTaskSpecification().TaskId(); // Simulate removing the task and forwarding it to another node. auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(task_id, ClientID::nil()); ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id)); // Simulate receiving the task again. Make sure we can add the task back. flatbuffers::FlatBufferBuilder fbb; @@ -400,7 +400,7 @@ TEST_F(LineageCacheTest, TestForwardTask) { tasks.erase(it); auto task_id_to_remove = forwarded_task.GetTaskSpecification().TaskId(); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(task_id_to_remove, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(task_id_to_remove, ClientID::nil()); ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id_to_remove)); ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), 3); @@ -450,7 +450,7 @@ TEST_F(LineageCacheTest, TestEviction) { // uncommitted lineage. const auto last_task_id = tasks.back().GetTaskSpecification().TaskId(); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil()); ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); // Simulate executing the first task on a remote node and adding it to the @@ -484,7 +484,7 @@ TEST_F(LineageCacheTest, TestEviction) { // All tasks have now been flushed. Check that enough lineage has been // evicted that the uncommitted lineage is now less than the maximum size. uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil()); ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_); // The remaining task should have no uncommitted lineage. ASSERT_EQ(uncommitted_lineage.GetEntries().size(), 1); @@ -510,7 +510,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) { // uncommitted lineage. const auto last_task_id = tasks.back().GetTaskSpecification().TaskId(); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil()); ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), lineage_size); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 6fa154bc87f2..b82003e7c66b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1752,7 +1752,7 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { "allocation via " << "ray.init(redis_max_memory=)."; // Use a copy of the cached task spec to re-execute the task. - const Task task = lineage_cache_.GetTask(task_id); + const Task task = lineage_cache_.GetTaskOrDie(task_id); ResubmitTask(task); })); @@ -1905,12 +1905,11 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id, // Get and serialize the task's unforwarded, uncommitted lineage. Lineage uncommitted_lineage; if (lineage_cache_.ContainsTask(task_id)) { - uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id, node_id); + uncommitted_lineage = lineage_cache_.GetUncommittedLineageOrDie(task_id, node_id); } else { // TODO: We expected the lineage to be in cache, but it was evicted (#3813). // This is a bug but is not fatal to the application. - RAY_LOG(ERROR) << "No lineage cache entry found for task " << task_id; - RAY_DCHECK(false); + RAY_DCHECK(false) << "No lineage cache entry found for task " << task_id; uncommitted_lineage.SetEntry(task, GcsStatus::NONE); } auto entry = uncommitted_lineage.GetEntryMutable(task_id); diff --git a/src/ray/util/logging.h b/src/ray/util/logging.h index 6f657142efd7..d37ab9a73897 100644 --- a/src/ray/util/logging.h +++ b/src/ray/util/logging.h @@ -24,10 +24,11 @@ enum class RayLogLevel { DEBUG = -1, INFO = 0, WARNING = 1, ERROR = 2, FATAL = 3 #ifdef NDEBUG -#define RAY_DCHECK(condition) \ - RAY_IGNORE_EXPR(condition); \ - while (false) ::ray::RayLogBase() - +#define RAY_DCHECK(condition) \ + (condition) ? RAY_IGNORE_EXPR(0) \ + : ::ray::Voidify() & \ + ::ray::RayLog(__FILE__, __LINE__, ray::RayLogLevel::ERROR) \ + << " Debug check failed: " #condition " " #else #define RAY_DCHECK(condition) RAY_CHECK(condition)