Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ install:
- ./src/ray/raylet/task_test
- ./src/ray/raylet/worker_pool_test
- ./src/ray/raylet/lineage_cache_test
- ./src/ray/raylet/task_dependency_manager_test

- bash ../../../src/common/test/run_tests.sh
- bash ../../../src/plasma/test/run_tests.sh
Expand Down
28 changes: 19 additions & 9 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,34 @@ ObjectDirectory::ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> gcs_client
ray::Status ObjectDirectory::ReportObjectAdded(const ObjectID &object_id,
const ClientID &client_id,
const ObjectInfoT &object_info) {
// TODO(hme): Determine whether we need to do lookup to append.
JobID job_id = JobID::from_random();
// Append the addition entry to the object table.
JobID job_id = JobID::nil();
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->is_eviction = false;
data->num_evictions = object_evictions_[object_id];
data->object_size = object_info.data_size;
ray::Status status = gcs_client_->object_table().Append(
job_id, object_id, data,
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ObjectTableDataT &data) {
// Do nothing.
});
ray::Status status =
gcs_client_->object_table().Append(job_id, object_id, data, nullptr);
return status;
};

ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) {
// TODO(hme): Need corresponding remove method in GCS.
return ray::Status::NotImplemented("ObjectTable.Remove is not implemented");
// Append the eviction entry to the object table.
JobID job_id = JobID::nil();
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->is_eviction = true;
data->num_evictions = object_evictions_[object_id];
ray::Status status =
gcs_client_->object_table().Append(job_id, object_id, data, nullptr);
// Increment the number of times we've evicted this object. NOTE(swang): This
// is only necessary because the Ray redis module expects unique entries in a
// log. We track the number of evictions so that the next eviction, if there
// is one, is unique.
object_evictions_[object_id]++;
return status;
};

ray::Status ObjectDirectory::GetInformation(const ClientID &client_id,
Expand Down
3 changes: 3 additions & 0 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<ObjectID, ODCallbacks> existing_requests_;
/// Reference to the gcs client.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
/// Map from object ID to the number of times it's been evicted on this
/// node before.
std::unordered_map<ObjectID, int> object_evictions_;
};

} // namespace ray
Expand Down
9 changes: 8 additions & 1 deletion src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,15 @@ struct ObjectManagerConfig {
std::string store_socket_name;
};

/// Abstract interface for the object-fetching operations of an object
/// manager. It lets callers request an object and cancel such a request
/// without depending on the concrete ObjectManager implementation.
class ObjectManagerInterface {
 public:
  /// Request that the object with the given ID be fetched.
  ///
  /// \param object_id The ID of the object to fetch.
  /// \return Status of the request.
  virtual ray::Status Pull(const ObjectID &object_id) = 0;

  /// Cancel a fetch request for the object with the given ID.
  ///
  /// \param object_id The ID of the object whose fetch should be canceled.
  /// \return Status of the cancellation.
  virtual ray::Status Cancel(const ObjectID &object_id) = 0;

  /// Virtual destructor so that implementations can be destroyed through a
  /// pointer or reference to this interface.
  virtual ~ObjectManagerInterface() = default;
};

// TODO(hme): Add success/failure callbacks for push and pull.
class ObjectManager {
class ObjectManager : public ObjectManagerInterface {
public:
/// Implicitly instantiates Ray implementation of ObjectDirectory.
///
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ ADD_RAY_TEST(worker_pool_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} $

ADD_RAY_TEST(task_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY})
ADD_RAY_TEST(lineage_cache_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY})
ADD_RAY_TEST(task_dependency_manager_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY})

