Skip to content

Conversation

@stephanie-wang
Copy link
Contributor

What do these changes do?

This adds more sophisticated object dependency management for queued tasks in a raylet. The raylet::TaskDependencyManager determines which of the objects required by a local task must be fetched from a remote node or reconstructed. Previously, the logic was not very intelligent. This PR accounts for locally queued tasks, so that an object created by a queued task is not considered to be remote. This also adds unit tests for the raylet::TaskDependencyManager.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5330/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5340/
Test FAILed.

// TODO(hme): Need corresponding remove method in GCS.
return ray::Status::NotImplemented("ObjectTable.Remove is not implemented");
// Append the eviction entry to the object table.
JobID job_id = JobID::from_random();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're no longer doing JobID::from_random() due to the cost of the random number generator used (as of #2044)

// Subscribe to each of the tasks' arguments.
std::vector<ObjectID> null;
auto arguments = task.GetDependencies();
(void)task_dependency_manager_.SubscribeDependencies(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid c-style casts

auto tasks = MakeTaskChain(num_tasks, {}, 1);
for (const auto &task : tasks) {
// Subscribe to each of the tasks' arguments.
std::vector<ObjectID> null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was pretty confused by this variable name, can we use empty_dependencies or something like that?

// available.
for (size_t i = 0; i < arguments.size(); i++) {
std::vector<TaskID> ready_tasks;
std::vector<ObjectID> null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a different variable name

// again.
for (size_t i = 0; i < arguments.size(); i++) {
std::vector<TaskID> ready_tasks;
std::vector<ObjectID> null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

different variable name

// dependencies are fulfilled. This will track this task's dependencies until
// UnsubscribeDependencies is called on the same task ID. If any dependencies
// are missing, then when the last missing dependency later appears locally
// via a call to HandleObjectLocal, the subscribed task will be returned.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite sure what "the subscribed task will be returned" means. Returned by what?

void SubscribeTaskReady(const Task &task);
// Subscribe to object depedencies required by the task and check whether all
// dependencies are fulfilled. This will track this task's dependencies until
// UnsubscribeDependencies is called on the same task ID. If any dependencies
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is UnsubscribeDependencies always required? What about in the case where object dependencies are already met (i.e., SubscribeDependencies returns true)?

Also, there is no case in which UnsubscribeDependencies gets called automatically under the hood, right? The caller always has to call it manually?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case where the object dependencies are already met, the task could later have a dependency missing if there's an eviction.

Yeah, I was thinking was that we should just always have the caller explicitly unsubscribe once it's safe to do so.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5402/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5403/
Test PASSed.

Copy link
Collaborator

@robertnishihara robertnishihara left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No interesting comments yet, still looking through.

}
for (size_t i = 0; i < arguments.size(); i++) {
std::vector<TaskID> ready_tasks;
std::vector<ObjectID> null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a different variable name. Is this used?

}
for (size_t i = 0; i < arguments.size(); i++) {
std::vector<TaskID> ready_tasks;
std::vector<ObjectID> null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a different variable name. Actually, is this even used?


using ::testing::_;

class MockObjectManager : public ObjectManagerInterface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love these tests, these look great!

/// mapping from task ID to information about the objects that the task
/// creates, either by return value or by `ray.put`. For each object, we
/// store the IDs of the subscribed tasks that are dependent on the object.
std::unordered_map<ray::TaskID, ObjectDependencyMap> required_tasks_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me a little while to understand this data structure.

For a given TaskID, the keys of the ObjectDependencyMap are the ObjectIDs produced by that task, right?

How does this compare to just having a single ObjectDependencyMap that keeps track of all tasks that depend on given ObjectIDs?

I guess we would still need a way to iterate over all the relevant ObjectIDs produced by a given task, right? Presumably the hard part of this is knowing which IDs created by put are used?

Is that right? Or am I misunderstanding this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm not super happy with this data structure either...

I think you understand it correctly. The reason for having the nested map is, as you said, so that we can iterate over the ObjectIDs produced by a given task. It's really just an optimization for the code in TaskPending and TaskCanceled, so that we can request/cancel objects produced by a task if that task becomes pending or canceled locally. Otherwise, it would've been okay to just have a single ObjectDependencyMap.

The alternative is to have a single ObjectDependencyMap and then a separate map from task ID to object IDs produced by the task. I didn't really like having to update two maps, though. Undecided on which one ends up being cleaner in the end; do you have a preference?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, why don't we try it this way and revisit it if it becomes an issue.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5449/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5454/
Test PASSed.

@robertnishihara
Copy link
Collaborator

tests passed on private travis

@robertnishihara robertnishihara merged commit 6ca122f into ray-project:master May 18, 2018
@robertnishihara robertnishihara deleted the xray-task-dependencies branch May 18, 2018 00:18
@robertnishihara robertnishihara mentioned this pull request May 18, 2018
alok added a commit to alok/ray that referenced this pull request May 18, 2018
* master: (22 commits)
  [xray] Fix bug in updating actor execution dependencies (ray-project#2064)
  [DataFrame] Refactor __delitem__ (ray-project#2080)
  [xray] Better error messaging when pulling from self. (ray-project#2068)
  Use source code in hash where possible (fix ray-project#2089) (ray-project#2090)
  Functions for flushing done tasks and evicted objects. (ray-project#2033)
  Fix compilation error for RAY_USE_NEW_GCS with latest clang. (ray-project#2086)
  [xray] Corrects Error Handling During Push and Pull. (ray-project#2059)
  [xray] Sophisticated task dependency management (ray-project#2035)
  Support calling positional arguments by keyword (fix ray-project#998) (ray-project#2081)
  [DataFrame] Improve performance of iteration methods (ray-project#2026)
  [DataFrame] Implement to_csv (ray-project#2014)
  [xray] Lineage cache only requests notifications about remote parent tasks (ray-project#2066)
  [rllib] Add magic methods for rollouts (ray-project#2024)
  [DataFrame] Allows DataFrame constructor to take in another DataFrame (ray-project#2072)
  Pin Pandas version for Travis to 0.22 (ray-project#2075)
  Fix python linting (ray-project#2076)
  [xray] Fix GCS table prefixes (ray-project#2065)
  Some tests for _submit API. (ray-project#2062)
  [rllib] Queue lib for python 2.7 (ray-project#2057)
  [autoscaler] Remove faulty assert that breaks during downscaling, pull configs from env (ray-project#2006)
  ...
alok added a commit to alok/ray that referenced this pull request May 21, 2018
* master: (24 commits)
  Performance fix (ray-project#2110)
  Use flake8-comprehensions (ray-project#1976)
  Improve error message printing and suppression. (ray-project#2104)
  [rllib] [doc] Broken link in ddpg doc
  YAPF, take 3 (ray-project#2098)
  [rllib] rename async -> _async (ray-project#2097)
  fix unused lambda capture (ray-project#2102)
  [xray] Use pubsub instead of timeout for ObjectManager Pull. (ray-project#2079)
  [DataFrame] Update _inherit_docstrings (ray-project#2085)
  [JavaWorker] Changes to the build system for support java worker (ray-project#2092)
  [xray] Fix bug in updating actor execution dependencies (ray-project#2064)
  [DataFrame] Refactor __delitem__ (ray-project#2080)
  [xray] Better error messaging when pulling from self. (ray-project#2068)
  Use source code in hash where possible (fix ray-project#2089) (ray-project#2090)
  Functions for flushing done tasks and evicted objects. (ray-project#2033)
  Fix compilation error for RAY_USE_NEW_GCS with latest clang. (ray-project#2086)
  [xray] Corrects Error Handling During Push and Pull. (ray-project#2059)
  [xray] Sophisticated task dependency management (ray-project#2035)
  Support calling positional arguments by keyword (fix ray-project#998) (ray-project#2081)
  [DataFrame] Improve performance of iteration methods (ray-project#2026)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants