7 changes: 5 additions & 2 deletions python/ray/_raylet.pyx
@@ -3831,8 +3831,11 @@ cdef class CoreWorker:
             c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs)

         with nogil:
-            check_status(CCoreWorkerProcess.GetCoreWorker().Delete(
-                free_ids, local_only))
+            status = CCoreWorkerProcess.GetCoreWorker().Delete(free_ids, local_only)
+            if status.IsIOError():
+                check_status(CRayStatus.UnexpectedSystemExit(status.ToString()))
Contributor

Instead of inspecting the status here, can we do this in CoreWorker::Delete: if the status is an IOError, return UnexpectedSystemExit; otherwise return it as-is?

Contributor

Thoughts: if another code path calls Delete and hits a broken pipe, that should also be treated as UnexpectedSystemExit.

Contributor Author

Synced offline. It is better to put the logic in CoreWorker::Delete because that also covers the other language frontends. CoreWorker::Delete is intended to be callable from every language and is not called from any other Ray-internal code path.
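A minimal sketch of the approach discussed in this thread: translate an IOError into UnexpectedSystemExit once, inside the C++ delete path, so that every language binding sees the same status. Everything below is an assumption made for illustration. `Status` is a simplified stand-in rather than `ray::Status`, `TranslateDeleteStatus` and `DeleteSketch` are hypothetical names, and the real `CoreWorker::Delete` takes object IDs and a `local_only` flag rather than a pre-made status.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Simplified stand-in for ray::Status (assumption: only what the sketch needs).
enum class StatusCode { OK, IOError, UnexpectedSystemExit };

class Status {
 public:
  Status() = default;
  Status(StatusCode code, std::string msg) : code_(code), msg_(std::move(msg)) {}

  static Status OK() { return Status(); }
  static Status IOError(std::string msg) {
    return Status(StatusCode::IOError, std::move(msg));
  }
  static Status UnexpectedSystemExit(std::string msg) {
    return Status(StatusCode::UnexpectedSystemExit, std::move(msg));
  }

  bool ok() const { return code_ == StatusCode::OK; }
  bool IsIOError() const { return code_ == StatusCode::IOError; }
  bool IsUnexpectedSystemExit() const {
    return code_ == StatusCode::UnexpectedSystemExit;
  }
  const std::string &message() const { return msg_; }

 private:
  StatusCode code_ = StatusCode::OK;
  std::string msg_;
};

// Hypothetical helper: an IOError becomes UnexpectedSystemExit with the same
// message; every other status is passed through unchanged.
Status TranslateDeleteStatus(const Status &status) {
  if (status.IsIOError()) {
    return Status::UnexpectedSystemExit(status.message());
  }
  return status;
}

// Hypothetical stand-in for the delete entry point. `raylet_status` simulates
// whatever the underlying raylet call returned.
Status DeleteSketch(const Status &raylet_status) {
  return TranslateDeleteStatus(raylet_status);
}

// Usage: a broken pipe is reported as an unexpected system exit, while a
// successful call stays OK.
void DemoUsage() {
  assert(DeleteSketch(Status::IOError("Broken pipe")).IsUnexpectedSystemExit());
  assert(DeleteSketch(Status::OK()).ok());
}
```

With the translation done at this layer, the Cython caller could go back to a single `check_status(...)` call around `Delete`, and other bindings would not need their own IOError check.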

+            else:
+                check_status(status)

     def get_local_ongoing_lineage_reconstruction_tasks(self):
         cdef:
6 changes: 6 additions & 0 deletions src/ray/raylet_client/raylet_client.cc
@@ -183,6 +183,12 @@ Status raylet::RayletClient::Disconnect(
   auto status = conn_->WriteMessage(MessageType::DisconnectClient, &fbb);
   // Don't be too strict for disconnection errors.
   // Just create logs and prevent it from crash.
+  // TODO (myan): In the current implementation, if raylet is already terminated in the
+  // "WriteMessage" function above, the worker process will exit early in the function
+  // and will not reach here. However, the code path here is shared between graceful
+  // shutdown and force termination. We need to make sure the above early exit
+  // shouldn't happen during the graceful shutdown scenario and there shouldn't be any
+  // leak if early exit is triggered
   if (!status.ok()) {
     RAY_LOG(WARNING)
         << status.ToString()