Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 83 additions & 58 deletions src/ray/raylet/lineage_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,14 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l
void LineageCache::AddReadyTask(const Task &task) {
auto new_entry = LineageEntry(task, GcsStatus_UNCOMMITTED_READY);
RAY_CHECK(lineage_.SetEntry(std::move(new_entry)));
// Add the task to the cache of tasks that may be flushed.
uncommitted_ready_tasks_.insert(task.GetTaskSpecification().TaskId());

// Try to flush the task to the GCS.
// TODO(swang): Allow a pluggable policy for when to flush.
RAY_CHECK_OK(Flush());
const TaskID task_id = task.GetTaskSpecification().TaskId();
// Attempt to flush the task.
bool flushed = FlushTask(task_id);
if (!flushed) {
// If we fail to flush the task here, due to uncommitted parents, then add
// the task to a cache to be flushed in the future.
uncommitted_ready_tasks_.insert(task_id);
}
}

void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
Expand All @@ -219,57 +221,49 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const {
return uncommitted_lineage;
}

Status LineageCache::Flush() {
// Iterate through all tasks that are READY.
std::vector<TaskID> ready_task_ids;
for (const auto &task_id : uncommitted_ready_tasks_) {
auto entry = lineage_.GetEntry(task_id);
RAY_CHECK(entry);
RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY);

// Check if all arguments have been committed to the GCS before writing
// this task.
bool all_arguments_committed = true;
for (const auto &parent_id : entry->GetParentTaskIds()) {
auto parent = lineage_.GetEntry(parent_id);
// If a parent entry exists in the lineage cache but has not been
// committed yet, then as far as we know, it's still in flight to the
// GCS. Skip this task for now.
if (parent && parent->GetStatus() != GcsStatus_COMMITTED) {
RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING)
<< "Children should not become ready to flush before their parents.";
// Request notifications about the parent entry's commit in the GCS if
// the parent is remote. Otherwise, the parent is local and will
// eventually be flushed. In either case, once we receive a
// notification about the task's commit via HandleEntryCommitted, then
// this task will be ready to write on the next call to Flush().
if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) {
auto inserted = subscribed_tasks_.insert(parent_id);
if (inserted.second) {
// Only request notifications about the parent entry if we haven't
// already requested notifications for it.
RAY_CHECK_OK(
task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_));
}
bool LineageCache::FlushTask(const TaskID &task_id) {
auto entry = lineage_.GetEntry(task_id);
RAY_CHECK(entry);
RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY);

// Check if all arguments have been committed to the GCS before writing
// this task.
bool all_arguments_committed = true;
for (const auto &parent_id : entry->GetParentTaskIds()) {
auto parent = lineage_.GetEntry(parent_id);
// If a parent entry exists in the lineage cache but has not been
// committed yet, then as far as we know, it's still in flight to the
// GCS. Skip this task for now.
if (parent && parent->GetStatus() != GcsStatus_COMMITTED) {
RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING)
<< "Children should not become ready to flush before their parents.";
// Request notifications about the parent entry's commit in the GCS if
// the parent is remote. Otherwise, the parent is local and will
// eventually be flushed. In either case, once we receive a
// notification about the task's commit via HandleEntryCommitted, then
// this task will be ready to write on the next call to Flush().
if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) {
auto inserted = subscribed_tasks_.insert(parent_id);
if (inserted.second) {
// Only request notifications about the parent entry if we haven't
// already requested notifications for it.
RAY_CHECK_OK(
task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_));
}
all_arguments_committed = false;
break;
}
}
if (all_arguments_committed) {
// All arguments have been committed to the GCS. Add this task to the
// list of tasks to write back to the GCS.
ready_task_ids.push_back(task_id);
all_arguments_committed = false;
// Track the fact that this task is dependent on a parent that hasn't yet
// been committed, for fast lookup. Once all parents are committed, the
// child will be flushed.
uncommitted_ready_children_[parent_id].insert(task_id);
}
}

// Write back all ready tasks whose arguments have been committed to the GCS.
gcs::raylet::TaskTable::WriteCallback task_callback = [this](
ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) {
HandleEntryCommitted(id);
};
for (const auto &ready_task_id : ready_task_ids) {
auto task = lineage_.GetEntry(ready_task_id);
if (all_arguments_committed) {
gcs::raylet::TaskTable::WriteCallback task_callback = [this](
ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) {
HandleEntryCommitted(id);
};
auto task = lineage_.GetEntry(task_id);
// TODO(swang): Make this better...
flatbuffers::FlatBufferBuilder fbb;
auto message = task->TaskData().ToFlatbuffer(fbb);
Expand All @@ -278,18 +272,29 @@ Status LineageCache::Flush() {
auto root = flatbuffers::GetRoot<protocol::Task>(fbb.GetBufferPointer());
root->UnPackTo(task_data.get());
RAY_CHECK_OK(task_storage_.Add(task->TaskData().GetTaskSpecification().DriverId(),
ready_task_id, task_data, task_callback));
task_id, task_data, task_callback));

// We successfully wrote the task, so mark it as committing.
// TODO(swang): Use a batched interface and write with all object entries.
auto entry = lineage_.PopEntry(ready_task_id);
auto entry = lineage_.PopEntry(task_id);
RAY_CHECK(entry->SetStatus(GcsStatus_COMMITTING));
RAY_CHECK(lineage_.SetEntry(std::move(*entry)));
// Erase the task from the cache of uncommitted ready tasks.
uncommitted_ready_tasks_.erase(ready_task_id);
}
return all_arguments_committed;
}

