Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/common/state/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class RayConfig {

int64_t get_timeout_milliseconds() const { return get_timeout_milliseconds_; }

uint64_t max_lineage_size() const { return max_lineage_size_; }

int64_t worker_get_request_size() const { return worker_get_request_size_; }

int64_t worker_fetch_request_size() const {
Expand Down Expand Up @@ -112,6 +114,7 @@ class RayConfig {
get_timeout_milliseconds_(1000),
worker_get_request_size_(10000),
worker_fetch_request_size_(10000),
max_lineage_size_(100),
actor_max_dummy_objects_(1000),
num_connect_attempts_(50),
connect_timeout_milliseconds_(100),
Expand Down Expand Up @@ -160,6 +163,11 @@ class RayConfig {
int64_t worker_get_request_size_;
int64_t worker_fetch_request_size_;

/// This is used to bound the size of the Raylet's lineage cache. This is
/// the maximum uncommitted lineage size that any remote task in the cache
/// can have before eviction will be attempted.
uint64_t max_lineage_size_;

/// This is a temporary constant used by actors to determine how many dummy
/// objects to store.
int64_t actor_max_dummy_objects_;
Expand Down
136 changes: 103 additions & 33 deletions src/ray/raylet/lineage_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,12 @@ flatbuffers::Offset<protocol::ForwardTaskRequest> Lineage::ToFlatbuffer(

LineageCache::LineageCache(const ClientID &client_id,
gcs::TableInterface<TaskID, protocol::Task> &task_storage,
gcs::PubsubInterface<TaskID> &task_pubsub)
: client_id_(client_id), task_storage_(task_storage), task_pubsub_(task_pubsub) {}
gcs::PubsubInterface<TaskID> &task_pubsub,
uint64_t max_lineage_size)
: client_id_(client_id),
task_storage_(task_storage),
task_pubsub_(task_pubsub),
max_lineage_size_(max_lineage_size) {}

/// A helper function to merge one lineage into another, in DFS order.
///
Expand Down Expand Up @@ -178,16 +182,30 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l
return false;
});

// If the task was previously remote, then we may have been subscribed to
// it. Unsubscribe since we are now responsible for committing the task.
auto entry = lineage_.GetEntry(task_id);
if (entry) {
RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE);
UnsubscribeTask(task_id);
}

// Add the submitted task to the lineage cache as UNCOMMITTED_WAITING. It
// should be marked as UNCOMMITTED_READY once the task starts execution.
LineageEntry task_entry(task, GcsStatus_UNCOMMITTED_WAITING);
RAY_CHECK(lineage_.SetEntry(std::move(task_entry)));
}

void LineageCache::AddReadyTask(const Task &task) {
const TaskID task_id = task.GetTaskSpecification().TaskId();

// Tasks can only become READY if they were in WAITING.
auto entry = lineage_.GetEntry(task_id);
RAY_CHECK(entry);
RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_WAITING);

