Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions src/ray/raylet/lineage_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,21 @@ void LineageCache::AddReadyTask(const Task &task) {
}
}

uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) const {
if (subscribed_tasks_.count(task_id) == 1) {
return 0;
}
auto entry = lineage_.GetEntry(task_id);
if (!entry) {
return 0;
}
uint64_t cnt = 1;
for (const auto &parent_id : entry->GetParentTaskIds()) {
cnt += CountUnsubscribedLineage(parent_id);
}
return cnt;
}

void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
auto entry = lineage_.PopEntry(task_id);
// It's only okay to remove a task that is waiting for execution.
Expand All @@ -226,24 +241,18 @@ void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
entry->ResetStatus(GcsStatus_UNCOMMITTED_REMOTE);
RAY_CHECK(lineage_.SetEntry(std::move(*entry)));

// Try to evict a task and its uncommitted lineage if the uncommitted lineage
// exceeds the maximum size.
// Request a notification for every max_lineage_size_ tasks,
// so that the task and its uncommitted lineage can be evicted
// once the commit notification is received.
// By doing this, we make sure that the unevicted lineage won't be more than
// max_lineage_size_, and the number of subscribed tasks won't be more than
// N / max_lineage_size_, where N is the size of the task chain.
// NOTE(swang): The number of entries in the uncommitted lineage also
// includes local tasks that haven't been committed yet, not just remote
// tasks, so this is an overestimate.
const auto uncommitted_lineage = GetUncommittedLineage(task_id);
if (uncommitted_lineage.GetEntries().size() > max_lineage_size_) {
// Request a notification for the newly remote task so that the task and
// its uncommitted lineage can be evicted once the commit notification is
// received. Since this task was in state WAITING, check that we were not
if (CountUnsubscribedLineage(task_id) > max_lineage_size_) {
// Since this task was in state WAITING, check that we were not
// already subscribed to the task.
// NOTE(swang): We may end up requesting notifications for too many tasks
// from the GCS if we do not receive a notification for this task fast
// enough, since every dependent and waiting task that gets removed
// afterwards will also have an uncommitted lineage that's too large. If
// this becomes an issue, we can be smarter about which tasks to request by
// either storing the dependency depth as part of the task specs, or
// storing that information as a data structure in the lineage cache.
RAY_CHECK(SubscribeTask(task_id));
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/lineage_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ class LineageCache {
/// Unsubscribe from notifications for a task. Returns whether the operation
/// was successful (whether we were subscribed).
bool UnsubscribeTask(const UniqueID &task_id);
/// Count the size of the lineage of a task, stopping at subscribed tasks.
/// The given task and, recursively, its parent tasks are each counted as one.
/// A task that we are subscribed to, or that has no entry in the lineage,
/// contributes zero and its parents are not visited.
///
/// \param task_id The task whose lineage should be counted.
/// \return The number of counted tasks, or 0 if the task itself is subscribed
/// or has no lineage entry.
uint64_t CountUnsubscribedLineage(const UniqueID &task_id) const;

/// The client ID, used to request notifications for specific tasks.
/// TODO(swang): Move the ClientID into the generic Table implementation.
Expand Down
14 changes: 10 additions & 4 deletions src/ray/raylet/lineage_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class MockGcs : public gcs::TableInterface<TaskID, protocol::Task>,
if (task_table_.count(task_id) == 1) {
callbacks_.push_back({notification_callback_, task_id});
}
num_requested_notifications_ += 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

num_requested_notifications_ needs to be given an initial value.

return ray::Status::OK();
}

Expand All @@ -75,11 +76,14 @@ class MockGcs : public gcs::TableInterface<TaskID, protocol::Task>,

/// Read-only access to subscribed_tasks_. The set is returned by reference,
/// not copied.
const std::unordered_set<TaskID> &SubscribedTasks() const {
  return subscribed_tasks_;
}

const int NumRequestedNotifications() const { return num_requested_notifications_; }

private:
std::unordered_map<TaskID, std::shared_ptr<protocol::TaskT>> task_table_;
std::vector<std::pair<gcs::raylet::TaskTable::WriteCallback, TaskID>> callbacks_;
gcs::raylet::TaskTable::WriteCallback notification_callback_;
std::unordered_set<TaskID> subscribed_tasks_;
int num_requested_notifications_ = 0;
};

class LineageCacheTest : public ::testing::Test {
Expand Down Expand Up @@ -391,14 +395,14 @@ TEST_F(LineageCacheTest, TestEviction) {
// All tasks have now been flushed. Check that enough lineage has been
// evicted that the uncommitted lineage is now less than the maximum size.
uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id);
ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_);
ASSERT_TRUE(uncommitted_lineage.GetEntries().size() <= max_lineage_size_);
}

TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
// Insert a chain of dependent tasks that is more than twice as long as the
// maximum lineage size. This will ensure that we request notifications for
// at least 2 remote tasks.
uint64_t lineage_size = (2 * max_lineage_size_) + 1;
// at most 2 remote tasks.
uint64_t lineage_size = (2 * max_lineage_size_) + 2;
size_t num_tasks_flushed = 0;
std::vector<Task> tasks;
InsertTaskChain(lineage_cache_, tasks, lineage_size, std::vector<ObjectID>(), 1);
Expand All @@ -408,6 +412,8 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
auto task_id = task.GetTaskSpecification().TaskId();
lineage_cache_.RemoveWaitingTask(task_id);
}
// Check that we requested at most 2 notifications
ASSERT_TRUE(mock_gcs_.NumRequestedNotifications() <= 2);

// Check that the last task in the chain still has all tasks in its
// uncommitted lineage.
Expand Down Expand Up @@ -441,7 +447,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
// All tasks have now been flushed. Check that enough lineage has been
// evicted that the uncommitted lineage is now less than the maximum size.
uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id);
ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_);
ASSERT_TRUE(uncommitted_lineage.GetEntries().size() <= max_lineage_size_);
}

} // namespace raylet
Expand Down