3 changes: 2 additions & 1 deletion python/ray/_raylet.pxd
@@ -101,7 +101,8 @@ cdef class CoreWorker:
cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
size_t data_size, ObjectRef object_ref,
c_vector[CObjectID] contained_ids,
CObjectID *c_object_id, shared_ptr[CBuffer] *data)
CObjectID *c_object_id, shared_ptr[CBuffer] *data,
owner_address=*)
Contributor

What does this =* mean?

Contributor Author
@clarkzinzow clarkzinzow Feb 10, 2021

It's how you define optional arguments in Cython .pxd header files: https://cython.readthedocs.io/en/stable/src/userguide/language_basics.html#optional-arguments
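For illustration, a minimal sketch of the pattern (file and names are generic, not from this PR): the `.pxd` declaration marks the argument as optional with `=*`, and the matching `.pyx` implementation supplies the actual default.

```cython
# greeter.pxd -- declaration side: `=*` says `greeting` has a default,
# without restating the default value itself.
cdef class Greeter:
    cdef greet(self, name, greeting=*)
```

```cython
# greeter.pyx -- implementation side: the actual default lives here and
# must stay consistent with the declaration.
cdef class Greeter:
    cdef greet(self, name, greeting=None):
        if greeting is None:
            greeting = "Hello"
        return greeting + ", " + name
```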

cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns)
27 changes: 20 additions & 7 deletions python/ray/_raylet.pyx
@@ -628,15 +628,17 @@ cdef void gc_collect() nogil:


cdef c_vector[c_string] spill_objects_handler(
const c_vector[CObjectID]& object_ids_to_spill) nogil:
const c_vector[CObjectID]& object_ids_to_spill,
const c_vector[c_string]& owner_addresses) nogil:
cdef c_vector[c_string] return_urls
with gil:
object_refs = VectorToObjectRefs(object_ids_to_spill)
try:
with ray.worker._changeproctitle(
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER,
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE):
urls = external_storage.spill_objects(object_refs)
urls = external_storage.spill_objects(
object_refs, owner_addresses)
for url in urls:
return_urls.push_back(url)
except Exception:
@@ -930,19 +932,28 @@ cdef class CoreWorker:
cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
size_t data_size, ObjectRef object_ref,
c_vector[CObjectID] contained_ids,
CObjectID *c_object_id, shared_ptr[CBuffer] *data):
CObjectID *c_object_id, shared_ptr[CBuffer] *data,
owner_address=None):
cdef:
CAddress c_owner_address

if object_ref is None:
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned(
metadata, data_size, contained_ids,
c_object_id, data))
else:
c_object_id[0] = object_ref.native()
if owner_address is None:
c_owner_address = CCoreWorkerProcess.GetCoreWorker(
).GetRpcAddress()
else:
c_owner_address = CAddress()
c_owner_address.ParseFromString(owner_address)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().CreateExisting(
metadata, data_size, c_object_id[0],
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(),
data))
c_owner_address, data))

# If data is nullptr, that means the ObjectRef already existed,
# which we ignore.
@@ -951,7 +962,8 @@
return data.get() == NULL

def put_file_like_object(
self, metadata, data_size, file_like, ObjectRef object_ref):
self, metadata, data_size, file_like, ObjectRef object_ref,
owner_address):
"""Directly create a new Plasma Store object from a file like
object. This avoids extra memory copy.

@@ -961,6 +973,7 @@
file_like: A python file object that provides the `readinto`
interface.
object_ref: The new ObjectRef.
owner_address: Owner address for this object ref.
"""
cdef:
CObjectID c_object_id
@@ -975,7 +988,7 @@
object_already_exists = self._create_put_buffer(
metadata_buf, data_size, object_ref,
ObjectRefsToVector([]),
&c_object_id, &data_buf)
&c_object_id, &data_buf, owner_address)
if object_already_exists:
logger.debug("Object already exists in 'put_file_like_object'.")
return
61 changes: 42 additions & 19 deletions python/ray/external_storage.py
@@ -80,6 +80,8 @@ class ExternalStorage(metaclass=abc.ABCMeta):
the external storage is invalid.
"""

HEADER_LENGTH = 24

def _get_objects_from_store(self, object_refs):
worker = ray.worker.global_worker
# Since the object should always exist in the plasma store before
@@ -89,18 +91,21 @@ def _get_objects_from_store(self, object_refs):
ray_object_pairs = worker.core_worker.get_if_local(object_refs)
return ray_object_pairs

def _put_object_to_store(self, metadata, data_size, file_like, object_ref):
def _put_object_to_store(self, metadata, data_size, file_like, object_ref,
owner_address):
worker = ray.worker.global_worker
worker.core_worker.put_file_like_object(metadata, data_size, file_like,
object_ref)
object_ref, owner_address)

def _write_multiple_objects(self, f: IO, object_refs: List[ObjectRef],
owner_addresses: List[str],
url: str) -> List[str]:
"""Fuse all given objects into a given file handle.

Args:
f(IO): File handle to which the given object refs are fused.
object_refs(list): Object references to fuse into a single file.
owner_addresses(list): Owner addresses for the provided objects.
url(str): URL where the fused objects are stored
in the external storage.

@@ -112,13 +117,18 @@
keys = []
offset = 0
ray_object_pairs = self._get_objects_from_store(object_refs)
for ref, (buf, metadata) in zip(object_refs, ray_object_pairs):
for ref, (buf, metadata), owner_address in zip(
object_refs, ray_object_pairs, owner_addresses):
address_len = len(owner_address)
metadata_len = len(metadata)
buf_len = len(buf)
# 16 bytes to store metadata and buffer length.
data_size_in_bytes = metadata_len + buf_len + 16
# 24 bytes to store owner address, metadata, and buffer lengths.
data_size_in_bytes = (
address_len + metadata_len + buf_len + self.HEADER_LENGTH)
f.write(address_len.to_bytes(8, byteorder="little"))
f.write(metadata_len.to_bytes(8, byteorder="little"))
f.write(buf_len.to_bytes(8, byteorder="little"))
f.write(owner_address)
f.write(metadata)
f.write(memoryview(buf))
url_with_offset = create_url_with_offset(
@@ -127,7 +137,8 @@ def _write_multiple_objects(self, f: IO, object_refs: List[ObjectRef],
offset += data_size_in_bytes
return keys
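A stdlib-only sketch of the record layout this method writes (the helper name is hypothetical): each fused object is a 24-byte header of three little-endian uint64 length fields, followed by the three payloads in the same order.

```python
import io

def write_fused_record(f, owner_address: bytes, metadata: bytes,
                       buf: bytes) -> int:
    # Header: lengths of the owner address, metadata, and object buffer,
    # each as an 8-byte little-endian integer.
    f.write(len(owner_address).to_bytes(8, byteorder="little"))
    f.write(len(metadata).to_bytes(8, byteorder="little"))
    f.write(len(buf).to_bytes(8, byteorder="little"))
    # Payloads follow in the same order as their length fields.
    f.write(owner_address)
    f.write(metadata)
    f.write(buf)
    # Total record size: 24-byte header plus the three payloads.
    return 24 + len(owner_address) + len(metadata) + len(buf)

f = io.BytesIO()
assert write_fused_record(f, b"addr", b"meta", b"payload") == 24 + 4 + 4 + 7
```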

def _size_check(self, metadata_len, buffer_len, obtained_data_size):
def _size_check(self, address_len, metadata_len, buffer_len,
obtained_data_size):
"""Check whether or not the obtained_data_size is as expected.

Args:
@@ -138,17 +149,19 @@ def _size_check(self, metadata_len, buffer_len, obtained_data_size):

Raises:
ValueError if obtained_data_size is different from
metadata_len + buffer_len + 16(first 8 bytes to store length).
address_len + metadata_len + buffer_len +
24 (the first 24 bytes store the three length fields).
"""
data_size_in_bytes = metadata_len + buffer_len + 16
data_size_in_bytes = (
address_len + metadata_len + buffer_len + self.HEADER_LENGTH)
if data_size_in_bytes != obtained_data_size:
raise ValueError(
f"Obtained data has a size of {data_size_in_bytes}, "
"although it is supposed to have the "
f"size of {obtained_data_size}.")

@abc.abstractmethod
def spill_objects(self, object_refs) -> List[str]:
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
"""Spill objects to the external storage. Objects are specified
by their object refs.

@@ -191,7 +204,7 @@ def destroy_external_storage(self):
class NullStorage(ExternalStorage):
"""The class that represents an uninitialized external storage."""

def spill_objects(self, object_refs) -> List[str]:
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
raise NotImplementedError("External storage is not initialized")

def restore_spilled_objects(self, object_refs, url_with_offset_list):
@@ -220,15 +233,16 @@ def __init__(self, directory_path):
raise ValueError("The given directory path to store objects, "
f"{self.directory_path}, could not be created.")

def spill_objects(self, object_refs) -> List[str]:
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
if len(object_refs) == 0:
return []
# Always use the first object ref as a key when fusing objects.
first_ref = object_refs[0]
filename = f"{first_ref.hex()}-multi-{len(object_refs)}"
url = f"{os.path.join(self.directory_path, filename)}"
with open(url, "wb") as f:
return self._write_multiple_objects(f, object_refs, url)
return self._write_multiple_objects(f, object_refs,
owner_addresses, url)

def restore_spilled_objects(self, object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
@@ -243,13 +257,17 @@ def restore_spilled_objects(self, object_refs: List[ObjectRef],
# Read a part of the file and recover the object.
with open(base_url, "rb") as f:
f.seek(offset)
address_len = int.from_bytes(f.read(8), byteorder="little")
metadata_len = int.from_bytes(f.read(8), byteorder="little")
buf_len = int.from_bytes(f.read(8), byteorder="little")
self._size_check(metadata_len, buf_len, parsed_result.size)
self._size_check(address_len, metadata_len, buf_len,
parsed_result.size)
total += buf_len
owner_address = f.read(address_len)
metadata = f.read(metadata_len)
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, f, object_ref)
self._put_object_to_store(metadata, buf_len, f, object_ref,
owner_address)
return total
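The restore path above inverts that layout. A stdlib-only sketch of parsing one record (helper name hypothetical), matching the header order used in `_write_multiple_objects`:

```python
import io

def read_fused_record(f, offset: int):
    # Seek to the record, read the 24-byte header, then the payloads.
    f.seek(offset)
    address_len = int.from_bytes(f.read(8), byteorder="little")
    metadata_len = int.from_bytes(f.read(8), byteorder="little")
    buf_len = int.from_bytes(f.read(8), byteorder="little")
    owner_address = f.read(address_len)
    metadata = f.read(metadata_len)
    buf = f.read(buf_len)
    return owner_address, metadata, buf

# Round-trip against a hand-built record in the same format.
record = ((4).to_bytes(8, "little") + (4).to_bytes(8, "little") +
          (7).to_bytes(8, "little") + b"addr" + b"meta" + b"payload")
assert read_fused_record(io.BytesIO(record), 0) == (b"addr", b"meta", b"payload")
```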

def delete_spilled_objects(self, urls: List[str]):
@@ -320,7 +338,7 @@ def __init__(self,
self.transport_params = {"defer_seek": True}
self.transport_params.update(self.override_transport_params)

def spill_objects(self, object_refs) -> List[str]:
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
if len(object_refs) == 0:
return []
from smart_open import open
@@ -331,7 +349,8 @@
with open(
url, "wb",
transport_params=self.transport_params) as file_like:
return self._write_multiple_objects(file_like, object_refs, url)
return self._write_multiple_objects(file_like, object_refs,
owner_addresses, url)

def restore_spilled_objects(self, object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
@@ -352,13 +371,16 @@ def restore_spilled_objects(self, object_refs: List[ObjectRef],
# smart open seek reads the file from offset-end_of_the_file
# when the seek is called.
f.seek(offset)
address_len = int.from_bytes(f.read(8), byteorder="little")
metadata_len = int.from_bytes(f.read(8), byteorder="little")
buf_len = int.from_bytes(f.read(8), byteorder="little")
self._size_check(metadata_len, buf_len, parsed_result.size)
owner_address = f.read(address_len)
total += buf_len
metadata = f.read(metadata_len)
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, f, object_ref)
self._put_object_to_store(metadata, buf_len, f, object_ref,
owner_address)
return total

def delete_spilled_objects(self, urls: List[str]):
@@ -397,16 +419,17 @@ def reset_external_storage():
_external_storage = NullStorage()


def spill_objects(object_refs):
def spill_objects(object_refs, owner_addresses):
"""Spill objects to the external storage. Objects are specified
by their object refs.

Args:
object_refs: The list of the refs of the objects to be spilled.
owner_addresses: The owner addresses of the provided object refs.
Returns:
A list of keys corresponding to the input object refs.
"""
return _external_storage.spill_objects(object_refs)
return _external_storage.spill_objects(object_refs, owner_addresses)
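A hedged usage sketch of this module-level entry point; the refs and serialized addresses here are placeholders (in practice both come through `spill_objects_handler` in `_raylet.pyx`):

```python
# Hypothetical call site: one serialized owner address per object ref,
# in matching order, as produced by the C++ core worker.
from ray import external_storage

def spill(refs, serialized_owner_addresses):
    assert len(refs) == len(serialized_owner_addresses)
    # Returns one URL-with-offset key per spilled object ref.
    return external_storage.spill_objects(refs, serialized_owner_addresses)
```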


def restore_spilled_objects(object_refs: List[ObjectRef],
4 changes: 3 additions & 1 deletion python/ray/includes/libcoreworker.pxd
@@ -241,7 +241,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
(void(const CWorkerID &) nogil) on_worker_shutdown
(CRayStatus() nogil) check_signals
(void() nogil) gc_collect
(c_vector[c_string](const c_vector[CObjectID] &) nogil) spill_objects
(c_vector[c_string](
const c_vector[CObjectID] &,
const c_vector[c_string] &) nogil) spill_objects
(int64_t(
const c_vector[CObjectID] &,
const c_vector[c_string] &) nogil) restore_spilled_objects
Expand Down
3 changes: 2 additions & 1 deletion python/ray/tests/test_object_spilling.py
@@ -564,7 +564,8 @@ def wait_until_actor_dead():


@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
platform.system() in ["Windows", "Darwin"],
reason="Failing on Windows and macOS.")
def test_delete_objects_multi_node(multi_node_object_spilling_config,
ray_start_cluster):
# Limit our object store to 75 MiB of memory.
44 changes: 39 additions & 5 deletions src/ray/core_worker/core_worker.cc
@@ -1263,6 +1263,8 @@ void CoreWorker::SpillOwnedObject(const ObjectID &object_id,
RAY_LOG(ERROR) << "Failed to spill object " << object_id
<< ", raylet unreachable or object could not be spilled.";
}
// TODO(Clark): Provide spilled URL and spilled node ID to callback so it can
// add them to the reference.
callback();
});
}
@@ -1273,6 +1275,7 @@ Status CoreWorker::SpillObjects(const std::vector<ObjectID> &object_ids) {
auto ready_promise = std::make_shared<std::promise<void>>(std::promise<void>());
Status final_status;

// TODO(Clark): Add spilled URL and spilled node ID to reference in this callback.
auto callback = [mutex, num_remaining, ready_promise]() {
absl::MutexLock lock(mutex.get());
(*num_remaining)--;
@@ -1312,7 +1315,10 @@ Status CoreWorker::SpillObjects(const std::vector<ObjectID> &object_ids) {
ready_promise->get_future().wait();

for (const auto &object_id : object_ids) {
reference_counter_->HandleObjectSpilled(object_id);
// TODO(Clark): Move this to the callback (unless we really wanted to batch it) and
// also include the spilled URL, spilled node ID, and updated object size.
reference_counter_->HandleObjectSpilled(object_id, "", NodeID::Nil(), -1,
/*release*/ true);
}
return final_status;
}
@@ -2221,15 +2227,19 @@ void CoreWorker::HandleGetObjectLocationsOwner(
auto object_id = ObjectID::FromBinary(request.object_id());
const auto &callback = [object_id, reply, send_reply_callback](
const absl::flat_hash_set<NodeID> &locations,
int64_t object_size, int64_t current_version) {
int64_t object_size, const std::string &spilled_url,
const NodeID &spilled_node_id, int64_t current_version) {
RAY_LOG(DEBUG) << "Replying to HandleGetObjectLocationsOwner for " << object_id
<< " with location update version " << current_version << ", "
<< locations.size() << " locations, and " << object_size
<< " object size.";
<< locations.size() << " locations, " << spilled_url
<< " spilled url, " << spilled_node_id << " spilled node ID, and "
<< object_size << " object size.";
for (const auto &node_id : locations) {
reply->add_node_ids(node_id.Binary());
}
reply->set_object_size(object_size);
reply->set_spilled_url(spilled_url);
reply->set_spilled_node_id(spilled_node_id.Binary());
reply->set_current_version(current_version);
send_reply_callback(Status::OK(), nullptr, nullptr);
};
@@ -2422,7 +2432,13 @@ void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request,
for (const auto &id_binary : request.object_ids_to_spill()) {
object_ids_to_spill.push_back(ObjectID::FromBinary(id_binary));
}
std::vector<std::string> object_urls = options_.spill_objects(object_ids_to_spill);
std::vector<std::string> owner_addresses;
owner_addresses.reserve(request.owner_addresses_size());
for (const auto &owner_address : request.owner_addresses()) {
owner_addresses.push_back(owner_address.SerializeAsString());
}
std::vector<std::string> object_urls =
options_.spill_objects(object_ids_to_spill, owner_addresses);
for (size_t i = 0; i < object_urls.size(); i++) {
reply->add_spilled_objects_url(std::move(object_urls[i]));
}
@@ -2433,6 +2449,24 @@
}
}
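The owner addresses crossing this Python/C++ boundary are serialized rpc::Address protobufs: C++ produces them with SerializeAsString() here, and the Python side parses them back with CAddress.ParseFromString in _create_put_buffer. A sketch of the equivalent round trip in pure Python; the generated module path and field names are assumptions about this era of the codebase:

```python
# Assumed module path and field names; verify against the local checkout.
from ray.core.generated.common_pb2 import Address

addr = Address()
addr.ip_address = "127.0.0.1"
addr.port = 6379
serialized = addr.SerializeToString()  # C++ side: SerializeAsString()

parsed = Address()
parsed.ParseFromString(serialized)     # Python side: ParseFromString()
assert parsed.ip_address == "127.0.0.1" and parsed.port == 6379
```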

void CoreWorker::HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request,
rpc::AddSpilledUrlReply *reply,
rpc::SendReplyCallback send_reply_callback) {
const ObjectID object_id = ObjectID::FromBinary(request.object_id());
const std::string &spilled_url = request.spilled_url();
const NodeID node_id = NodeID::FromBinary(request.spilled_node_id());
RAY_LOG(DEBUG) << "Received AddSpilledUrl request for object " << object_id
<< ", which has been spilled to " << spilled_url << " on node "
<< node_id;
auto reference_exists = reference_counter_->HandleObjectSpilled(
Contributor

nice!

object_id, spilled_url, node_id, request.size(), /*release*/ false);
Status status =
reference_exists
? Status::OK()
: Status::ObjectNotFound("Object " + object_id.Hex() + " not found");
send_reply_callback(status, nullptr, nullptr);
}

void CoreWorker::HandleRestoreSpilledObjects(
const rpc::RestoreSpilledObjectsRequest &request,
rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) {