Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ install:
- ./src/ray/raylet/task_test
- ./src/ray/raylet/worker_pool_test
- ./src/ray/raylet/lineage_cache_test
- ./src/ray/raylet/task_dependency_manager_test

- bash ../../../src/common/test/run_tests.sh
- bash ../../../src/plasma/test/run_tests.sh
Expand Down
26 changes: 18 additions & 8 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,34 @@ ObjectDirectory::ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> gcs_client
ray::Status ObjectDirectory::ReportObjectAdded(const ObjectID &object_id,
const ClientID &client_id,
const ObjectInfoT &object_info) {
// TODO(hme): Determine whether we need to do lookup to append.
// Append the addition entry to the object table.
JobID job_id = JobID::from_random();
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->is_eviction = false;
data->num_evictions = object_evictions_[object_id];
data->object_size = object_info.data_size;
ray::Status status = gcs_client_->object_table().Append(
job_id, object_id, data, [](gcs::AsyncGcsClient *client, const UniqueID &id,
const std::shared_ptr<ObjectTableDataT> data) {
// Do nothing.
});
ray::Status status =
gcs_client_->object_table().Append(job_id, object_id, data, nullptr);
return status;
};

ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) {
// TODO(hme): Need corresponding remove method in GCS.
return ray::Status::NotImplemented("ObjectTable.Remove is not implemented");
// Append the eviction entry to the object table.
JobID job_id = JobID::from_random();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're no longer doing JobID::from_random() due to the cost of the random number generator used (as of #2044)

auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->is_eviction = true;
data->num_evictions = object_evictions_[object_id];
ray::Status status =
gcs_client_->object_table().Append(job_id, object_id, data, nullptr);
// Increment the number of times we've evicted this object. NOTE(swang): This
// is only necessary because the Ray redis module expects unique entries in a
// log. We track the number of evictions so that the next eviction, if there
// is one, is unique.
object_evictions_[object_id]++;
return status;
};

ray::Status ObjectDirectory::GetInformation(const ClientID &client_id,
Expand Down
3 changes: 3 additions & 0 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<ObjectID, ODCallbacks> existing_requests_;
/// Reference to the gcs client.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
/// Map from object ID to the number of times it's been evicted on this
/// node before.
std::unordered_map<ObjectID, int> object_evictions_;
};

} // namespace ray
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ ADD_RAY_TEST(worker_pool_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} $

ADD_RAY_TEST(task_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY})
ADD_RAY_TEST(lineage_cache_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY})
ADD_RAY_TEST(task_dependency_manager_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY})

add_library(rayletlib raylet.cc ${NODE_MANAGER_FBS_OUTPUT_FILES})
target_link_libraries(rayletlib ray_static ${Boost_SYSTEM_LIBRARY})
Expand Down
148 changes: 127 additions & 21 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
local_queues_(SchedulingQueue()),
scheduling_policy_(local_queues_),
reconstruction_policy_([this](const TaskID &task_id) { ResubmitTask(task_id); }),
task_dependency_manager_(
object_manager,
// reconstruction_policy_,
[this](const TaskID &task_id) { HandleWaitingTaskReady(task_id); }),
task_dependency_manager_(),
lineage_cache_(gcs_client_->client_table().GetLocalClientId(),
gcs_client->raylet_task_table(), gcs_client->raylet_task_table()),
remote_clients_(),
Expand All @@ -88,6 +85,13 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
ClientID local_client_id = gcs_client_->client_table().GetLocalClientId();
cluster_resource_map_.emplace(local_client_id,
SchedulingResources(config.resource_config));

RAY_CHECK_OK(object_manager_.SubscribeObjAdded([this](const ObjectInfoT &object_info) {
ObjectID object_id = ObjectID::from_binary(object_info.object_id);
HandleObjectLocal(object_id);
}));
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
[this](const ObjectID &object_id) { HandleObjectMissing(object_id); }));
}