auto new_entry = LineageEntry(task, GcsStatus_UNCOMMITTED_READY);
RAY_CHECK(lineage_.SetEntry(std::move(new_entry)));
const TaskID task_id = task.GetTaskSpecification().TaskId();
// Attempt to flush the task.
bool flushed = FlushTask(task_id);
if (!flushed) {
Expand All @@ -207,6 +225,27 @@ void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
// one.
entry->ResetStatus(GcsStatus_UNCOMMITTED_REMOTE);
RAY_CHECK(lineage_.SetEntry(std::move(*entry)));

// Try to evict a task and its uncommitted lineage if the uncommitted lineage
// exceeds the maximum size.
// NOTE(swang): The number of entries in the uncommitted lineage also
// includes local tasks that haven't been committed yet, not just remote
// tasks, so this is an overestimate.
const auto uncommitted_lineage = GetUncommittedLineage(task_id);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like an expensive operation, especially if we allow the uncommitted lineage to grow to size 1000. And this is called every time we forward a task, right? Will this be an issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will be called every time we forward a task. It shouldn't be too expensive since it's just walking a tree, but I guess it's hard to say. Would it help if we added a timing statement to GetUncommittedLineage and logged a warning if it exceeds maybe 1ms?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Up to you about the timing statement, I assume it will be visible in the profiler if it's an issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm okay, I think it's a good sanity check. I'll run this quickly locally and see how many tasks it takes to get to 1ms.

if (uncommitted_lineage.GetEntries().size() > max_lineage_size_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What goes wrong if max_lineage_size_ = 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think that will basically just try to evict every single entry in the lineage, but the code itself should not break. The tradeoff is basically the lower max_lineage_size_ is, the more often a node will request notifications from the GCS.

// Request a notification for the newly remote task so that the task and
// its uncommitted lineage can be evicted once the commit notification is
// received. Since this task was in state WAITING, check that we were not
// already subscribed to the task.
// NOTE(swang): We may end up requesting notifications for too many tasks
// from the GCS if we do not receive a notification for this task fast
// enough, since every dependent and waiting task that gets removed
// afterwards will also have an uncommitted lineage that's too large. If
// this becomes an issue, we can be smarter about which tasks to request by
// either storing the dependency depth as part of the task specs, or
// storing that information as a data structure in the lineage cache.
RAY_CHECK(SubscribeTask(task_id));
}
}

Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const {
Expand All @@ -216,7 +255,7 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const {
MergeLineageHelper(task_id, lineage_, uncommitted_lineage, [](GcsStatus status) {
// The stopping condition for recursion is that the entry has been
// committed to the GCS.
return status == GcsStatus_COMMITTED;
return false;
});
return uncommitted_lineage;
}
Expand All @@ -234,7 +273,7 @@ bool LineageCache::FlushTask(const TaskID &task_id) {
// If a parent entry exists in the lineage cache but has not been
// committed yet, then as far as we know, it's still in flight to the
// GCS. Skip this task for now.
if (parent && parent->GetStatus() != GcsStatus_COMMITTED) {
if (parent) {
RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING)
<< "Children should not become ready to flush before their parents.";
// Request notifications about the parent entry's commit in the GCS if
Expand All @@ -243,13 +282,7 @@ bool LineageCache::FlushTask(const TaskID &task_id) {
// notification about the task's commit via HandleEntryCommitted, then
// this task will be ready to write on the next call to Flush().
if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) {
auto inserted = subscribed_tasks_.insert(parent_id);
if (inserted.second) {
// Only request notifications about the parent entry if we haven't
// already requested notifications for it.
RAY_CHECK_OK(
task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_));
}
SubscribeTask(parent_id);
}
all_arguments_committed = false;
// Track the fact that this task is dependent on a parent that hasn't yet
Expand Down Expand Up @@ -297,42 +330,79 @@ void LineageCache::Flush() {
}
}

void PopAncestorTasks(const UniqueID &task_id, Lineage &lineage) {
auto entry = lineage.PopEntry(task_id);
bool LineageCache::SubscribeTask(const UniqueID &task_id) {
auto inserted = subscribed_tasks_.insert(task_id);
bool unsubscribed = inserted.second;
if (unsubscribed) {
// Request notifications for the task if we haven't already requested
// notifications for it.
RAY_CHECK_OK(task_pubsub_.RequestNotifications(JobID::nil(), task_id, client_id_));
}
// Return whether we were previously unsubscribed to this task and are now
// subscribed.
return unsubscribed;
}

bool LineageCache::UnsubscribeTask(const UniqueID &task_id) {
auto it = subscribed_tasks_.find(task_id);
bool subscribed = (it != subscribed_tasks_.end());
if (subscribed) {
// Cancel notifications for the task if we previously requested
// notifications for it.
RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_));
subscribed_tasks_.erase(it);
}
// Return whether we were previously subscribed to this task and are now
// unsubscribed.
return subscribed;
}

void LineageCache::EvictRemoteLineage(const UniqueID &task_id) {
// Remove the ancestor task.
auto entry = lineage_.PopEntry(task_id);
if (!entry) {
return;
}
// Tasks are committed in data dependency order per node, so the only
// ancestors of a committed task should be other remote tasks.
auto status = entry->GetStatus();
RAY_CHECK(status == GcsStatus_UNCOMMITTED_REMOTE || status == GcsStatus_COMMITTED);
RAY_CHECK(status == GcsStatus_UNCOMMITTED_REMOTE);
// We are evicting the remote ancestors of a task, so there should not be
// any dependent tasks that need to be flushed.
RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0);
// Unsubscribe from the remote ancestor task if we were subscribed to
// notifications.
UnsubscribeTask(task_id);
// Recurse and remove this task's ancestors.
for (const auto &parent_id : entry->GetParentTaskIds()) {
PopAncestorTasks(parent_id, lineage);
EvictRemoteLineage(parent_id);
}
}