add_library(rayletlib raylet.cc ${NODE_MANAGER_FBS_OUTPUT_FILES})
target_link_libraries(rayletlib ray_static ${Boost_SYSTEM_LIBRARY})
Expand Down
109 changes: 85 additions & 24 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
local_queues_(SchedulingQueue()),
scheduling_policy_(local_queues_),
reconstruction_policy_([this](const TaskID &task_id) { ResubmitTask(task_id); }),
task_dependency_manager_(
object_manager,
// reconstruction_policy_,
[this](const TaskID &task_id) { HandleWaitingTaskReady(task_id); }),
task_dependency_manager_(object_manager),
lineage_cache_(gcs_client_->client_table().GetLocalClientId(),
gcs_client->raylet_task_table(), gcs_client->raylet_task_table()),
remote_clients_(),
Expand All @@ -88,6 +85,13 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
ClientID local_client_id = gcs_client_->client_table().GetLocalClientId();
cluster_resource_map_.emplace(local_client_id,
SchedulingResources(config.resource_config));

RAY_CHECK_OK(object_manager_.SubscribeObjAdded([this](const ObjectInfoT &object_info) {
ObjectID object_id = ObjectID::from_binary(object_info.object_id);
HandleObjectLocal(object_id);
}));
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
[this](const ObjectID &object_id) { HandleObjectMissing(object_id); }));
}