ray::Status NodeManager::RegisterGcs() {
Expand Down Expand Up @@ -366,6 +370,9 @@ void NodeManager::ProcessClientMessage(std::shared_ptr<LocalClientConnection> cl
auto message = flatbuffers::GetRoot<protocol::ReconstructObject>(message_data);
ObjectID object_id = from_flatbuf(*message->object_id());
RAY_LOG(DEBUG) << "reconstructing object " << object_id;
// TODO(swang): Instead of calling Pull on the object directly, record the
// fact that the blocked task is dependent on this object_id in the task
// dependency manager.
RAY_CHECK_OK(object_manager_.Pull(object_id));

// If the blocked client is a worker, and the worker isn't already blocked,
Expand Down Expand Up @@ -461,13 +468,6 @@ void NodeManager::ProcessNodeManagerMessage(
node_manager_client->ProcessMessages();
}

void NodeManager::HandleWaitingTaskReady(const TaskID &task_id) {
auto ready_tasks = local_queues_.RemoveTasks({task_id});
local_queues_.QueueReadyTasks(std::vector<Task>(ready_tasks));
// Schedule the newly ready tasks if possible.
ScheduleTasks();
}

void NodeManager::ScheduleTasks() {
// This method performs the transition of tasks from PENDING to SCHEDULED.
auto policy_decision = scheduling_policy_.Schedule(
Expand Down Expand Up @@ -495,13 +495,30 @@ void NodeManager::ScheduleTasks() {
// TODO(swang): Handle forward task failure.
// TODO(swang): Unsubscribe this task in the task dependency manager.
RAY_CHECK_OK(ForwardTask(task, client_id));

// Tell the task dependency manager that we no longer need this task's
// object dependencies.
std::vector<ObjectID> objects_to_cancel;
task_dependency_manager_.UnsubscribeDependencies(task_id, objects_to_cancel);
for (const auto &object_id : objects_to_cancel) {
HandleRemoteDependencyCanceled(object_id);
}
// Tell the task dependency manager that we are no longer responsible for
// executing this task.
std::vector<ObjectID> objects_to_request;
task_dependency_manager_.TaskCanceled(task_id, objects_to_request);
for (const auto &object_id : objects_to_request) {
HandleRemoteDependencyRequired(object_id);
}
}
}

// Transition locally scheduled tasks to SCHEDULED and dispatch scheduled tasks.
std::vector<Task> tasks = local_queues_.RemoveTasks(local_task_ids);
local_queues_.QueueScheduledTasks(tasks);
DispatchTasks();
if (local_task_ids.size() > 0) {
std::vector<Task> tasks = local_queues_.RemoveTasks(local_task_ids);
local_queues_.QueueScheduledTasks(tasks);
DispatchTasks();
}
}

void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage) {
Expand Down Expand Up @@ -554,14 +571,48 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
}
}

void NodeManager::HandleRemoteDependencyRequired(const ObjectID &dependency_id) {
// Try to fetch the object from the object manager.
object_manager_.Pull(dependency_id);
// TODO(swang): Request reconstruction of the object, possibly after a
// timeout.
}

/// Handle a dependency that was previously requested but is no longer needed
/// by forwarding a cancellation to the object manager.
///
/// \param dependency_id The ID of the object whose fetch should be canceled.
void NodeManager::HandleRemoteDependencyCanceled(const ObjectID &dependency_id) {
// Cancel the fetch request from the object manager.
// NOTE(review): if Cancel returns a ray::Status like Pull does, the result is
// being dropped here; confirm and wrap in RAY_CHECK_OK if so.
object_manager_.Cancel(dependency_id);
// TODO(swang): Cancel reconstruction of the object.
}

