Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ matrix:
- . ./ci/travis/ci.sh build
script:
# Run all C++ unit tests with ASAN enabled. ASAN adds too much overhead to run Python tests.
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only -- //:all
# NOTE: core_worker_test is out-of-date and should already covered by
# Python tests.
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only -- //:all -core_worker_test

- os: osx
osx_image: xcode7
Expand Down Expand Up @@ -435,11 +437,10 @@ matrix:
script:
- . ./ci/travis/ci.sh test_cpp
script:
# raylet integration tests (core_worker_tests included in bazel tests below)
- ./ci/suppress_output bash src/ray/test/run_object_manager_tests.sh

# cc bazel tests (w/o RLlib)
- ./ci/suppress_output bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only -- //:all -rllib/...
# NOTE: core_worker_test is out-of-date and should already covered by Python
# tests.
- ./ci/suppress_output bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only -- //:all -rllib/... -core_worker_test

# ray serve tests
- if [ $RAY_CI_SERVE_AFFECTED == "1" ]; then ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --test_tag_filters=-jenkins_only python/ray/serve/...; fi
Expand Down
24 changes: 0 additions & 24 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1365,30 +1365,6 @@ cc_library(
],
)

cc_binary(
name = "object_manager_test",
testonly = 1,
srcs = ["src/ray/object_manager/test/object_manager_test.cc"],
copts = COPTS,
deps = [
":object_manager",
"//src/ray/protobuf:common_cc_proto",
"@com_google_googletest//:gtest_main",
],
)

cc_binary(
name = "object_manager_stress_test",
testonly = 1,
srcs = ["src/ray/object_manager/test/object_manager_stress_test.cc"],
copts = COPTS,
deps = [
":object_manager",
"//src/ray/protobuf:common_cc_proto",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "platform_shims",
srcs = [] + select({
Expand Down
83 changes: 83 additions & 0 deletions python/ray/tests/test_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,89 @@ def driver():
ray.get(driver.remote())


@pytest.mark.skip(
reason="This hangs due to a deadlock between a worker getting its "
"arguments and the node pulling arguments for the next task queued.")
@pytest.mark.timeout(30)
def test_pull_bundles_admission_control(shutdown_only):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we just use ray_start_cluster?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was too lazy to figure out how to parametrize it properly :D Also, I was running into trouble where the non-head node would connect first, so the rest of the test wouldn't run properly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh lol. I think you can just do like

cluster = ray_start_cluster
cluster.add_node()
cluster.wait_for_nodes()
ray.init(address=cluster.address)
cluster.add_node...

cluster = Cluster()
object_size = int(6e6)
num_objects = 10
num_tasks = 10
# Head node can fit all of the objects at once.
cluster.add_node(
num_cpus=0,
object_store_memory=2 * num_tasks * num_objects * object_size)
cluster.wait_for_nodes()
ray.init(address=cluster.address)

# Worker node can only fit 1 task at a time.
cluster.add_node(
num_cpus=1, object_store_memory=1.5 * num_objects * object_size)
cluster.wait_for_nodes()

@ray.remote
def foo(*args):
return

args = []
for _ in range(num_tasks):
task_args = [
ray.put(np.zeros(object_size, dtype=np.uint8))
for _ in range(num_objects)
]
args.append(task_args)

tasks = [foo.remote(*task_args) for task_args in args]
ray.get(tasks)


@pytest.mark.skip(
reason="This hangs due to a deadlock between a worker getting its "
"arguments and the node pulling arguments for the next task queued.")
@pytest.mark.timeout(30)
def test_pull_bundles_admission_control_dynamic(shutdown_only):
# This test is the same as test_pull_bundles_admission_control, except that
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice test!

# the object store's capacity starts off higher and is later consumed
# dynamically by concurrent workers.
cluster = Cluster()
object_size = int(6e6)
num_objects = 10
num_tasks = 10
# Head node can fit all of the objects at once.
cluster.add_node(
num_cpus=0,
object_store_memory=2 * num_tasks * num_objects * object_size)
cluster.wait_for_nodes()
ray.init(address=cluster.address)

# Worker node can fit 2 tasks at a time.
cluster.add_node(
num_cpus=1, object_store_memory=2.5 * num_objects * object_size)
cluster.wait_for_nodes()

@ray.remote
def foo(*args):
return

@ray.remote
def allocate(*args):
return np.zeros(object_size, dtype=np.uint8)

args = []
for _ in range(num_tasks):
task_args = [
ray.put(np.zeros(object_size, dtype=np.uint8))
for _ in range(num_objects)
]
args.append(task_args)

tasks = [foo.remote(*task_args) for task_args in args]
allocated = [allocate.remote() for _ in range(num_objects)]
ray.get(tasks)
del allocated


if __name__ == "__main__":
import pytest
import sys
Expand Down
61 changes: 61 additions & 0 deletions python/ray/tests/test_object_spilling.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,5 +661,66 @@ def test_release_during_plasma_fetch(tmp_path, shutdown_only):
do_test_release_resource(tmp_path, expect_released=True)


@pytest.mark.skip(
reason="This hangs due to a deadlock between a worker getting its "
"arguments and the node pulling arguments for the next task queued.")
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
@pytest.mark.timeout(30)
def test_spill_objects_on_object_transfer(object_spilling_config,
ray_start_cluster):
# This test checks that objects get spilled to make room for transferred
# objects.
cluster = ray_start_cluster
object_size = int(1e7)
num_objects = 10
num_tasks = 10
# Head node can fit all of the objects at once.
cluster.add_node(
num_cpus=0,
object_store_memory=2 * num_tasks * num_objects * object_size,
_system_config={
"max_io_workers": 1,
"automatic_object_spilling_enabled": True,
"object_store_full_delay_ms": 100,
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0
})
cluster.wait_for_nodes()
ray.init(address=cluster.address)

# Worker node can fit 1 tasks at a time.
cluster.add_node(
num_cpus=1, object_store_memory=1.5 * num_objects * object_size)
cluster.wait_for_nodes()

@ray.remote
def foo(*args):
return

@ray.remote
def allocate(*args):
return np.zeros(object_size, dtype=np.uint8)

# Allocate some objects that must be spilled to make room for foo's
# arguments.
allocated = [allocate.remote() for _ in range(num_objects)]
ray.get(allocated)
print("done allocating")

args = []
for _ in range(num_tasks):
task_args = [
ray.put(np.zeros(object_size, dtype=np.uint8))
for _ in range(num_objects)
]
args.append(task_args)

# Check that tasks scheduled to the worker node have enough room after
# spilling.
tasks = [foo.remote(*task_args) for task_args in args]
ray.get(tasks)


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
3 changes: 3 additions & 0 deletions python/ray/tests/test_reconstruction.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ def probe():
raise e.as_instanceof_cause()


@pytest.mark.skip(reason="This hangs due to a deadlock in admission control.")
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled):
config = {
Expand Down Expand Up @@ -436,6 +437,7 @@ def dependent_task(x):
raise e.as_instanceof_cause()


@pytest.mark.skip(reason="This hangs due to a deadlock in admission control.")
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled):
config = {
Expand Down Expand Up @@ -487,6 +489,7 @@ def dependent_task(x):
raise e.as_instanceof_cause()


@pytest.mark.skip(reason="This hangs due to a deadlock in admission control.")
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_reconstruction_stress(ray_start_cluster):
config = {
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2213,6 +2213,7 @@ void CoreWorker::HandleGetObjectLocationsOwner(
} else {
status = Status::ObjectNotFound("Object " + object_id.Hex() + " not found");
}
reply->set_object_size(reference_counter_->GetObjectSize(object_id));
send_reply_callback(status, nullptr, nullptr);
}

Expand Down
9 changes: 9 additions & 0 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,15 @@ absl::optional<absl::flat_hash_set<NodeID>> ReferenceCounter::GetObjectLocations
return it->second.locations;
}

size_t ReferenceCounter::GetObjectSize(const ObjectID &object_id) const {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
return 0;
}
return it->second.object_size;
}

void ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id) {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
Expand Down
6 changes: 6 additions & 0 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,12 @@ class ReferenceCounter : public ReferenceCounterInterface,
absl::optional<absl::flat_hash_set<NodeID>> GetObjectLocations(
const ObjectID &object_id) LOCKS_EXCLUDED(mutex_);

/// Get an object's size. This will return 0 if the object is out of scope.
///
/// \param[in] object_id The object whose size to get.
/// \return Object size, or 0 if the object is out of scope.
size_t GetObjectSize(const ObjectID &object_id) const;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write a comment? (and explain when 0 is returned?)


/// Handle an object has been spilled to external storage.
///
/// This notifies the primary raylet that the object is safe to release and
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ class ObjectInfoAccessor {
/// \param callback Callback that will be called after object has been added to GCS.
/// \return Status
virtual Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id,
const StatusCallback &callback) = 0;
size_t object_size, const StatusCallback &callback) = 0;

/// Add spilled location of object to GCS asynchronously.
///
Expand Down
4 changes: 4 additions & 0 deletions src/ray/gcs/gcs_client/service_based_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1128,13 +1128,15 @@ Status ServiceBasedObjectInfoAccessor::AsyncGetAll(

Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_id,
const NodeID &node_id,
size_t object_size,
const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Adding object location, object id = " << object_id
<< ", node id = " << node_id
<< ", job id = " << object_id.TaskId().JobId();
rpc::AddObjectLocationRequest request;
request.set_object_id(object_id.Binary());
request.set_node_id(node_id.Binary());
request.set_size(object_size);

auto operation = [this, request, object_id, node_id,
callback](const SequencerDoneCallback &done_callback) {
Expand Down Expand Up @@ -1229,11 +1231,13 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations(
rpc::ObjectLocationChange update;
update.set_is_add(true);
update.set_node_id(loc.manager());
update.set_size(result->size());
notification.push_back(update);
}
if (!result->spilled_url().empty()) {
rpc::ObjectLocationChange update;
update.set_spilled_url(result->spilled_url());
update.set_size(result->size());
notification.push_back(update);
}
subscribe(object_id, notification);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/service_based_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor {
Status AsyncGetAll(const MultiItemCallback<rpc::ObjectLocationInfo> &callback) override;

Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id,
const StatusCallback &callback) override;
size_t object_size, const StatusCallback &callback) override;

Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url,
const StatusCallback &callback) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ TEST_F(GlobalStateAccessorTest, TestObjectTable) {
NodeID node_id = NodeID::FromRandom();
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Objects().AsyncAddLocation(
object_id, node_id,
object_id, node_id, 0,
[&promise](Status status) { promise.set_value(status.ok()); }));
WaitReady(promise.get_future(), timeout_ms_);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
bool AddLocation(const ObjectID &object_id, const NodeID &node_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Objects().AsyncAddLocation(
object_id, node_id,
object_id, node_id, 0,
[&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
}
Expand Down
7 changes: 6 additions & 1 deletion src/ray/gcs/gcs_server/gcs_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void GcsObjectManager::HandleGetAllObjectLocations(
object_table_data.set_manager(node_id.Binary());
object_location_info.add_locations()->CopyFrom(object_table_data);
}
object_location_info.set_size(item.second.object_size);
reply->add_object_location_info_list()->CopyFrom(object_location_info);
}
RAY_LOG(DEBUG) << "Finished getting all object locations.";
Expand Down Expand Up @@ -78,7 +79,8 @@ void GcsObjectManager::HandleAddObjectLocation(
RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id;
}

auto on_done = [this, object_id, node_id, spilled_url, reply,
size_t size = request.size();
auto on_done = [this, object_id, node_id, spilled_url, size, reply,
send_reply_callback](const Status &status) {
if (status.ok()) {
rpc::ObjectLocationChange notification;
Expand All @@ -89,6 +91,7 @@ void GcsObjectManager::HandleAddObjectLocation(
if (!spilled_url.empty()) {
notification.set_spilled_url(spilled_url);
}
notification.set_size(size);
RAY_CHECK_OK(gcs_pub_sub_->Publish(OBJECT_CHANNEL, object_id.Hex(),
notification.SerializeAsString(), nullptr));
RAY_LOG(DEBUG) << "Finished adding object location, job id = "
Expand All @@ -107,6 +110,7 @@ void GcsObjectManager::HandleAddObjectLocation(
};

absl::MutexLock lock(&mutex_);
object_to_locations_[object_id].object_size = size;
const auto object_data = GenObjectLocationInfo(object_id);
Status status = gcs_table_storage_->ObjectTable().Put(object_id, object_data, on_done);
if (!status.ok()) {
Expand Down Expand Up @@ -287,6 +291,7 @@ const ObjectLocationInfo GcsObjectManager::GenObjectLocationInfo(
object_data.add_locations()->set_manager(node_id.Binary());
}
object_data.set_spilled_url(it->second.spilled_url);
object_data.set_size(it->second.object_size);
}
return object_data;
}
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class GcsObjectManager : public rpc::ObjectInfoHandler {
struct LocationSet {
absl::flat_hash_set<NodeID> locations;
std::string spilled_url = "";
size_t object_size = 0;
};

/// Add a location of objects.
Expand Down
Loading