void LineageCache::HandleEntryCommitted(const UniqueID &task_id) {
RAY_LOG(DEBUG) << "task committed: " << task_id;
auto entry = lineage_.PopEntry(task_id);
RAY_CHECK(entry);
for (const auto &parent_id : entry->GetParentTaskIds()) {
PopAncestorTasks(parent_id, lineage_);
if (!entry) {
// The committed entry has already been evicted. Check that the committed
// entry does not have any dependent tasks, since we should've already
// attempted to flush these tasks on the first commit notification.
RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0);
// Check that we already unsubscribed from the task when handling the
// first commit notification.
RAY_CHECK(subscribed_tasks_.count(task_id) == 0);
// Do nothing if the committed entry has already been evicted.
return;
}
// Mark this task as COMMITTED. Any tasks that were dependent on it and are
// ready to be written may now be flushed to the GCS.
bool committed = entry->SetStatus(GcsStatus_COMMITTED);
if (!committed) {
// If we failed to mark the task as committed, check that it's because it
// was committed before. This means that we already received a notification
// about the commit.
RAY_CHECK(entry->GetStatus() == GcsStatus_COMMITTED);

// Evict the committed task's uncommitted lineage. Since local tasks are
// written in data dependency order, the uncommitted lineage should only
// include remote tasks, i.e. tasks that were committed by a different node.
for (const auto &parent_id : entry->GetParentTaskIds()) {
EvictRemoteLineage(parent_id);
}
RAY_CHECK(lineage_.SetEntry(std::move(*entry)));

// Stop listening for notifications about this task.
auto it = subscribed_tasks_.find(task_id);
if (it != subscribed_tasks_.end()) {
RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_));
subscribed_tasks_.erase(it);
}
UnsubscribeTask(task_id);

// Try to flush the children of the committed task. These are the tasks that
// have a dependency on the committed task.
Expand Down
19 changes: 15 additions & 4 deletions src/ray/raylet/lineage_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ enum GcsStatus {
/// The task has been written to the GCS and we are waiting for an
/// acknowledgement of the commit.
GcsStatus_COMMITTING,
/// The task has been committed in the GCS. It's safe to remove this entry
/// from the lineage cache.
GcsStatus_COMMITTED,
};

/// \class LineageEntry
Expand Down Expand Up @@ -164,7 +161,7 @@ class LineageCache {
/// TODO(swang): Pass in the policy (interface?).
LineageCache(const ClientID &client_id,
gcs::TableInterface<TaskID, protocol::Task> &task_storage,
gcs::PubsubInterface<TaskID> &task_pubsub);
gcs::PubsubInterface<TaskID> &task_pubsub, uint64_t max_lineage_size);

/// Add a task that is waiting for execution and its uncommitted lineage.
/// These entries will not be written to the GCS until set to ready.
Expand Down Expand Up @@ -214,6 +211,15 @@ class LineageCache {
/// parents that are not committed yet, then the child will be flushed once
/// the parents have been committed.
bool FlushTask(const TaskID &task_id);
/// Evict a remote task and its lineage. This should only be called if we
/// are sure that the remote task and its lineage are committed.
void EvictRemoteLineage(const UniqueID &task_id);
/// Subscribe to notifications for a task. Returns whether the operation
/// was successful (whether we were not already subscribed).
bool SubscribeTask(const UniqueID &task_id);
/// Unsubscribe from notifications for a task. Returns whether the operation
/// was successful (whether we were subscribed).
bool UnsubscribeTask(const UniqueID &task_id);

/// The client ID, used to request notifications for specific tasks.
/// TODO(swang): Move the ClientID into the generic Table implementation.
Expand All @@ -223,6 +229,11 @@ class LineageCache {
/// The pubsub storage system for task information. This can be used to
/// request notifications for the commit of a task entry.
gcs::PubsubInterface<TaskID> &task_pubsub_;
/// The maximum size that a remote task's uncommitted lineage can get to. If
/// a remote task's uncommitted lineage exceeds this size, then a
/// notification will be requested from the pubsub storage system so that
/// the task and its lineage can be evicted from the stash.
uint64_t max_lineage_size_;
/// The set of tasks that are in UNCOMMITTED_READY state. This is a cache of
/// the tasks that may be flushable.
// TODO(swang): As an optimization, we may also want to further distinguish
Expand Down
Loading