void NodeManager::QueueTask(const Task &task) {
// Queue the task depending on the availability of its arguments.
if (task_dependency_manager_.TaskReady(task)) {
local_queues_.QueueReadyTasks(std::vector<Task>({task}));
// Mark the task as pending. Once the task has finished execution, or once it
// has been forwarded to another node, the task should be canceled in the
// TaskDependencyManager.
std::vector<ObjectID> objects_to_cancel;
task_dependency_manager_.TaskPending(task, objects_to_cancel);
// Cancel any remote dependencies that will be fulfilled by the queued task.
for (const auto &object_id : objects_to_cancel) {
HandleRemoteDependencyCanceled(object_id);
}

// Subscribe to the task's dependencies.
const auto dependencies = task.GetDependencies();
const auto task_id = task.GetTaskSpecification().TaskId();
std::vector<ObjectID> objects_to_request;
bool ready = task_dependency_manager_.SubscribeDependencies(task_id, dependencies,
objects_to_request);
// Request any remote dependencies required by the queued task.
for (const auto &object_id : objects_to_request) {
HandleRemoteDependencyRequired(object_id);
}
// Queue the task. If all dependencies are available, then the task is queued
// in the READY state; otherwise, it is queued in the WAITING state.
if (ready) {
local_queues_.QueueReadyTasks({task});
// Try to schedule the newly ready task.
ScheduleTasks();
} else {
local_queues_.QueueWaitingTasks(std::vector<Task>({task}));
task_dependency_manager_.SubscribeTaskReady(task);
local_queues_.QueueWaitingTasks({task});
}
}

Expand Down Expand Up @@ -626,6 +677,13 @@ void NodeManager::AssignTask(Task &task) {
}
// We started running the task, so the task is ready to write to GCS.
lineage_cache_.AddReadyTask(task);
// Notify the task dependency manager that we no longer need the
// dependencies for this task.
std::vector<ObjectID> objects_to_cancel;
task_dependency_manager_.UnsubscribeDependencies(spec.TaskId(), objects_to_cancel);
for (const auto &object_id : objects_to_cancel) {
HandleRemoteDependencyCanceled(object_id);
}
// Mark the task as running.
local_queues_.QueueRunningTasks(std::vector<Task>({task}));
} else {
Expand Down Expand Up @@ -680,7 +738,14 @@ void NodeManager::FinishAssignedTask(std::shared_ptr<Worker> worker) {
if (task.GetTaskSpecification().IsActorCreationTask() ||
task.GetTaskSpecification().IsActorTask()) {
auto dummy_object = task.GetTaskSpecification().ActorDummyObject();
task_dependency_manager_.MarkDependencyReady(dummy_object);
HandleObjectLocal(dummy_object);
}

// Notify the task dependency manager that this task has finished execution.
std::vector<ObjectID> objects_to_request;
task_dependency_manager_.TaskCanceled(task_id, objects_to_request);
for (const auto &object_id : objects_to_request) {
HandleRemoteDependencyRequired(object_id);
}

// Unset the worker's assigned task.
Expand All @@ -691,6 +756,48 @@ void NodeManager::ResubmitTask(const TaskID &task_id) {
throw std::runtime_error("Method not implemented");
}

/// Handle an object becoming available on this node. Tasks whose dependencies
/// are now all fulfilled are moved to the ready queue and scheduled, and any
/// remote requests that the task dependency manager reports as unnecessary are
/// canceled.
///
/// \param object_id The ID of the object that is now local.
void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
  // Notify the task dependency manager that this object is local. It returns
  // the tasks that are now ready and fills in the objects whose remote
  // requests should be canceled.
  std::vector<ObjectID> objects_to_cancel;
  const auto ready_task_ids =
      task_dependency_manager_.HandleObjectLocal(object_id, objects_to_cancel);
  // Transition the tasks whose dependencies are now fulfilled to the ready
  // state.
  if (!ready_task_ids.empty()) {
    std::unordered_set<TaskID> ready_task_id_set(ready_task_ids.begin(),
                                                 ready_task_ids.end());
    // Queue the removed tasks directly; no intermediate copy is needed.
    const auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set);
    local_queues_.QueueReadyTasks(ready_tasks);
    // Schedule the newly ready tasks.
    ScheduleTasks();
  }
  // Cancel remote dependencies. The loop variable is named distinctly so that
  // it does not shadow the object_id parameter.
  for (const auto &canceled_object_id : objects_to_cancel) {
    HandleRemoteDependencyCanceled(canceled_object_id);
  }
}