ray::Status NodeManager::RegisterGcs() {
Expand Down Expand Up @@ -374,6 +378,9 @@ void NodeManager::ProcessClientMessage(
auto message = flatbuffers::GetRoot<protocol::ReconstructObject>(message_data);
ObjectID object_id = from_flatbuf(*message->object_id());
RAY_LOG(DEBUG) << "reconstructing object " << object_id;
// TODO(swang): Instead of calling Pull on the object directly, record the
// fact that the blocked task is dependent on this object_id in the task
// dependency manager.
RAY_CHECK_OK(object_manager_.Pull(object_id));

// If the blocked client is a worker, and the worker isn't already blocked,
Expand Down Expand Up @@ -468,13 +475,6 @@ void NodeManager::ProcessNodeManagerMessage(TcpClientConnection &node_manager_cl
node_manager_client.ProcessMessages();
}

/// Handle a task whose local dependencies were missing and are now available.
/// The task is taken out of local_queues_, re-queued as ready, and a
/// scheduling pass is triggered.
///
/// \param task_id The ID of the task that has become ready.
void NodeManager::HandleWaitingTaskReady(const TaskID &task_id) {
// Remove the task from the local queues; the removed tasks are returned.
auto ready_tasks = local_queues_.RemoveTasks({task_id});
// Re-queue the removed tasks in the ready queue.
local_queues_.QueueReadyTasks(std::vector<Task>(ready_tasks));
// Schedule the newly ready tasks if possible.
ScheduleTasks();
}

void NodeManager::ScheduleTasks() {
// This method performs the transition of tasks from PENDING to SCHEDULED.
auto policy_decision = scheduling_policy_.Schedule(
Expand All @@ -500,23 +500,32 @@ void NodeManager::ScheduleTasks() {
RAY_CHECK(1 == tasks.size());
Task &task = tasks.front();
// TODO(swang): Handle forward task failure.
// TODO(swang): Unsubscribe this task in the task dependency manager.
RAY_CHECK_OK(ForwardTask(task, client_id));
}
// Notify the task dependency manager that we no longer need this task's
// object dependencies.
// NOTE(swang): For local tasks, the scheduled task's dependencies may get
// evicted before it can be assigned to a worker.
task_dependency_manager_.UnsubscribeDependencies(task_id);
}

// Transition locally scheduled tasks to SCHEDULED and dispatch scheduled tasks.
std::vector<Task> tasks = local_queues_.RemoveTasks(local_task_ids);
local_queues_.QueueScheduledTasks(tasks);
DispatchTasks();
if (local_task_ids.size() > 0) {
std::vector<Task> tasks = local_queues_.RemoveTasks(local_task_ids);
local_queues_.QueueScheduledTasks(tasks);
DispatchTasks();
}
}

void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage) {
const TaskSpecification &spec = task.GetTaskSpecification();

// Add the task and its uncommitted lineage to the lineage cache.
lineage_cache_.AddWaitingTask(task, uncommitted_lineage);
// Mark the task as pending. Once the task has finished execution, or once it
// has been forwarded to another node, the task must be marked as canceled in
// the TaskDependencyManager.
task_dependency_manager_.TaskPending(task);

const TaskSpecification &spec = task.GetTaskSpecification();
if (spec.IsActorTask()) {
// Check whether we know the location of the actor.
const auto actor_entry = actor_registry_.find(spec.ActorId());
Expand Down Expand Up @@ -561,14 +570,31 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
}
}

void NodeManager::HandleRemoteDependencyRequired(const ObjectID &dependency_id) {
// Try to fetch the object from the object manager.
RAY_CHECK_OK(object_manager_.Pull(dependency_id));
// TODO(swang): Request reconstruction of the object, possibly after a
// timeout.
}

void NodeManager::HandleRemoteDependencyCanceled(const ObjectID &dependency_id) {
// Cancel the fetch request from the object manager.
RAY_CHECK_OK(object_manager_.Cancel(dependency_id));
// TODO(swang): Cancel reconstruction of the object.
}

void NodeManager::QueueTask(const Task &task) {
// Queue the task depending on the availability of its arguments.
if (task_dependency_manager_.TaskReady(task)) {
local_queues_.QueueReadyTasks(std::vector<Task>({task}));
// Subscribe to the task's dependencies.
bool ready = task_dependency_manager_.SubscribeDependencies(
task.GetTaskSpecification().TaskId(), task.GetDependencies());
// Queue the task. If all dependencies are available, then the task is queued
// in the READY state; otherwise it is queued in the WAITING state.
if (ready) {
local_queues_.QueueReadyTasks({task});
// Try to schedule the newly ready task.
ScheduleTasks();
} else {
local_queues_.QueueWaitingTasks(std::vector<Task>({task}));
task_dependency_manager_.SubscribeTaskReady(task);
local_queues_.QueueWaitingTasks({task});
}
}

Expand Down Expand Up @@ -687,9 +713,12 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
if (task.GetTaskSpecification().IsActorCreationTask() ||
task.GetTaskSpecification().IsActorTask()) {
auto dummy_object = task.GetTaskSpecification().ActorDummyObject();
task_dependency_manager_.MarkDependencyReady(dummy_object);
HandleObjectLocal(dummy_object);
}

// Notify the task dependency manager that this task has finished execution.
task_dependency_manager_.TaskCanceled(task_id);

// Unset the worker's assigned task.
worker.AssignTaskId(TaskID::nil());
}
Expand All @@ -698,6 +727,36 @@ void NodeManager::ResubmitTask(const TaskID &task_id) {
throw std::runtime_error("Method not implemented");
}

void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
  // Tell the task dependency manager that the object is now local. It hands
  // back the IDs of the tasks whose dependencies are now fulfilled.
  const auto unblocked_ids = task_dependency_manager_.HandleObjectLocal(object_id);
  if (unblocked_ids.empty()) {
    // No task became ready, so there is nothing to re-queue or schedule.
    return;
  }

  // Move the unblocked tasks into the ready queue.
  std::unordered_set<TaskID> unblocked_id_set(unblocked_ids.begin(),
                                              unblocked_ids.end());
  auto unblocked_tasks = local_queues_.RemoveTasks(unblocked_id_set);
  local_queues_.QueueReadyTasks(std::vector<Task>(unblocked_tasks));

  // Run a scheduling pass over the newly ready tasks.
  ScheduleTasks();
}

