diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 8180319ebf00..dd3c4cbad7f4 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -324,8 +324,8 @@ void GetUncommittedLineageHelper(const TaskID &task_id, const Lineage &lineage_f } } -Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id, - const ClientID &node_id) const { +Lineage LineageCache::GetUncommittedLineageOrDie(const TaskID &task_id, + const ClientID &node_id) const { Lineage uncommitted_lineage; // Add all uncommitted ancestors from the lineage cache to the uncommitted // lineage of the requested task. @@ -445,7 +445,7 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) { UnsubscribeTask(task_id); } -const Task &LineageCache::GetTask(const TaskID &task_id) const { +const Task &LineageCache::GetTaskOrDie(const TaskID &task_id) const { const auto &entries = lineage_.GetEntries(); auto it = entries.find(task_id); RAY_CHECK(it != entries.end()); diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 89ff84d6c278..1816d97f7cf2 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -263,11 +263,13 @@ class LineageCache { /// The uncommitted lineage consists of all tasks in the given task's lineage /// that have not been committed in the GCS, as far as we know. /// - /// \param task_id The ID of the task to get the uncommitted lineage for. + /// \param task_id The ID of the task to get the uncommitted lineage for. It is + /// a fatal error if the task is not found. /// \param node_id The ID of the receiving node. /// \return The uncommitted, unforwarded lineage of the task. The returned lineage /// includes the entry for the requested entry_id. - Lineage GetUncommittedLineage(const TaskID &task_id, const ClientID &node_id) const; + Lineage GetUncommittedLineageOrDie(const TaskID &task_id, + const ClientID &node_id) const; /// Handle the commit of a task entry in the GCS. This attempts to evict the /// task if possible. @@ -279,7 +281,7 @@ class LineageCache { /// /// \param task_id The ID of the task to get. /// \return A const reference to the task data. - const Task &GetTask(const TaskID &task_id) const; + const Task &GetTaskOrDie(const TaskID &task_id) const; /// Get whether the lineage cache contains the task. /// diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 32a0e593268b..973483759e4b 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -140,7 +140,7 @@ std::vector InsertTaskChain(LineageCache &lineage_cache, return arguments; } -TEST_F(LineageCacheTest, TestGetUncommittedLineage) { +TEST_F(LineageCacheTest, TestGetUncommittedLineageOrDie) { // Insert two independent chains of tasks. std::vector tasks1; auto return_values1 = @@ -160,7 +160,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) { // Get the uncommitted lineage for the last task (the leaf) of one of the chains. auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(task_ids1.back(), ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(task_ids1.back(), ClientID::nil()); // Check that the uncommitted lineage is exactly equal to the first chain of tasks. ASSERT_EQ(task_ids1.size(), uncommitted_lineage.GetEntries().size()); for (auto &task_id : task_ids1) { @@ -180,8 +180,8 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) { } // Get the uncommitted lineage for the inserted task. - uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(combined_task_ids.back(), ClientID::nil()); + uncommitted_lineage = lineage_cache_.GetUncommittedLineageOrDie( + combined_task_ids.back(), ClientID::nil()); // Check that the uncommitted lineage is exactly equal to the entire set of // tasks inserted so far. ASSERT_EQ(combined_task_ids.size(), uncommitted_lineage.GetEntries().size()); @@ -207,9 +207,9 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) { lineage_cache_.MarkTaskAsForwarded(forwarded_task_id, node_id); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(remaining_task_id, node_id); + lineage_cache_.GetUncommittedLineageOrDie(remaining_task_id, node_id); auto uncommitted_lineage_all = - lineage_cache_.GetUncommittedLineage(remaining_task_id, node_id2); + lineage_cache_.GetUncommittedLineageOrDie(remaining_task_id, node_id2); ASSERT_EQ(1, uncommitted_lineage.GetEntries().size()); ASSERT_EQ(4, uncommitted_lineage_all.GetEntries().size()); @@ -218,7 +218,7 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) { // Check that lineage of requested task includes itself, regardless of whether // it has been forwarded before. auto uncommitted_lineage_forwarded = - lineage_cache_.GetUncommittedLineage(forwarded_task_id, node_id); + lineage_cache_.GetUncommittedLineageOrDie(forwarded_task_id, node_id); ASSERT_EQ(1, uncommitted_lineage_forwarded.GetEntries().size()); } @@ -284,8 +284,8 @@ TEST_F(LineageCacheTest, TestEvictChain) { // the flushed task, but its lineage should not be evicted yet. mock_gcs_.Flush(); ASSERT_EQ(lineage_cache_ - .GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(), - ClientID::nil()) + .GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(), + ClientID::nil()) .GetEntries() .size(), tasks.size()); @@ -297,8 +297,8 @@ TEST_F(LineageCacheTest, TestEvictChain) { mock_gcs_.RemoteAdd(tasks.at(1).GetTaskSpecification().TaskId(), task_data)); mock_gcs_.Flush(); ASSERT_EQ(lineage_cache_ - .GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(), - ClientID::nil()) + .GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(), + ClientID::nil()) .GetEntries() .size(), tasks.size()); @@ -334,8 +334,8 @@ TEST_F(LineageCacheTest, TestEvictManyParents) { mock_gcs_.Flush(); ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks); ASSERT_EQ(lineage_cache_ - .GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(), - ClientID::nil()) + .GetUncommittedLineageOrDie(child_task.GetTaskSpecification().TaskId(), + ClientID::nil()) .GetEntries() .size(), total_tasks); @@ -350,8 +350,8 @@ TEST_F(LineageCacheTest, TestEvictManyParents) { // since the parent tasks have no dependencies. ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks); ASSERT_EQ(lineage_cache_ - .GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(), - ClientID::nil()) + .GetUncommittedLineageOrDie( + child_task.GetTaskSpecification().TaskId(), ClientID::nil()) .GetEntries() .size(), total_tasks); @@ -376,7 +376,7 @@ TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) { const auto task_id = it->GetTaskSpecification().TaskId(); // Simulate removing the task and forwarding it to another node. auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(task_id, ClientID::nil()); ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id)); // Simulate receiving the task again. Make sure we can add the task back. flatbuffers::FlatBufferBuilder fbb; @@ -400,7 +400,7 @@ TEST_F(LineageCacheTest, TestForwardTask) { tasks.erase(it); auto task_id_to_remove = forwarded_task.GetTaskSpecification().TaskId(); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(task_id_to_remove, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(task_id_to_remove, ClientID::nil()); ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id_to_remove)); ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), 3); @@ -450,7 +450,7 @@ TEST_F(LineageCacheTest, TestEviction) { // uncommitted lineage. const auto last_task_id = tasks.back().GetTaskSpecification().TaskId(); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil()); ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); // Simulate executing the first task on a remote node and adding it to the @@ -484,7 +484,7 @@ TEST_F(LineageCacheTest, TestEviction) { // All tasks have now been flushed. Check that enough lineage has been // evicted that the uncommitted lineage is now less than the maximum size. uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil()); ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_); // The remaining task should have no uncommitted lineage. ASSERT_EQ(uncommitted_lineage.GetEntries().size(), 1); @@ -510,7 +510,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) { // uncommitted lineage. const auto last_task_id = tasks.back().GetTaskSpecification().TaskId(); auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil()); + lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil()); ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), lineage_size); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 952fadabe1dc..b82003e7c66b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1752,7 +1752,7 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { "allocation via " << "ray.init(redis_max_memory=)."; // Use a copy of the cached task spec to re-execute the task. - const Task task = lineage_cache_.GetTask(task_id); + const Task task = lineage_cache_.GetTaskOrDie(task_id); ResubmitTask(task); })); @@ -1903,9 +1903,17 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id, auto task_id = spec.TaskId(); // Get and serialize the task's unforwarded, uncommitted lineage. - auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id, node_id); - Task &lineage_cache_entry_task = - uncommitted_lineage.GetEntryMutable(task_id)->TaskDataMutable(); + Lineage uncommitted_lineage; + if (lineage_cache_.ContainsTask(task_id)) { + uncommitted_lineage = lineage_cache_.GetUncommittedLineageOrDie(task_id, node_id); + } else { + // TODO: We expected the lineage to be in cache, but it was evicted (#3813). + // This is a bug but is not fatal to the application. + RAY_DCHECK(false) << "No lineage cache entry found for task " << task_id; + uncommitted_lineage.SetEntry(task, GcsStatus::NONE); + } + auto entry = uncommitted_lineage.GetEntryMutable(task_id); + Task &lineage_cache_entry_task = entry->TaskDataMutable(); // Increment forward count for the forwarded task. lineage_cache_entry_task.IncrementNumForwards(); diff --git a/src/ray/util/logging.h b/src/ray/util/logging.h index 6f657142efd7..d37ab9a73897 100644 --- a/src/ray/util/logging.h +++ b/src/ray/util/logging.h @@ -24,10 +24,11 @@ enum class RayLogLevel { DEBUG = -1, INFO = 0, WARNING = 1, ERROR = 2, FATAL = 3 #ifdef NDEBUG -#define RAY_DCHECK(condition) \ - RAY_IGNORE_EXPR(condition); \ - while (false) ::ray::RayLogBase() - +#define RAY_DCHECK(condition) \ + (condition) ? RAY_IGNORE_EXPR(0) \ + : ::ray::Voidify() & \ + ::ray::RayLog(__FILE__, __LINE__, ray::RayLogLevel::ERROR) \ + << " Debug check failed: " #condition " " #else #define RAY_DCHECK(condition) RAY_CHECK(condition)