/// Handle an object that is no longer available on this node. Tasks that
/// depend on the object are moved back to the waiting queue, and any objects
/// that the task dependency manager reports as newly required are requested
/// again.
///
/// \param object_id The ID of the object that is no longer local.
void NodeManager::HandleObjectMissing(const ObjectID &object_id) {
  // Notify the task dependency manager that this object is no longer local.
  // It returns the tasks that must wait again and fills in the objects that
  // now need to be requested.
  std::vector<ObjectID> objects_to_request;
  const auto waiting_task_ids =
      task_dependency_manager_.HandleObjectMissing(object_id, objects_to_request);
  // Transition any tasks that were in the runnable state and are dependent on
  // this object to the waiting state.
  if (!waiting_task_ids.empty()) {
    // Transition the tasks back to the waiting state. They will be made
    // runnable once the deleted object becomes available again.
    std::unordered_set<TaskID> waiting_task_id_set(waiting_task_ids.begin(),
                                                   waiting_task_ids.end());
    // Queue the removed tasks directly; no intermediate copy is needed.
    const auto waiting_tasks = local_queues_.RemoveTasks(waiting_task_id_set);
    local_queues_.QueueWaitingTasks(waiting_tasks);
  }
  // Request any objects that are now necessary for the waiting tasks. The loop
  // variable is named distinctly so that it does not shadow the object_id
  // parameter.
  for (const auto &required_object_id : objects_to_request) {
    HandleRemoteDependencyRequired(required_object_id);
  }
}

ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id) {
const auto &spec = task.GetTaskSpecification();
auto task_id = spec.TaskId();
Expand Down Expand Up @@ -726,7 +833,6 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id)
// lineage cache since the receiving node is now responsible for writing
// the task to the GCS.
lineage_cache_.RemoveWaitingTask(task_id);

// Preemptively push any local arguments to the receiving node. For now, we
// only do this with actor tasks, since actor tasks must be executed by a
// specific process and therefore have affinity to the receiving node.
Expand Down
43 changes: 29 additions & 14 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,16 @@ class NodeManager {
ray::Status RegisterGcs();

private:
// Handler for the addition of a new GCS client.
/// Methods for handling clients.
/// Handler for the addition of a new GCS client.
void ClientAdded(const ClientTableDataT &data);
// Handler for the creation of an actor, possibly on a remote node.
void HandleActorCreation(const ActorID &actor_id,
const std::vector<ActorTableDataT> &data);
/// Send heartbeats to the GCS.
void Heartbeat();
/// Handler for a heartbeat notification from the GCS.
void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &data);

/// Methods for task scheduling.
// Queue a task for local execution.
void QueueTask(const Task &task);
/// Submit a task to this node.
Expand All @@ -72,25 +77,35 @@ class NodeManager {
void FinishAssignedTask(std::shared_ptr<Worker> worker);
/// Schedule tasks.
void ScheduleTasks();
/// Handle a task whose local dependencies were missing and are now available.
void HandleWaitingTaskReady(const TaskID &task_id);
/// Resubmit a task whose return value needs to be reconstructed.
void ResubmitTask(const TaskID &task_id);
/// Forward a task to another node to execute. The task is assumed to not be
/// queued in local_queues_.
ray::Status ForwardTask(const Task &task, const ClientID &node_id);
/// Send heartbeats to the GCS.
void Heartbeat();
/// Handler for a notification about a new client from the GCS.
void ClientAdded(gcs::AsyncGcsClient *client, const UniqueID &id,
const ClientTableDataT &data);
/// Handler for a heartbeat notification from the GCS.
void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &data);
/// Dispatch locally scheduled tasks. This attempts the transition from "scheduled" to
/// "running" task state.
void DispatchTasks();

/// Methods for actor scheduling.
/// Handler for the creation of an actor, possibly on a remote node.
void HandleActorCreation(const ActorID &actor_id,
const std::vector<ActorTableDataT> &data);

/// Methods for managing object dependencies.
/// Handle a dependency required by a queued task that is missing locally.
/// The dependency is (1) on a remote node, (2) pending creation on a remote
/// node, or (3) missing from all nodes and requires reconstruction.
void HandleRemoteDependencyRequired(const ObjectID &dependency_id);
/// Handle a dependency that was previously required by a queued task that is
/// no longer required.
void HandleRemoteDependencyCanceled(const ObjectID &dependency_id);
/// Handle an object becoming local. This updates any local accounting, but
/// does not write to any global accounting in the GCS.
void HandleObjectLocal(const ObjectID &object_id);
/// Handle an object that is no longer local. This updates any local
/// accounting, but does not write to any global accounting in the GCS.
void HandleObjectMissing(const ObjectID &object_id);

boost::asio::io_service &io_service_;
ObjectManager &object_manager_;
/// A client connection to the GCS.
Expand Down
Loading