Skip to content

Commit 00cfdef

Browse files
authored
Memory scheduling 2 DFS with Priority (ray-project#3)
Implemented basic DFS priority assignment, task blocking, and preemption. Known issue: this currently segfaults because the object already exists when a task is retried.
1 parent 6562a1c commit 00cfdef

26 files changed

+401
-15279
lines changed

benchmarks/single_node/test_single_node.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
def assert_no_leaks():
2020
total = ray.cluster_resources()
2121
current = ray.available_resources()
22+
print(total, current)
2223
total.pop("memory")
2324
total.pop("object_store_memory")
2425
current.pop("memory")
@@ -137,7 +138,8 @@ def test_large_object():
137138
assert big_obj[-1] == 0
138139

139140

140-
ray.init(address="auto")
141+
#ray.init(address="auto")
142+
ray.init()
141143

142144
args_start = perf_counter()
143145
test_many_args()

cpp/src/ray/runtime/task/local_mode_task_submitter.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
4949
local_mode_ray_tuntime_.GetCurrentTaskId(), 0,
5050
local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1,
5151
required_resources, required_placement_resources,
52-
std::make_pair(PlacementGroupID::Nil(), -1), true, "");
52+
std::make_pair(PlacementGroupID::Nil(), -1), true, "", Priority());
5353
if (invocation.task_type == TaskType::NORMAL_TASK) {
5454
} else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) {
5555
invocation.actor_id = local_mode_ray_tuntime_.GetNextActorID();

dashboard/client/package-lock.json

Lines changed: 0 additions & 15252 deletions
This file was deleted.
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import ray
2+
import time
3+
import sys
4+
import argparse
5+
import csv
6+
import numpy as np
7+
from time import perf_counter
8+
from time import perf_counter
9+
10+
####################
11+
## Argument Parse ##
12+
####################
13+
parser = argparse.ArgumentParser()
14+
parser.add_argument('--WORKING_SET_RATIO', '-w', type=int, default=2)
15+
parser.add_argument('--OBJECT_STORE_SIZE', '-o', type=int, default=1_000_000_000)
16+
parser.add_argument('--OBJECT_SIZE', '-os', type=int, default=200_000_000)
17+
parser.add_argument('--NUM_STAGES', '-ns', type=int, default=1)
18+
args = parser.parse_args()
19+
params = vars(args)
20+
21+
OBJECT_STORE_SIZE = params['OBJECT_STORE_SIZE']
22+
OBJECT_SIZE = params['OBJECT_SIZE']
23+
WORKING_SET_RATIO = params['WORKING_SET_RATIO']
24+
NUM_STAGES = params['NUM_STAGES']
25+
26+
def test_ray_pipeline():
27+
ray_pipeline_begin = perf_counter()
28+
29+
@ray.remote(num_cpus=1)
30+
def consumer(obj_ref):
31+
#args = ray.get(obj_ref)
32+
return True
33+
34+
@ray.remote(num_cpus=1)
35+
def producer():
36+
time.sleep(0.1)
37+
for i in range(1000000):
38+
pass
39+
return np.zeros(OBJECT_SIZE // 8)
40+
41+
num_fill_object_store = OBJECT_STORE_SIZE//OBJECT_SIZE
42+
produced_objs = [producer.remote() for _ in range(WORKING_SET_RATIO*num_fill_object_store)]
43+
refs = [[] for _ in range(NUM_STAGES)]
44+
45+
for obj in produced_objs:
46+
refs[0].append(consumer.remote(obj))
47+
'''
48+
for stage in range(1, NUM_STAGES):
49+
for r in refs[stage-1]:
50+
refs[stage].append(consumer.remote(r))
51+
'''
52+
del produced_objs
53+
#ray.get(refs[-1])
54+
for r in refs[0]:
55+
print(r)
56+
ray.get(r)
57+
'''
58+
for ref in refs:
59+
for r in ref:
60+
ray.get(r)
61+
'''
62+
ray_pipeline_end = perf_counter()
63+
64+
return ray_pipeline_end - ray_pipeline_begin
65+
66+
ray.init(object_store_memory=OBJECT_STORE_SIZE)
67+
print(test_ray_pipeline())

src/ray/common/ray_config_def.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024)
374374

375375
/// If set to less than 1.0, Ray will start spilling objects when existing objects
376376
/// take more than this percentage of the available memory.
377-
RAY_CONFIG(float, object_spilling_threshold, 0.8)
377+
RAY_CONFIG(float, object_spilling_threshold, 1.0)
378378

379379
/// Maximum number of objects that can be fused into a single file.
380380
RAY_CONFIG(int64_t, max_fused_object_count, 2000)

src/ray/common/task/task_priority.cc

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,16 @@ void Priority::extend(int64_t size) const {
1111
}
1212
}
1313

