diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 9cc9cad1b919..d5ac1c93133e 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -236,14 +236,21 @@ Status LineageCache::Flush() { // committed yet, then as far as we know, it's still in flight to the // GCS. Skip this task for now. if (parent && parent->GetStatus() != GcsStatus_COMMITTED) { - // Request notifications about the parent entry's commit in the GCS. - // Once we receive a notification about the task's commit via - // HandleEntryCommitted, then this task will be ready to write on the - // next call to Flush(). - auto inserted = subscribed_tasks_.insert(parent_id); - if (inserted.second) { - RAY_CHECK_OK( - task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_)); + RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING) + << "Children should not become ready to flush before their parents."; + // Request notifications about the parent entry's commit in the GCS if + // the parent is remote. Otherwise, the parent is local and will + // eventually be flushed. In either case, once we receive a + // notification about the task's commit via HandleEntryCommitted, then + // this task will be ready to write on the next call to Flush(). + if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) { + auto inserted = subscribed_tasks_.insert(parent_id); + if (inserted.second) { + // Only request notifications about the parent entry if we haven't + // already requested notifications for it. + RAY_CHECK_OK( + task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_)); + } } all_arguments_committed = false; break; @@ -300,6 +307,7 @@ void PopAncestorTasks(const UniqueID &task_id, Lineage &lineage) { void LineageCache::HandleEntryCommitted(const UniqueID &task_id) { RAY_LOG(DEBUG) << "task committed: " << task_id; auto entry = lineage_.PopEntry(task_id); + RAY_CHECK(entry); for (const auto &parent_id : entry->GetParentTaskIds()) { PopAncestorTasks(parent_id, lineage_); } diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 1a19feb5c2ca..631767346129 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -253,24 +253,21 @@ TEST_F(LineageCacheTest, TestWritebackPartiallyReady) { lineage_cache_.AddWaitingTask(task2, Lineage()); lineage_cache_.AddWaitingTask(dependent_task, Lineage()); - // Mark one of the independent tasks and the dependent task as ready. + // Flush one of the independent tasks. lineage_cache_.AddReadyTask(task1); - lineage_cache_.AddReadyTask(dependent_task); - // Check that only the first independent task is flushed. num_tasks_flushed++; CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); - - // Flush acknowledgements. The dependent task should still not be flushed - // since task2 is not committed yet. + // Flush acknowledgements. The lineage cache should receive the commit for + // the first task. mock_gcs_.Flush(); - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); - - // Mark the other independent task as ready. + // Mark the other independent task and the dependent as ready. lineage_cache_.AddReadyTask(task2); - // Check that the other independent task gets flushed. + lineage_cache_.AddReadyTask(dependent_task); + // Two tasks are ready, but only the independent task should be flushed. The + // dependent task should only be flushed once commits for both independent + // tasks are received. num_tasks_flushed++; CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); - // Flush acknowledgements. The dependent task should now be able to be // written. mock_gcs_.Flush();