Skip to content
Merged
25 changes: 25 additions & 0 deletions python/ray/tests/test_failure_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,31 @@ def get_raylet_pid(self):
wait_for_pid_to_exit(worker_pid)


def test_plasma_store_operation_after_raylet_dies(ray_start_cluster):
    """
    Test that an object store operation issued after the raylet dies fails the
    task with a system level error (LocalRayletDiedError) rather than an
    application level error (RayTaskError).
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=1)
    cluster.wait_for_nodes()

    ray.init(address=cluster.address)

    @ray.remote
    def get_after_raylet_dies():
        # Kill the raylet this worker belongs to, wait until it is really gone,
        # and only then issue an object store operation.
        raylet_pid = int(os.environ["RAY_RAYLET_PID"])
        os.kill(raylet_pid, SIGKILL)
        wait_for_pid_to_exit(raylet_pid)
        ray.put([0] * 100000)

    # pytest.raises fails the test when no exception is raised at all. A bare
    # try/except with an assert in the handler would pass silently if the task
    # unexpectedly succeeded.
    with pytest.raises(ray.exceptions.LocalRayletDiedError):
        ray.get(get_after_raylet_dies.remote(), timeout=10)


@pytest.mark.parametrize(
"ray_start_cluster_head",
[
Expand Down
4 changes: 2 additions & 2 deletions src/ray/common/client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
///
/// \param buffer The buffer.
/// \return Status.
Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer);
virtual Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer);

/// Write a buffer to this connection asynchronously.
///
Expand All @@ -113,7 +113,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
///
/// \param buffer The buffer.
/// \return Status.
Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer);
virtual Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer);

/// Read a buffer from this connection asynchronously.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
bool warmup,
std::function<std::string()> get_current_call_site)
: raylet_client_(raylet_client),
store_client_(std::make_shared<plasma::PlasmaClient>()),
// We can turn on exit_on_connection_failure for the core worker plasma
// client to exit the core worker early after the raylet's death because, on
// the raylet side, we never proactively close the plasma store connection,
// even during shutdown. So any error from the raylet side should be a sign of
// raylet death.
store_client_(
std::make_shared<plasma::PlasmaClient>(/*exit_on_connection_failure*/ true)),
reference_counter_(reference_counter),
check_signals_(std::move(check_signals)) {
if (get_current_call_site != nullptr) {
Expand Down
14 changes: 12 additions & 2 deletions src/ray/object_manager/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ struct ObjectInUseEntry {
class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
public:
Impl();
explicit Impl(bool exit_on_connection_failure);
~Impl();

// PlasmaClient method implementations
Expand Down Expand Up @@ -235,11 +236,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
std::unordered_set<ObjectID> deletion_cache_;
/// A mutex which protects this class.
std::recursive_mutex client_mutex_;
/// Whether the current process should exit when a read from or a write to the
/// connection fails. It should only be turned on when the plasma client is in a
/// core worker.
bool exit_on_connection_failure_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of modifying the low-level PlasmaClient, why can't we reuse the existing check_signals_ mechanism in CoreWorkerPlasmaStoreProvider, which is the abstraction layer between the Core Worker and the plasma store?

The get and wait paths in CoreWorkerPlasmaStoreProvider proactively check for interrupts via check_signals_. We can do the same for other Plasma operations.

CoreWorker::ExitIfParentRayletDies() already periodically checks whether the raylet process is alive. We can then let the CoreWorker decide what to do based on the error propagated from CoreWorkerPlasmaStoreProvider.

};

/// Calls Release() on the owning client for this buffer's object id when the
/// buffer is destroyed. The returned status is intentionally ignored via
/// RAY_UNUSED.
PlasmaBuffer::~PlasmaBuffer() {
  RAY_UNUSED(client_->Release(object_id_));
}

PlasmaClient::Impl::Impl() : store_capacity_(0) {}
PlasmaClient::Impl::Impl() : store_capacity_(0), exit_on_connection_failure_(false) {}

PlasmaClient::Impl::Impl(bool exit_on_connection_failure)
: store_capacity_(0), exit_on_connection_failure_(exit_on_connection_failure) {}

/// Nothing to clean up explicitly; members are destroyed by their own
/// destructors.
PlasmaClient::Impl::~Impl() = default;

Expand Down Expand Up @@ -868,7 +875,7 @@ Status PlasmaClient::Impl::Connect(const std::string &store_socket_name,
/// The local stream socket that connects to store.
ray::local_stream_socket socket(main_service_);
RAY_RETURN_NOT_OK(ray::ConnectSocketRetry(socket, store_socket_name));
store_conn_.reset(new StoreConn(std::move(socket)));
store_conn_.reset(new StoreConn(std::move(socket), exit_on_connection_failure_));
// Send a ConnectRequest to the store to get its memory capacity.
RAY_RETURN_NOT_OK(SendConnectRequest(store_conn_));
std::vector<uint8_t> buffer;
Expand Down Expand Up @@ -912,6 +919,9 @@ std::string PlasmaClient::Impl::DebugString() {

PlasmaClient::PlasmaClient() : impl_(std::make_shared<PlasmaClient::Impl>()) {}

PlasmaClient::PlasmaClient(bool exit_on_connection_failure)
: impl_(std::make_shared<PlasmaClient::Impl>(exit_on_connection_failure)) {}

Status PlasmaClient::Connect(const std::string &store_socket_name,
const std::string &manager_socket_name,
int num_retries) {
Expand Down
2 changes: 2 additions & 0 deletions src/ray/object_manager/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ class PlasmaClient : public PlasmaClientInterface {
public:
PlasmaClient();

explicit PlasmaClient(bool exit_on_connection_failure);

Status Connect(const std::string &store_socket_name,
const std::string &manager_socket_name = "",
int num_retries = -1) override;
Expand Down
30 changes: 29 additions & 1 deletion src/ray/object_manager/plasma/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ Status Client::SendFd(MEMFD_TYPE fd) {
}

StoreConn::StoreConn(ray::local_stream_socket &&socket)
: ray::ServerConnection(std::move(socket)) {}
: ray::ServerConnection(std::move(socket)), exit_on_connection_failure_(false) {}

StoreConn::StoreConn(ray::local_stream_socket &&socket, bool exit_on_connection_failure)
: ray::ServerConnection(std::move(socket)),
exit_on_connection_failure_(exit_on_connection_failure) {}

Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) {
#ifdef _WIN32
Expand All @@ -192,4 +196,28 @@ Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) {
return Status::OK();
}

/// Write a buffer to this connection through the base-class implementation,
/// then run the result through ExitIfErrorStatus().
///
/// \param buffer The buffer.
/// \return The status reported by ray::ServerConnection::WriteBuffer.
ray::Status StoreConn::WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer) {
  ray::Status write_status = ray::ServerConnection::WriteBuffer(buffer);
  // May terminate the process if the write failed and exit-on-failure is set.
  ExitIfErrorStatus(write_status);
  return write_status;
}

/// Read a buffer from this connection through the base-class implementation,
/// then run the result through ExitIfErrorStatus().
///
/// \param buffer The buffer.
/// \return The status reported by ray::ServerConnection::ReadBuffer.
ray::Status StoreConn::ReadBuffer(
    const std::vector<boost::asio::mutable_buffer> &buffer) {
  ray::Status read_status = ray::ServerConnection::ReadBuffer(buffer);
  // May terminate the process if the read failed and exit-on-failure is set.
  ExitIfErrorStatus(read_status);
  return read_status;
}

/// Terminate the current process when \p status is not OK and this connection
/// was configured with exit_on_connection_failure_ set. Otherwise do nothing.
///
/// \param status The status to check.
void StoreConn::ExitIfErrorStatus(const ray::Status &status) {
  // Nothing to do for a successful operation, or when the owner of this
  // connection did not opt in to exiting on failure.
  if (status.ok() || !exit_on_connection_failure_) {
    return;
  }

  RAY_LOG(WARNING) << "The connection to the plasma store is failed. Terminate the "
                   << "process. Status: " << status;
  ray::QuickExit();
  // Safety net: log at FATAL if control unexpectedly continues past QuickExit().
  RAY_LOG(FATAL)
      << "Accessing unreachable code. This line should never be reached "
      << "after quick process exit due to plasma store connection failure. Please "
         "create a github issue at https://github.com/ray-project/ray.";
}
} // namespace plasma
18 changes: 18 additions & 0 deletions src/ray/object_manager/plasma/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,28 @@ class StoreConn : public ray::ServerConnection {
public:
explicit StoreConn(ray::local_stream_socket &&socket);

explicit StoreConn(ray::local_stream_socket &&socket, bool exit_on_connection_failure);

/// Receive a file descriptor for the store.
///
/// \return A file descriptor.
ray::Status RecvFd(MEMFD_TYPE_NON_UNIQUE *fd);

ray::Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer) override;

ray::Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer) override;

private:
// Whether the current process should exit when WriteBuffer or ReadBuffer fails.
// Currently it is only turned on when the plasma client is in a core worker.
// TODO(myan): A better way is to handle the failure outside of the plasma client,
// inside the core worker's logic, and propagate the correct exception to the user.
bool exit_on_connection_failure_ = false;

// Shut down the current process if the passed-in status is not OK and the client is
// configured to exit on failure.
// @param status: The status to check.
void ExitIfErrorStatus(const ray::Status &status);
};

std::ostream &operator<<(std::ostream &os, const std::shared_ptr<StoreConn> &store_conn);
Expand Down