4 changes: 2 additions & 2 deletions python/ray/_raylet.pyx
@@ -3833,8 +3833,8 @@ cdef class CoreWorker:
             c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs)

         with nogil:
-            check_status(CCoreWorkerProcess.GetCoreWorker().Delete(
-                free_ids, local_only))
+            check_status(CCoreWorkerProcess.GetCoreWorker().
+                         Delete(free_ids, local_only))

     def get_local_ongoing_lineage_reconstruction_tasks(self):
         cdef:
Expand Down
12 changes: 9 additions & 3 deletions python/ray/tests/test_output.py
@@ -575,14 +575,20 @@ def test_disable_driver_logs_breakpoint():
 @ray.remote
 def f():
     while True:
-        time.sleep(1)
+        start_time = time.time()
Contributor: what is this change?

Contributor Author: This test failed on one of the commits. I was not able to reproduce it locally on my Mac, but I think it is because time.sleep() can be inaccurate on a VM. The update uses time.time() so the sleep duration tracks wall-clock time more accurately (a standalone sketch of the pattern follows this hunk).

+        while time.time() - start_time < 1:
+            time.sleep(0.1)
         print("hello there")
         sys.stdout.flush()

 def kill():
-    time.sleep(5)
+    start_time = time.time()
+    while time.time() - start_time < 5:
+        time.sleep(0.1)
     sys.stdout.flush()
-    time.sleep(1)
+    start_time = time.time()
+    while time.time() - start_time < 1:
+        time.sleep(0.1)
     os._exit(0)

 t = threading.Thread(target=kill)
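As a side note on the pattern discussed in the review thread above, here is a minimal, standalone Python sketch (not part of the PR; the helper name sleep_at_least is made up for illustration):

import time

def sleep_at_least(duration_s, tick_s=0.1):
    # Sleep in small increments while checking the wall clock, instead of a single
    # time.sleep(duration_s). On a busy VM an individual sleep can drift, but the
    # loop keeps the total wait close to the intended wall-clock duration.
    deadline = time.time() + duration_s
    while time.time() < deadline:
        time.sleep(tick_s)

sleep_at_least(1)  # waits roughly one second of wall-clock time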
7 changes: 6 additions & 1 deletion src/ray/core_worker/core_worker.cc
@@ -1868,7 +1868,12 @@ Status CoreWorker::Delete(const std::vector<ObjectID> &object_ids, bool local_only)
     }
   }
   // Also try to delete all objects locally.
-  return DeleteImpl(object_ids, local_only);
+  Status status = DeleteImpl(object_ids, local_only);
+  if (status.IsIOError()) {
+    return Status::UnexpectedSystemExit(status.ToString());
+  } else {
+    return status;
+  }
 }

 Status CoreWorker::GetLocationFromOwner(
5 changes: 5 additions & 0 deletions src/ray/core_worker/core_worker.h
@@ -795,6 +795,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// This calls DeleteImpl() locally for objects we own, and DeleteImpl() remotely
   /// for objects we do not own.
   ///
+  /// If DeleteImpl() returns an IOError when deleting objects locally, we return an
+  /// UnexpectedSystemExit status instead. This is to make sure that tasks which call
+  /// this function from application code can properly retry when they hit the
+  /// IOError.
+  ///
   /// \param[in] object_ids IDs of the objects to delete.
   /// \param[in] local_only Whether only delete the objects in local node, or all nodes in
   /// the cluster.
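To illustrate why the remapped status matters to callers, here is a hedged Python sketch (not part of the PR); it assumes the internal ray._private.internal_api.free() helper eventually reaches CoreWorker::Delete(), which may differ across Ray versions:

import ray

ray.init()

@ray.remote(max_retries=3)  # system failures trigger a retry; application errors do not
def cleanup(refs):
    # Assumed internal helper that ends up in CoreWorker::Delete(). If the local
    # delete hits an IOError, the worker now reports UnexpectedSystemExit, which Ray
    # treats as a system failure, so this task can be retried instead of failing hard.
    from ray._private.internal_api import free
    free(refs, local_only=False)

ref = ray.put("payload")
ray.get(cleanup.remote([ref]))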
6 changes: 6 additions & 0 deletions src/ray/raylet_client/raylet_client.cc
@@ -183,6 +183,12 @@ Status raylet::RayletClient::Disconnect(
   auto status = conn_->WriteMessage(MessageType::DisconnectClient, &fbb);
   // Don't be too strict for disconnection errors.
   // Just create logs and prevent it from crash.
+  // TODO (myan): In the current implementation, if the raylet has already terminated
+  // in the "WriteMessage" call above, the worker process exits early inside that
+  // function and never reaches this point. However, this code path is shared between
+  // graceful shutdown and force termination. We need to make sure the early exit
+  // cannot happen during a graceful shutdown, and that nothing leaks when the early
+  // exit is triggered.
   if (!status.ok()) {
     RAY_LOG(WARNING)
         << status.ToString()