14+
void Priority::SetFromParentPriority(Priority &parent, int s){
15+
//param s is the last score to add
16+
if(parent.score.size() == 1 && parent.score[0] == INT_MAX){
17+
score[0] = s;
18+
}else{
19+
score = parent.score;
20+
score.push_back(s);
21+
}
22+
}
23+
1424
bool Priority::operator<(const Priority &rhs) const {
1525
rhs.extend(score.size());
1626
extend(rhs.score.size());
@@ -25,6 +35,19 @@ bool Priority::operator<=(const Priority &rhs) const {
2535
return score <= rhs.score;
2636
}
2737

38+
bool Priority::operator>(const Priority &rhs) const {
39+
rhs.extend(score.size());
40+
extend(rhs.score.size());
41+
42+
return score > rhs.score;
43+
}
44+
45+
bool Priority::operator>=(const Priority &rhs) const {
46+
rhs.extend(score.size());
47+
extend(rhs.score.size());
48+
49+
return score >= rhs.score;
50+
}
2851

2952
std::ostream &operator<<(std::ostream &os, const Priority &p) {
3053
os << "[ ";

src/ray/common/task/task_priority.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ struct Priority {
2323

2424
void extend(int64_t size) const;
2525

26+
void SetFromParentPriority(Priority &parent, int);
27+
2628
bool operator==(const Priority &rhs) const {
2729
rhs.extend(score.size());
2830
extend(rhs.score.size());
@@ -37,6 +39,10 @@ struct Priority {
3739

3840
bool operator<=(const Priority &rhs) const;
3941

42+
bool operator>(const Priority &rhs) const;
43+
44+
bool operator>=(const Priority &rhs) const;
45+
4046
int GetScore(int64_t depth) const {
4147
extend(depth + 1);
4248
return score[depth];

src/ray/common/task/task_util.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class TaskSpecBuilder {
105105
const std::unordered_map<std::string, double> &required_placement_resources,
106106
const BundleID &bundle_id, bool placement_group_capture_child_tasks,
107107
const std::string &debugger_breakpoint,
108+
const Priority &priority,
108109
const std::string &serialized_runtime_env = "{}",
109110
const std::vector<std::string> &runtime_env_uris = {},
110111
const std::string &concurrency_group_name = "") {
@@ -129,6 +130,12 @@ class TaskSpecBuilder {
129130
placement_group_capture_child_tasks);
130131
message_->set_debugger_breakpoint(debugger_breakpoint);
131132
message_->mutable_runtime_env()->set_serialized_runtime_env(serialized_runtime_env);
133+
134+
auto pri = message_->mutable_priority();
135+
for (auto &s : priority.score) {
136+
pri->Add(s);
137+
}
138+
132139
for (const std::string &uri : runtime_env_uris) {
133140
message_->mutable_runtime_env()->add_uris(uri);
134141
}

src/ray/core_worker/core_worker.cc

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,19 @@ void BuildCommonTaskSpec(
4040
const std::unordered_map<std::string, double> &required_resources,
4141
const std::unordered_map<std::string, double> &required_placement_resources,
4242
const BundleID &bundle_id, bool placement_group_capture_child_tasks,
43-
const std::string debugger_breakpoint, const std::string &serialized_runtime_env,
43+
const std::string debugger_breakpoint,
44+
const Priority &priority,
45+
const std::string &serialized_runtime_env,
4446
const std::vector<std::string> &runtime_env_uris,
4547
const std::string &concurrency_group_name = "") {
4648
// Build common task spec.
4749
builder.SetCommonTaskSpec(
4850
task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id,
4951
current_task_id, task_index, caller_id, address, num_returns, required_resources,
5052
required_placement_resources, bundle_id, placement_group_capture_child_tasks,
51-
debugger_breakpoint, serialized_runtime_env, runtime_env_uris,
53+
debugger_breakpoint,
54+
priority,
55+
serialized_runtime_env, runtime_env_uris,
5256
concurrency_group_name);
5357
// Set task arguments.
5458
for (const auto &arg : args) {
@@ -681,7 +685,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
681685
std::move(lease_policy), memory_store_, task_manager_, local_raylet_id,
682686
RayConfig::instance().worker_lease_timeout_milliseconds(), actor_creator_,
683687
/*get_task_priority=*/[](const TaskSpecification &spec) {
684-
return Priority();
688+
return spec.GetPriority();
685689
},
686690
RayConfig::instance().max_tasks_in_flight_per_worker(),
687691
boost::asio::steady_timer(io_service_));
@@ -1671,9 +1675,11 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
16711675
rpc_address_, function, args, task_options.num_returns,
16721676
constrained_resources, required_resources, placement_options,
16731677
placement_group_capture_child_tasks, debugger_breakpoint,
1678+
Priority(),
16741679
task_options.serialized_runtime_env, task_options.runtime_env_uris);
16751680
builder.SetNormalTaskSpec(max_retries, retry_exceptions);
16761681
TaskSpecification task_spec = builder.Build();
1682+
//priority = task_manager_->GenerateTaskPriority(task_spec);
16771683
RAY_LOG(DEBUG) << "Submit task " << task_spec.DebugString();
16781684
std::vector<rpc::ObjectReference> returned_refs;
16791685
if (options_.is_local_mode) {
@@ -1683,6 +1689,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
16831689
CurrentCallSite(), max_retries);
16841690
io_service_.post(
16851691
[this, task_spec]() {
1692+
//(Jae) This is the reason why tasks are not placed with priority
16861693
RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec));
16871694
},
16881695
"CoreWorker.SubmitTask");
@@ -1727,6 +1734,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
17271734
new_placement_resources, actor_creation_options.placement_options,
17281735
actor_creation_options.placement_group_capture_child_tasks,
17291736
"", /* debugger_breakpoint */
1737+
Priority(),
17301738
actor_creation_options.serialized_runtime_env,
17311739
actor_creation_options.runtime_env_uris);
17321740

@@ -1911,6 +1919,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitActorTask(
19111919
required_resources, std::make_pair(PlacementGroupID::Nil(), -1),
19121920
true, /* placement_group_capture_child_tasks */
19131921
"", /* debugger_breakpoint */
1922+
Priority(),
19141923
"{}", /* serialized_runtime_env */
19151924
{}, /* runtime_env_uris */
19161925
task_options.concurrency_group_name);
@@ -1923,6 +1932,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitActorTask(
19231932

19241933
// Submit task.
19251934
TaskSpecification task_spec = builder.Build();
1935+
std::vector<ObjectID> task_deps;
19261936
std::vector<rpc::ObjectReference> returned_refs;
19271937
if (options_.is_local_mode) {
19281938
returned_refs = ExecuteTaskLocalMode(task_spec, actor_id);

src/ray/core_worker/reference_count.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,20 @@ void ReferenceCounter::RemoveLocalReference(const ObjectID &object_id,
262262
}
263263
}
264264

265+
Priority& ReferenceCounter::GetObjectPriority(const ObjectID &object_id){
266+
absl::MutexLock lock(&mutex_);
267+
auto it = object_id_priority_.find(object_id);
268+
RAY_CHECK(it != object_id_priority_.end()) << "Object priority not found " << object_id;
269+
return it->second;
270+
}
271+
272+
void ReferenceCounter::UpdateObjectPriority(
273+
const ObjectID &object_id,
274+
const Priority &priority){
275+
absl::MutexLock lock(&mutex_);
276+
object_id_priority_[object_id] = priority;
277+
}
278+
265279
void ReferenceCounter::UpdateSubmittedTaskReferences(
266280
const std::vector<ObjectID> &argument_ids_to_add,
267281
const std::vector<ObjectID> &argument_ids_to_remove, std::vector<ObjectID> *deleted) {

0 commit comments

Comments
 (0)