From 5a80c88daa54a4c4f732e086377420275f3ffb21 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Wed, 23 May 2018 13:40:18 -0700 Subject: [PATCH 1/6] Private method to flush a single task from the lineage cache --- src/ray/raylet/lineage_cache.cc | 102 ++++++++++++++++---------------- src/ray/raylet/lineage_cache.h | 2 + 2 files changed, 52 insertions(+), 52 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index d5ac1c93133e..f45738c4bcf8 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -219,57 +219,46 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const { return uncommitted_lineage; } -Status LineageCache::Flush() { - // Iterate through all tasks that are READY. - std::vector ready_task_ids; - for (const auto &task_id : uncommitted_ready_tasks_) { - auto entry = lineage_.GetEntry(task_id); - RAY_CHECK(entry); - RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY); - - // Check if all arguments have been committed to the GCS before writing - // this task. - bool all_arguments_committed = true; - for (const auto &parent_id : entry->GetParentTaskIds()) { - auto parent = lineage_.GetEntry(parent_id); - // If a parent entry exists in the lineage cache but has not been - // committed yet, then as far as we know, it's still in flight to the - // GCS. Skip this task for now. - if (parent && parent->GetStatus() != GcsStatus_COMMITTED) { - RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING) - << "Children should not become ready to flush before their parents."; - // Request notifications about the parent entry's commit in the GCS if - // the parent is remote. Otherwise, the parent is local and will - // eventually be flushed. In either case, once we receive a - // notification about the task's commit via HandleEntryCommitted, then - // this task will be ready to write on the next call to Flush(). - if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) { - auto inserted = subscribed_tasks_.insert(parent_id); - if (inserted.second) { - // Only request notifications about the parent entry if we haven't - // already requested notifications for it. - RAY_CHECK_OK( - task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_)); - } +bool LineageCache::FlushTask(const TaskID &task_id) { + auto entry = lineage_.GetEntry(task_id); + RAY_CHECK(entry); + RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY); + + // Check if all arguments have been committed to the GCS before writing + // this task. + bool all_arguments_committed = true; + for (const auto &parent_id : entry->GetParentTaskIds()) { + auto parent = lineage_.GetEntry(parent_id); + // If a parent entry exists in the lineage cache but has not been + // committed yet, then as far as we know, it's still in flight to the + // GCS. Skip this task for now. + if (parent && parent->GetStatus() != GcsStatus_COMMITTED) { + RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING) + << "Children should not become ready to flush before their parents."; + // Request notifications about the parent entry's commit in the GCS if + // the parent is remote. Otherwise, the parent is local and will + // eventually be flushed. In either case, once we receive a + // notification about the task's commit via HandleEntryCommitted, then + // this task will be ready to write on the next call to Flush(). + if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) { + auto inserted = subscribed_tasks_.insert(parent_id); + if (inserted.second) { + // Only request notifications about the parent entry if we haven't + // already requested notifications for it. + RAY_CHECK_OK( + task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_)); } - all_arguments_committed = false; - break; } - } - if (all_arguments_committed) { - // All arguments have been committed to the GCS. Add this task to the - // list of tasks to write back to the GCS. - ready_task_ids.push_back(task_id); + all_arguments_committed = false; + break; } } - - // Write back all ready tasks whose arguments have been committed to the GCS. - gcs::raylet::TaskTable::WriteCallback task_callback = [this]( - ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) { - HandleEntryCommitted(id); - }; - for (const auto &ready_task_id : ready_task_ids) { - auto task = lineage_.GetEntry(ready_task_id); + if (all_arguments_committed) { + gcs::raylet::TaskTable::WriteCallback task_callback = [this]( + ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) { + HandleEntryCommitted(id); + }; + auto task = lineage_.GetEntry(task_id); // TODO(swang): Make this better... flatbuffers::FlatBufferBuilder fbb; auto message = task->TaskData().ToFlatbuffer(fbb); @@ -278,18 +267,27 @@ Status LineageCache::Flush() { auto root = flatbuffers::GetRoot(fbb.GetBufferPointer()); root->UnPackTo(task_data.get()); RAY_CHECK_OK(task_storage_.Add(task->TaskData().GetTaskSpecification().DriverId(), - ready_task_id, task_data, task_callback)); + task_id, task_data, task_callback)); // We successfully wrote the task, so mark it as committing. // TODO(swang): Use a batched interface and write with all object entries. - auto entry = lineage_.PopEntry(ready_task_id); + auto entry = lineage_.PopEntry(task_id); RAY_CHECK(entry->SetStatus(GcsStatus_COMMITTING)); RAY_CHECK(lineage_.SetEntry(std::move(*entry))); - // Erase the task from the cache of uncommitted ready tasks. - uncommitted_ready_tasks_.erase(ready_task_id); } + return all_arguments_committed; +} - return ray::Status::OK(); +void LineageCache::Flush() { + // Iterate through all tasks that are READY. + std::vector ready_task_ids; + for (const auto &task_id : uncommitted_ready_tasks_) { + bool flushed = FlushTask(task_id); + if (flushed) { + // Erase the task from the cache of uncommitted ready tasks. + uncommitted_ready_tasks_.erase(task_id); + } + } } void PopAncestorTasks(const UniqueID &task_id, Lineage &lineage) { diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 44b9b62f4776..a9eb0ae81404 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -210,6 +210,8 @@ class LineageCache { void HandleEntryCommitted(const TaskID &task_id); private: + bool FlushTask(const TaskID &task_id); + /// The client ID, used to request notifications for specific tasks. /// TODO(swang): Move the ClientID into the generic Table implementation. ClientID client_id_; From 622a91082834bc7d65d3aabb5c2aa16f42af2149 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Wed, 23 May 2018 15:47:38 -0700 Subject: [PATCH 2/6] Track parent->child relationships for faster flushing --- src/ray/raylet/lineage_cache.cc | 34 +++++++++++++++++++++++----- src/ray/raylet/lineage_cache.h | 7 ++++++ src/ray/raylet/lineage_cache_test.cc | 18 ++++++++------- 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index f45738c4bcf8..a80be142dcbc 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -250,7 +250,7 @@ bool LineageCache::FlushTask(const TaskID &task_id) { } } all_arguments_committed = false; - break; + task_children_[parent_id].insert(task_id); } } if (all_arguments_committed) { @@ -278,16 +278,20 @@ bool LineageCache::FlushTask(const TaskID &task_id) { return all_arguments_committed; } -void LineageCache::Flush() { +ray::Status LineageCache::Flush() { // Iterate through all tasks that are READY. std::vector ready_task_ids; - for (const auto &task_id : uncommitted_ready_tasks_) { - bool flushed = FlushTask(task_id); + for (auto it = uncommitted_ready_tasks_.begin(); + it != uncommitted_ready_tasks_.end();) { + bool flushed = FlushTask(*it); + // Erase the task from the cache of uncommitted ready tasks. if (flushed) { - // Erase the task from the cache of uncommitted ready tasks. - uncommitted_ready_tasks_.erase(task_id); + it = uncommitted_ready_tasks_.erase(it); + } else { + it++; } } + return ray::Status::OK(); } void PopAncestorTasks(const UniqueID &task_id, Lineage &lineage) { @@ -326,6 +330,24 @@ void LineageCache::HandleEntryCommitted(const UniqueID &task_id) { RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_)); subscribed_tasks_.erase(it); } + + auto children_entry = task_children_.find(task_id); + if (children_entry != task_children_.end()) { + // Get the children of the committed task that are uncommitted but ready. + auto children = std::move(children_entry->second); + task_children_.erase(children_entry); + + // Try to flush the children. If all of the child's parents are committed, + // then the child will be flushed here. + for (const auto &child_id : children) { + bool flushed = FlushTask(child_id); + // Erase the child task from the cache of uncommitted ready tasks. + if (flushed) { + auto erased = uncommitted_ready_tasks_.erase(child_id); + RAY_CHECK(erased == 1); + } + } + } } } // namespace raylet diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index a9eb0ae81404..d124ceda2fcb 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -210,6 +210,9 @@ class LineageCache { void HandleEntryCommitted(const TaskID &task_id); private: + /// Try to flush a task that is in UNCOMMITTED_READY state. If the task has + /// parents that are not committed yet, then the child will be flushed once + /// the parents have been committed. bool FlushTask(const TaskID &task_id); /// The client ID, used to request notifications for specific tasks. @@ -233,6 +236,10 @@ class LineageCache { /// The tasks that we've subscribed to notifications for from the pubsub /// storage system. We will receive a notification for these tasks on commit. std::unordered_set subscribed_tasks_; + /// A mapping from each task that hasn't been committed yet, to all dependent + /// children tasks that are in UNCOMMITTED_READY state. Once all parents of + /// the child task have been committed, the child task may be flushed. + std::unordered_map> task_children_; }; } // namespace raylet diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 631767346129..81b7be3a4080 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -62,10 +62,11 @@ class MockGcs : public gcs::TableInterface, } void Flush() { - for (const auto &callback : callbacks_) { + auto callbacks = std::move(callbacks_); + callbacks_.clear(); + for (const auto &callback : callbacks) { callback.first(NULL, callback.second, *task_table_[callback.second]); } - callbacks_.clear(); } const std::unordered_map> &TaskTable() const { @@ -219,15 +220,16 @@ TEST_F(LineageCacheTest, TestWritebackOrder) { auto return_values1 = InsertTaskChain(lineage_cache_, tasks, 3, std::vector(), 1); - // Mark all tasks as ready. + // Mark all tasks as ready. The first task, which has no dependencies, should + // be flushed. for (const auto &task : tasks) { lineage_cache_.AddReadyTask(task); } // Check that we write back the tasks in order of data dependencies. for (size_t i = 0; i < tasks.size(); i++) { num_tasks_flushed++; - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); - // Flush acknowledgements. The next task should be able to be written. + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); + // Flush acknowledgements. The next task should have been flushed. mock_gcs_.Flush(); } } @@ -268,11 +270,11 @@ TEST_F(LineageCacheTest, TestWritebackPartiallyReady) { // tasks are received. num_tasks_flushed++; CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); - // Flush acknowledgements. The dependent task should now be able to be - // written. + // Flush acknowledgements. Both independent tasks should now be committed. mock_gcs_.Flush(); + // The dependent task should now be flushed. num_tasks_flushed++; - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); } TEST_F(LineageCacheTest, TestForwardTaskRoundTrip) { From 98224d0083f2bc7978ea406f2de6711f2df6837e Mon Sep 17 00:00:00 2001 From: Stephanie Date: Wed, 23 May 2018 15:51:53 -0700 Subject: [PATCH 3/6] doc --- src/ray/raylet/lineage_cache.cc | 11 +++++++---- src/ray/raylet/lineage_cache.h | 9 +++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index a80be142dcbc..0d8f9e01902f 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -250,7 +250,10 @@ bool LineageCache::FlushTask(const TaskID &task_id) { } } all_arguments_committed = false; - task_children_[parent_id].insert(task_id); + // Track the fact that this task is dependent on a parent that hasn't yet + // been committed, for fast lookup. Once all parents are committed, the + // child will be flushed. + uncommitted_ready_children_[parent_id].insert(task_id); } } if (all_arguments_committed) { @@ -331,11 +334,11 @@ void LineageCache::HandleEntryCommitted(const UniqueID &task_id) { subscribed_tasks_.erase(it); } - auto children_entry = task_children_.find(task_id); - if (children_entry != task_children_.end()) { + auto children_entry = uncommitted_ready_children_.find(task_id); + if (children_entry != uncommitted_ready_children_.end()) { // Get the children of the committed task that are uncommitted but ready. auto children = std::move(children_entry->second); - task_children_.erase(children_entry); + uncommitted_ready_children_.erase(children_entry); // Try to flush the children. If all of the child's parents are committed, // then the child will be flushed here. diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index d124ceda2fcb..cbca200a5fef 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -230,16 +230,17 @@ class LineageCache { // UNCOMMITTED_READY, but that have dependencies that have not been committed // yet. std::unordered_set uncommitted_ready_tasks_; + /// A mapping from each task that hasn't been committed yet, to all dependent + /// children tasks that are in UNCOMMITTED_READY state. This is used when the + /// parent task is committed, for fast lookup of children that may now be + /// flushed. + std::unordered_map> uncommitted_ready_children_; /// All tasks and objects that we are responsible for writing back to the /// GCS, and the tasks and objects in their lineage. Lineage lineage_; /// The tasks that we've subscribed to notifications for from the pubsub /// storage system. We will receive a notification for these tasks on commit. std::unordered_set subscribed_tasks_; - /// A mapping from each task that hasn't been committed yet, to all dependent - /// children tasks that are in UNCOMMITTED_READY state. Once all parents of - /// the child task have been committed, the child task may be flushed. - std::unordered_map> task_children_; }; } // namespace raylet From effd81e41e6081ae6b3c6c3b4a04daafec10cc1c Mon Sep 17 00:00:00 2001 From: Stephanie Date: Wed, 23 May 2018 15:54:21 -0700 Subject: [PATCH 4/6] Only flush the newly ready task --- src/ray/raylet/lineage_cache.cc | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 0d8f9e01902f..7c3e0bbc5b71 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -187,12 +187,14 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l void LineageCache::AddReadyTask(const Task &task) { auto new_entry = LineageEntry(task, GcsStatus_UNCOMMITTED_READY); RAY_CHECK(lineage_.SetEntry(std::move(new_entry))); - // Add the task to the cache of tasks that may be flushed. - uncommitted_ready_tasks_.insert(task.GetTaskSpecification().TaskId()); - - // Try to flush the task to the GCS. - // TODO(swang): Allow a pluggable policy for when to flush. - RAY_CHECK_OK(Flush()); + const TaskID task_id = task.GetTaskSpecification().TaskId(); + // Attempt to flush the task. + bool flushed = FlushTask(task_id); + if (!flushed) { + // If we fail to flush the task here, due to uncommitted parents, then add + // the task to a cache to be flushed in the future. + uncommitted_ready_tasks_.insert(task_id); + } } void LineageCache::RemoveWaitingTask(const TaskID &task_id) { From 117aa6675478735931b4c7788a96310c63e675f6 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Wed, 23 May 2018 15:58:00 -0700 Subject: [PATCH 5/6] Flush() returns void --- src/ray/raylet/lineage_cache.cc | 3 +-- src/ray/raylet/lineage_cache.h | 12 ++++++------ src/ray/raylet/lineage_cache_test.cc | 2 +- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 7c3e0bbc5b71..a6666501d996 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -283,7 +283,7 @@ bool LineageCache::FlushTask(const TaskID &task_id) { return all_arguments_committed; } -ray::Status LineageCache::Flush() { +void LineageCache::Flush() { // Iterate through all tasks that are READY. std::vector ready_task_ids; for (auto it = uncommitted_ready_tasks_.begin(); @@ -296,7 +296,6 @@ ray::Status LineageCache::Flush() { it++; } } - return ray::Status::OK(); } void PopAncestorTasks(const UniqueID &task_id, Lineage &lineage) { diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index cbca200a5fef..c4ccf92c289c 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -196,12 +196,12 @@ class LineageCache { /// includes the entry for the requested entry_id. Lineage GetUncommittedLineage(const TaskID &entry_id) const; - /// Asynchronously write any tasks that have been added since the last flush - /// to the GCS. When each write is acknowledged, its entry will be marked as - /// committed. - /// - /// \return Status. - Status Flush(); + /// Asynchronously write any tasks that are in the UNCOMMITTED_READY state + /// and for which all parents have been committed to the GCS. These tasks + /// will be transitioned in this method to state COMMITTING. Once the write + /// is acknowledged, the task's state will be transitioned to state + /// COMMITTED. + void Flush(); /// Handle the commit of a task entry in the GCS. This sets the task to /// COMMITTED and cleans up any ancestor tasks that are in the cache. diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 81b7be3a4080..12b24ba35263 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -184,7 +184,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) { void CheckFlush(LineageCache &lineage_cache, MockGcs &mock_gcs, size_t num_tasks_flushed) { - RAY_CHECK_OK(lineage_cache.Flush()); + lineage_cache.Flush(); ASSERT_EQ(mock_gcs.TaskTable().size(), num_tasks_flushed); } From 87283f8b5033a7edb536f3964ad1b6992776af32 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Mon, 28 May 2018 14:51:41 -0700 Subject: [PATCH 6/6] x --- src/ray/raylet/lineage_cache.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index a6666501d996..a5eb91a6b3b0 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -285,7 +285,6 @@ bool LineageCache::FlushTask(const TaskID &task_id) { void LineageCache::Flush() { // Iterate through all tasks that are READY. - std::vector ready_task_ids; for (auto it = uncommitted_ready_tasks_.begin(); it != uncommitted_ready_tasks_.end();) { bool flushed = FlushTask(*it); @@ -335,6 +334,8 @@ void LineageCache::HandleEntryCommitted(const UniqueID &task_id) { subscribed_tasks_.erase(it); } + // Try to flush the children of the committed task. These are the tasks that + // have a dependency on the committed task. auto children_entry = uncommitted_ready_children_.find(task_id); if (children_entry != uncommitted_ready_children_.end()) { // Get the children of the committed task that are uncommitted but ready.