return ray::Status::OK();
void LineageCache::Flush() {
// Iterate through all tasks that are READY.
for (auto it = uncommitted_ready_tasks_.begin();
it != uncommitted_ready_tasks_.end();) {
bool flushed = FlushTask(*it);
// Erase the task from the cache of uncommitted ready tasks.
if (flushed) {
it = uncommitted_ready_tasks_.erase(it);
} else {
it++;
}
}
}

void PopAncestorTasks(const UniqueID &task_id, Lineage &lineage) {
Expand Down Expand Up @@ -328,6 +333,26 @@ void LineageCache::HandleEntryCommitted(const UniqueID &task_id) {
RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_));
subscribed_tasks_.erase(it);
}

// Try to flush the children of the committed task. These are the tasks that
// have a dependency on the committed task.
auto children_entry = uncommitted_ready_children_.find(task_id);
if (children_entry != uncommitted_ready_children_.end()) {
// Get the children of the committed task that are uncommitted but ready.
auto children = std::move(children_entry->second);
uncommitted_ready_children_.erase(children_entry);

// Try to flush the children. If all of the child's parents are committed,
// then the child will be flushed here.
for (const auto &child_id : children) {
bool flushed = FlushTask(child_id);
// Erase the child task from the cache of uncommitted ready tasks.
if (flushed) {
auto erased = uncommitted_ready_tasks_.erase(child_id);
RAY_CHECK(erased == 1);
}
}
}
}

} // namespace raylet
Expand Down
22 changes: 16 additions & 6 deletions src/ray/raylet/lineage_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ class LineageCache {
/// includes the entry for the requested entry_id.
Lineage GetUncommittedLineage(const TaskID &entry_id) const;

/// Asynchronously write any tasks that have been added since the last flush
/// to the GCS. When each write is acknowledged, its entry will be marked as
/// committed.
///
/// \return Status.
Status Flush();
/// Asynchronously write any tasks that are in the UNCOMMITTED_READY state
/// and for which all parents have been committed to the GCS. These tasks
/// will be transitioned in this method to state COMMITTING. Once the write
/// is acknowledged, the task's state will be transitioned to state
/// COMMITTED.
void Flush();

/// Handle the commit of a task entry in the GCS. This sets the task to
/// COMMITTED and cleans up any ancestor tasks that are in the cache.
Expand All @@ -210,6 +210,11 @@ class LineageCache {
void HandleEntryCommitted(const TaskID &task_id);

private:
/// Try to flush a task that is in UNCOMMITTED_READY state. If the task has
/// parents that are not committed yet, then the child will be flushed once
/// the parents have been committed.
bool FlushTask(const TaskID &task_id);

/// The client ID, used to request notifications for specific tasks.
/// TODO(swang): Move the ClientID into the generic Table implementation.
ClientID client_id_;
Expand All @@ -225,6 +230,11 @@ class LineageCache {
// UNCOMMITTED_READY, but that have dependencies that have not been committed
// yet.
std::unordered_set<TaskID> uncommitted_ready_tasks_;
/// A mapping from each task that hasn't been committed yet, to all dependent
/// children tasks that are in UNCOMMITTED_READY state. This is used when the
/// parent task is committed, for fast lookup of children that may now be
/// flushed.
std::unordered_map<TaskID, std::unordered_set<TaskID>> uncommitted_ready_children_;
/// All tasks and objects that we are responsible for writing back to the
/// GCS, and the tasks and objects in their lineage.
Lineage lineage_;
Expand Down
20 changes: 11 additions & 9 deletions src/ray/raylet/lineage_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ class MockGcs : public gcs::TableInterface<TaskID, protocol::Task>,
}

void Flush() {
for (const auto &callback : callbacks_) {
auto callbacks = std::move(callbacks_);
callbacks_.clear();
for (const auto &callback : callbacks) {
callback.first(NULL, callback.second, *task_table_[callback.second]);
}
callbacks_.clear();
}

const std::unordered_map<TaskID, std::shared_ptr<protocol::TaskT>> &TaskTable() const {
Expand Down Expand Up @@ -183,7 +184,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) {

void CheckFlush(LineageCache &lineage_cache, MockGcs &mock_gcs,
size_t num_tasks_flushed) {
RAY_CHECK_OK(lineage_cache.Flush());
lineage_cache.Flush();
ASSERT_EQ(mock_gcs.TaskTable().size(), num_tasks_flushed);
}

Expand Down Expand Up @@ -219,15 +220,16 @@ TEST_F(LineageCacheTest, TestWritebackOrder) {
auto return_values1 =
InsertTaskChain(lineage_cache_, tasks, 3, std::vector<ObjectID>(), 1);

// Mark all tasks as ready.
// Mark all tasks as ready. The first task, which has no dependencies, should
// be flushed.
for (const auto &task : tasks) {
lineage_cache_.AddReadyTask(task);
}
// Check that we write back the tasks in order of data dependencies.
for (size_t i = 0; i < tasks.size(); i++) {
num_tasks_flushed++;
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
// Flush acknowledgements. The next task should be able to be written.
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
// Flush acknowledgements. The next task should have been flushed.
mock_gcs_.Flush();
}
}
Expand Down Expand Up @@ -268,11 +270,11 @@ TEST_F(LineageCacheTest, TestWritebackPartiallyReady) {
// tasks are received.
num_tasks_flushed++;
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
// Flush acknowledgements. The dependent task should now be able to be
// written.
// Flush acknowledgements. Both independent tasks should now be committed.
mock_gcs_.Flush();
// The dependent task should now be flushed.
num_tasks_flushed++;
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
}

TEST_F(LineageCacheTest, TestForwardTaskRoundTrip) {
Expand Down