Merged
3 changes: 2 additions & 1 deletion python/ray/_raylet.pxd
@@ -101,7 +101,8 @@ cdef class CoreWorker:
cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
size_t data_size, ObjectRef object_ref,
c_vector[CObjectID] contained_ids,
CObjectID *c_object_id, shared_ptr[CBuffer] *data)
CObjectID *c_object_id, shared_ptr[CBuffer] *data,
owner_address=*)
Contributor:
What does this `=*` mean?

@clarkzinzow (Contributor, Author) on Feb 10, 2021:
It's how you define optional arguments in Cython .pxd header files: https://cython.readthedocs.io/en/stable/src/userguide/language_basics.html#optional-arguments
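A minimal sketch of the convention (hypothetical `Store`/`put` names, not from this PR): the `.pxd` declares the argument as optional with `=*`, and the matching `.pyx` supplies the actual default.

```cython
# store.pxd -- declaration only; `=*` marks `owner_address` as optional
# without committing to a concrete default value here.
cdef class Store:
    cdef put(self, data, owner_address=*)
```

```cython
# store.pyx -- implementation; the actual default is supplied here.
cdef class Store:
    cdef put(self, data, owner_address=None):
        return (data, owner_address)
```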

cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns)
27 changes: 20 additions & 7 deletions python/ray/_raylet.pyx
@@ -628,15 +628,17 @@ cdef void gc_collect() nogil:


cdef c_vector[c_string] spill_objects_handler(
const c_vector[CObjectID]& object_ids_to_spill) nogil:
const c_vector[CObjectID]& object_ids_to_spill,
const c_vector[c_string]& owner_addresses) nogil:
cdef c_vector[c_string] return_urls
with gil:
object_refs = VectorToObjectRefs(object_ids_to_spill)
try:
with ray.worker._changeproctitle(
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER,
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE):
urls = external_storage.spill_objects(object_refs)
urls = external_storage.spill_objects(
object_refs, owner_addresses)
for url in urls:
return_urls.push_back(url)
except Exception:
@@ -930,19 +932,28 @@ cdef class CoreWorker:
cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
size_t data_size, ObjectRef object_ref,
c_vector[CObjectID] contained_ids,
CObjectID *c_object_id, shared_ptr[CBuffer] *data):
CObjectID *c_object_id, shared_ptr[CBuffer] *data,
owner_address=None):
cdef:
CAddress c_owner_address

if object_ref is None:
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned(
metadata, data_size, contained_ids,
c_object_id, data))
else:
c_object_id[0] = object_ref.native()
if owner_address is None:
c_owner_address = CCoreWorkerProcess.GetCoreWorker(
).GetRpcAddress()
else:
c_owner_address = CAddress()
c_owner_address.ParseFromString(owner_address)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().CreateExisting(
metadata, data_size, c_object_id[0],
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(),
data))
c_owner_address, data))

# If data is nullptr, that means the ObjectRef already existed,
# which we ignore.
@@ -951,7 +962,8 @@
return data.get() == NULL
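For context on the hunk above: `owner_address` arrives as a serialized protobuf that `CAddress.ParseFromString` reconstructs, mirroring `SerializeAsString()` on the C++ side (see `HandleSpillObjects` below). A minimal Python-level sketch of that round trip, assuming the generated module `ray.core.generated.common_pb2` and the `Address` field names from Ray's `common.proto` (both assumptions about this version of the codebase, not something this PR adds):

```python
from ray.core.generated.common_pb2 import Address

# Build and serialize an owner address (C++: owner_address.SerializeAsString()).
addr = Address(ip_address="127.0.0.1", port=10001)
serialized = addr.SerializeToString()

# Reconstruct it from bytes (Cython: c_owner_address.ParseFromString(...)).
parsed = Address()
parsed.ParseFromString(serialized)
assert parsed.ip_address == "127.0.0.1" and parsed.port == 10001
```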

def put_file_like_object(
self, metadata, data_size, file_like, ObjectRef object_ref):
self, metadata, data_size, file_like, ObjectRef object_ref,
owner_address):
"""Directly create a new Plasma Store object from a file like
object. This avoids extra memory copy.

@@ -961,6 +973,7 @@
file_like: A python file object that provides the `readinto`
interface.
object_ref: The new ObjectRef.
owner_address: Owner address for this object ref.
"""
cdef:
CObjectID c_object_id
@@ -975,7 +988,7 @@
object_already_exists = self._create_put_buffer(
metadata_buf, data_size, object_ref,
ObjectRefsToVector([]),
&c_object_id, &data_buf)
&c_object_id, &data_buf, owner_address)
if object_already_exists:
logger.debug("Object already exists in 'put_file_like_object'.")
return
57 changes: 38 additions & 19 deletions python/ray/external_storage.py
@@ -89,18 +89,21 @@ def _get_objects_from_store(self, object_refs):
ray_object_pairs = worker.core_worker.get_if_local(object_refs)
return ray_object_pairs

def _put_object_to_store(self, metadata, data_size, file_like, object_ref):
def _put_object_to_store(self, metadata, data_size, file_like, object_ref,
owner_address):
worker = ray.worker.global_worker
worker.core_worker.put_file_like_object(metadata, data_size, file_like,
object_ref)
object_ref, owner_address)

def _write_multiple_objects(self, f: IO, object_refs: List[ObjectRef],
owner_addresses: List[str],
url: str) -> List[str]:
"""Fuse all given objects into a given file handle.

Args:
f(IO): File handle into which the given object refs are fused.
object_refs(list): Object references to fuse into a single file.
owner_addresses(list): Owner addresses for the provided objects.
url(str): URL where the fused objects are stored
in the external storage.

@@ -112,13 +115,17 @@ def _write_multiple_objects(self, f: IO, object_refs: List[ObjectRef],
keys = []
offset = 0
ray_object_pairs = self._get_objects_from_store(object_refs)
for ref, (buf, metadata) in zip(object_refs, ray_object_pairs):
for ref, (buf, metadata), owner_address in zip(
object_refs, ray_object_pairs, owner_addresses):
address_len = len(owner_address)
metadata_len = len(metadata)
buf_len = len(buf)
# 16 bytes to store metadata and buffer length.
data_size_in_bytes = metadata_len + buf_len + 16
# 24 bytes to store owner address, metadata, and buffer lengths.
data_size_in_bytes = address_len + metadata_len + buf_len + 24
f.write(address_len.to_bytes(8, byteorder="little"))
f.write(metadata_len.to_bytes(8, byteorder="little"))
f.write(buf_len.to_bytes(8, byteorder="little"))
f.write(owner_address)
f.write(metadata)
f.write(memoryview(buf))
url_with_offset = create_url_with_offset(
@@ -127,7 +134,8 @@
offset += data_size_in_bytes
return keys
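To make the new on-disk layout concrete: each fused record is three little-endian 8-byte length fields (24 bytes total) followed by the owner address, metadata, and payload. A self-contained sketch of the format this diff writes and reads (hypothetical helper names):

```python
import io

HEADER_LEN = 24  # three little-endian 8-byte length fields

def write_record(f, owner_address: bytes, metadata: bytes, buf: bytes) -> int:
    """Append one spilled-object record; return its total size in bytes."""
    f.write(len(owner_address).to_bytes(8, byteorder="little"))
    f.write(len(metadata).to_bytes(8, byteorder="little"))
    f.write(len(buf).to_bytes(8, byteorder="little"))
    f.write(owner_address)
    f.write(metadata)
    f.write(buf)
    return HEADER_LEN + len(owner_address) + len(metadata) + len(buf)

def read_record(f, offset: int):
    """Read one record back, given the offset encoded in its URL."""
    f.seek(offset)
    address_len = int.from_bytes(f.read(8), byteorder="little")
    metadata_len = int.from_bytes(f.read(8), byteorder="little")
    buf_len = int.from_bytes(f.read(8), byteorder="little")
    return f.read(address_len), f.read(metadata_len), f.read(buf_len)

f = io.BytesIO()
size = write_record(f, b"owner", b"meta", b"payload")
assert size == 24 + 5 + 4 + 7
assert read_record(f, 0) == (b"owner", b"meta", b"payload")
```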

def _size_check(self, metadata_len, buffer_len, obtained_data_size):
def _size_check(self, address_len, metadata_len, buffer_len,
obtained_data_size):
"""Check whether or not the obtained_data_size is as expected.

Args:
@@ -138,17 +146,18 @@

Raises:
ValueError if obtained_data_size is different from
metadata_len + buffer_len + 16(first 8 bytes to store length).
address_len + metadata_len + buffer_len +
24 (three 8-byte length fields).
"""
data_size_in_bytes = metadata_len + buffer_len + 16
data_size_in_bytes = address_len + metadata_len + buffer_len + 24
if data_size_in_bytes != obtained_data_size:
raise ValueError(
f"Obtained data has a size of {data_size_in_bytes}, "
"although it is supposed to have the "
f"size of {obtained_data_size}.")

@abc.abstractmethod
def spill_objects(self, object_refs) -> List[str]:
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
"""Spill objects to the external storage. Objects are specified
by their object refs.

@@ -191,7 +200,7 @@ def destroy_external_storage(self):
class NullStorage(ExternalStorage):
"""The class that represents an uninitialized external storage."""

def spill_objects(self, object_refs) -> List[str]:
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
raise NotImplementedError("External storage is not initialized")

def restore_spilled_objects(self, object_refs, url_with_offset_list):
@@ -220,15 +229,16 @@ def __init__(self, directory_path):
raise ValueError("The given directory path to store objects, "
f"{self.directory_path}, could not be created.")

def spill_objects(self, object_refs) -> List[str]:
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
if len(object_refs) == 0:
return []
# Always use the first object ref as a key when fusing objects.
first_ref = object_refs[0]
filename = f"{first_ref.hex()}-multi-{len(object_refs)}"
url = f"{os.path.join(self.directory_path, filename)}"
with open(url, "wb") as f:
return self._write_multiple_objects(f, object_refs, url)
return self._write_multiple_objects(f, object_refs,
owner_addresses, url)
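The key returned for each object is a URL that records where its record sits inside the fused file. A sketch of the scheme, inferred from how `create_url_with_offset` and `parse_url_with_offset` are used in this diff (the exact encoding is an assumption):

```python
# Assumed format: base URL plus offset/size query parameters.
base_url = "/tmp/ray/spill/abc123-multi-3"
url_with_offset = f"{base_url}?offset=2048&size=1072"

# parse_url_with_offset would then recover the pieces restore needs:
#   parsed_result.base_url == "/tmp/ray/spill/abc123-multi-3"
#   parsed_result.offset   == 2048  (where to f.seek() in the fused file)
#   parsed_result.size     == 1072  (checked against the record header via _size_check)
```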

def restore_spilled_objects(self, object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
@@ -243,13 +253,17 @@ def restore_spilled_objects(self, object_refs: List[ObjectRef],
# Read a part of the file and recover the object.
with open(base_url, "rb") as f:
f.seek(offset)
address_len = int.from_bytes(f.read(8), byteorder="little")
metadata_len = int.from_bytes(f.read(8), byteorder="little")
buf_len = int.from_bytes(f.read(8), byteorder="little")
self._size_check(metadata_len, buf_len, parsed_result.size)
self._size_check(address_len, metadata_len, buf_len,
parsed_result.size)
total += buf_len
owner_address = f.read(address_len)
metadata = f.read(metadata_len)
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, f, object_ref)
self._put_object_to_store(metadata, buf_len, f, object_ref,
owner_address)
return total

def delete_spilled_objects(self, urls: List[str]):
@@ -320,7 +334,7 @@ def __init__(self,
self.transport_params = {"defer_seek": True}
self.transport_params.update(self.override_transport_params)

def spill_objects(self, object_refs) -> List[str]:
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
if len(object_refs) == 0:
return []
from smart_open import open
@@ -331,7 +345,8 @@
with open(
url, "wb",
transport_params=self.transport_params) as file_like:
return self._write_multiple_objects(file_like, object_refs, url)
return self._write_multiple_objects(file_like, object_refs,
owner_addresses, url)

def restore_spilled_objects(self, object_refs: List[ObjectRef],
url_with_offset_list: List[str]):
@@ -352,13 +367,16 @@ def restore_spilled_objects(self, object_refs: List[ObjectRef],
# smart open seek reads the file from offset-end_of_the_file
# when the seek is called.
f.seek(offset)
address_len = int.from_bytes(f.read(8), byteorder="little")
metadata_len = int.from_bytes(f.read(8), byteorder="little")
buf_len = int.from_bytes(f.read(8), byteorder="little")
self._size_check(address_len, metadata_len, buf_len,
parsed_result.size)
owner_address = f.read(address_len)
total += buf_len
metadata = f.read(metadata_len)
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, f, object_ref)
self._put_object_to_store(metadata, buf_len, f, object_ref,
owner_address)
return total
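For the remote path, a minimal sketch of how a single record is restored through `smart_open` (assumes the `smart_open` package is installed and credentials are configured; `defer_seek=True` mirrors the transport params set in `__init__` above):

```python
from smart_open import open  # third-party package; shadows builtins.open

url = "s3://my-bucket/abc123-multi-3"  # hypothetical spill URL
offset = 2048
with open(url, "rb", transport_params={"defer_seek": True}) as f:
    # As noted in the comment above, smart_open reads from `offset` to
    # end-of-file once seek is called, so the whole fused file is not fetched.
    f.seek(offset)
    address_len = int.from_bytes(f.read(8), byteorder="little")
```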

def delete_spilled_objects(self, urls: List[str]):
@@ -397,16 +415,17 @@ def reset_external_storage():
_external_storage = NullStorage()


def spill_objects(object_refs):
def spill_objects(object_refs, owner_addresses):
"""Spill objects to the external storage. Objects are specified
by their object refs.

Args:
object_refs: The list of the refs of the objects to be spilled.
owner_addresses: The owner addresses of the provided object refs.
Returns:
A list of keys corresponding to the input object refs.
"""
return _external_storage.spill_objects(object_refs)
return _external_storage.spill_objects(object_refs, owner_addresses)


def restore_spilled_objects(object_refs: List[ObjectRef],
4 changes: 3 additions & 1 deletion python/ray/includes/libcoreworker.pxd
@@ -241,7 +241,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
(void(const CWorkerID &) nogil) on_worker_shutdown
(CRayStatus() nogil) check_signals
(void() nogil) gc_collect
(c_vector[c_string](const c_vector[CObjectID] &) nogil) spill_objects
(c_vector[c_string](
const c_vector[CObjectID] &,
const c_vector[c_string] &) nogil) spill_objects
(int64_t(
const c_vector[CObjectID] &,
const c_vector[c_string] &) nogil) restore_spilled_objects
44 changes: 39 additions & 5 deletions src/ray/core_worker/core_worker.cc
@@ -1263,6 +1263,8 @@ void CoreWorker::SpillOwnedObject(const ObjectID &object_id,
RAY_LOG(ERROR) << "Failed to spill object " << object_id
<< ", raylet unreachable or object could not be spilled.";
}
// TODO(Clark): Provide spilled URL and spilled node ID to callback so it can
// add them to the reference.
callback();
});
}
@@ -1273,6 +1275,7 @@ Status CoreWorker::SpillObjects(const std::vector<ObjectID> &object_ids) {
auto ready_promise = std::make_shared<std::promise<void>>(std::promise<void>());
Status final_status;

// TODO(Clark): Add spilled URL and spilled node ID to reference in this callback.
auto callback = [mutex, num_remaining, ready_promise]() {
absl::MutexLock lock(mutex.get());
(*num_remaining)--;
@@ -1312,7 +1315,10 @@ Status CoreWorker::SpillObjects(const std::vector<ObjectID> &object_ids) {
ready_promise->get_future().wait();

for (const auto &object_id : object_ids) {
reference_counter_->HandleObjectSpilled(object_id);
// TODO(Clark): Move this to the callback (unless we really wanted to batch it) and
// also include the spilled URL, spilled node ID, and updated object size.
reference_counter_->HandleObjectSpilled(object_id, "", NodeID::Nil(), -1,
/*release*/ true);
}
return final_status;
}
@@ -2221,15 +2227,19 @@ void CoreWorker::HandleGetObjectLocationsOwner(
auto object_id = ObjectID::FromBinary(request.object_id());
const auto &callback = [object_id, reply, send_reply_callback](
const absl::flat_hash_set<NodeID> &locations,
int64_t object_size, int64_t current_version) {
int64_t object_size, const std::string &spilled_url,
const NodeID &spilled_node_id, int64_t current_version) {
RAY_LOG(DEBUG) << "Replying to HandleGetObjectLocationsOwner for " << object_id
<< " with location update version " << current_version << ", "
<< locations.size() << " locations, and " << object_size
<< " object size.";
<< locations.size() << " locations, " << spilled_url
<< " spilled url, " << spilled_node_id << " spilled node ID, and "
<< object_size << " object size.";
for (const auto &node_id : locations) {
reply->add_node_ids(node_id.Binary());
}
reply->set_object_size(object_size);
reply->set_spilled_url(spilled_url);
reply->set_spilled_node_id(spilled_node_id.Binary());
reply->set_current_version(current_version);
send_reply_callback(Status::OK(), nullptr, nullptr);
};
@@ -2422,7 +2432,13 @@ void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request,
for (const auto &id_binary : request.object_ids_to_spill()) {
object_ids_to_spill.push_back(ObjectID::FromBinary(id_binary));
}
std::vector<std::string> object_urls = options_.spill_objects(object_ids_to_spill);
std::vector<std::string> owner_addresses;
owner_addresses.reserve(request.owner_addresses_size());
for (const auto &owner_address : request.owner_addresses()) {
owner_addresses.push_back(owner_address.SerializeAsString());
}
std::vector<std::string> object_urls =
options_.spill_objects(object_ids_to_spill, owner_addresses);
for (size_t i = 0; i < object_urls.size(); i++) {
reply->add_spilled_objects_url(std::move(object_urls[i]));
}
@@ -2433,6 +2449,24 @@
}
}

void CoreWorker::HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request,
rpc::AddSpilledUrlReply *reply,
rpc::SendReplyCallback send_reply_callback) {
const ObjectID &object_id = ObjectID::FromBinary(request.object_id());
const std::string &spilled_url = request.spilled_url();
const NodeID &node_id = NodeID::FromBinary(request.spilled_node_id());
RAY_LOG(DEBUG) << "Received AddSpilledUrl request for object " << object_id
<< ", which has been spilled to " << spilled_url << " on node "
<< node_id;
auto reference_exists = reference_counter_->HandleObjectSpilled(
Contributor:
nice!

object_id, spilled_url, node_id, request.size(), /*release*/ false);
Status status =
reference_exists
? Status::OK()
: Status::ObjectNotFound("Object " + object_id.Hex() + " not found");
send_reply_callback(status, nullptr, nullptr);
}

void CoreWorker::HandleRestoreSpilledObjects(
const rpc::RestoreSpilledObjectsRequest &request,
rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) {
9 changes: 8 additions & 1 deletion src/ray/core_worker/core_worker.h
@@ -137,7 +137,9 @@ struct CoreWorkerOptions {
/// be held up in garbage objects.
std::function<void()> gc_collect;
/// Application-language callback to spill objects to external storage.
std::function<std::vector<std::string>(const std::vector<ObjectID> &)> spill_objects;
std::function<std::vector<std::string>(const std::vector<ObjectID> &,
const std::vector<std::string> &)>
spill_objects;
/// Application-language callback to restore objects from external storage.
std::function<int64_t(const std::vector<ObjectID> &, const std::vector<std::string> &)>
restore_spilled_objects;
@@ -908,6 +910,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
rpc::SpillObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

// Add spilled URL to owned reference.
void HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request,
rpc::AddSpilledUrlReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

// Restore objects from external storage.
void HandleRestoreSpilledObjects(const rpc::RestoreSpilledObjectsRequest &request,
rpc::RestoreSpilledObjectsReply *reply,