void NodeManager::HandleObjectMissing(const ObjectID &object_id) {
  // Tell the task dependency manager that the object is no longer local. It
  // hands back the IDs of the tasks that depend on this object and must go
  // back to waiting.
  const auto blocked_ids = task_dependency_manager_.HandleObjectMissing(object_id);
  if (blocked_ids.empty()) {
    // No queued task depends on the object, so no transition is needed.
    return;
  }

  // Move the affected tasks back to the waiting queue. They will be made
  // runnable once the deleted object becomes available again.
  std::unordered_set<TaskID> blocked_id_set(blocked_ids.begin(), blocked_ids.end());
  auto blocked_tasks = local_queues_.RemoveTasks(blocked_id_set);
  local_queues_.QueueWaitingTasks(std::vector<Task>(blocked_tasks));
}

ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id) {
const auto &spec = task.GetTaskSpecification();
auto task_id = spec.TaskId();
Expand Down Expand Up @@ -733,7 +792,9 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id)
// lineage cache since the receiving node is now responsible for writing
// the task to the GCS.
lineage_cache_.RemoveWaitingTask(task_id);

// Notify the task dependency manager that we are no longer responsible
// for executing this task.
task_dependency_manager_.TaskCanceled(task_id);
// Preemptively push any local arguments to the receiving node. For now, we
// only do this with actor tasks, since actor tasks must be executed by a
// specific process and therefore have affinity to the receiving node.
Expand Down
43 changes: 29 additions & 14 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,16 @@ class NodeManager {
ray::Status RegisterGcs();

private:
// Handler for the addition of a new GCS client.
/// Methods for handling clients.
/// Handler for the addition of a new GCS client.
void ClientAdded(const ClientTableDataT &data);
// Handler for the creation of an actor, possibly on a remote node.
void HandleActorCreation(const ActorID &actor_id,
const std::vector<ActorTableDataT> &data);
/// Send heartbeats to the GCS.
void Heartbeat();
/// Handler for a heartbeat notification from the GCS.
void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &data);

/// Methods for task scheduling.
/// Queue a task for local execution.
void QueueTask(const Task &task);
/// Submit a task to this node.
Expand All @@ -72,25 +77,35 @@ class NodeManager {
void FinishAssignedTask(Worker &worker);
/// Schedule tasks.
void ScheduleTasks();
/// Handle a task whose local dependencies were missing and are now available.
void HandleWaitingTaskReady(const TaskID &task_id);
/// Resubmit a task whose return value needs to be reconstructed.
void ResubmitTask(const TaskID &task_id);
/// Forward a task to another node to execute. The task is assumed to not be
/// queued in local_queues_.
ray::Status ForwardTask(const Task &task, const ClientID &node_id);
/// Send heartbeats to the GCS.
void Heartbeat();
/// Handler for a notification about a new client from the GCS.
void ClientAdded(gcs::AsyncGcsClient *client, const UniqueID &id,
const ClientTableDataT &data);
/// Handler for a heartbeat notification from the GCS.
void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &data);
/// Dispatch locally scheduled tasks. This attempts the transition from "scheduled" to
/// "running" task state.
void DispatchTasks();

/// Methods for actor scheduling.
/// Handler for the creation of an actor, possibly on a remote node.
void HandleActorCreation(const ActorID &actor_id,
const std::vector<ActorTableDataT> &data);

/// Methods for managing object dependencies.
/// Handle a dependency required by a queued task that is missing locally.
/// The dependency is (1) on a remote node, (2) pending creation on a remote
/// node, or (3) missing from all nodes and requires reconstruction.
void HandleRemoteDependencyRequired(const ObjectID &dependency_id);
/// Handle a dependency that was previously required by a queued task that is
/// no longer required.
void HandleRemoteDependencyCanceled(const ObjectID &dependency_id);
/// Handle an object becoming local. This updates any local accounting, but
/// does not write to any global accounting in the GCS.
void HandleObjectLocal(const ObjectID &object_id);
/// Handle an object that is no longer local. This updates any local
/// accounting, but does not write to any global accounting in the GCS.
void HandleObjectMissing(const ObjectID &object_id);

boost::asio::io_service &io_service_;
ObjectManager &object_manager_;
/// A client connection to the GCS.
Expand Down
Loading