Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions src/ray/object_manager/object_buffer_pool.cc
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#include "ray/object_manager/object_buffer_pool.h"

#include "arrow/util/logging.h"
#include "ray/status.h"
#include "ray/util/logging.h"

namespace ray {

ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
uint64_t chunk_size)
: default_chunk_size_(chunk_size) {
store_socket_name_ = store_socket_name;
ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str()));
RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str()));
}

ObjectBufferPool::~ObjectBufferPool() {
Expand All @@ -23,7 +24,7 @@ ObjectBufferPool::~ObjectBufferPool() {
}
RAY_CHECK(get_buffer_state_.empty());
RAY_CHECK(create_buffer_state_.empty());
ARROW_CHECK_OK(store_client_.Disconnect());
RAY_ARROW_CHECK_OK(store_client_.Disconnect());
}

uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
Expand All @@ -44,7 +45,7 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Ge
if (get_buffer_state_.count(object_id) == 0) {
plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.to_plasma_id();
ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
if (object_buffer.data == nullptr) {
RAY_LOG(ERROR) << "Failed to get object";
return std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status>(
Expand Down Expand Up @@ -73,14 +74,14 @@ void ObjectBufferPool::ReleaseGetChunk(const ObjectID &object_id, uint64_t chunk
buffer_state.references--;
RAY_LOG(DEBUG) << "ReleaseBuffer " << object_id << " " << buffer_state.references;
if (buffer_state.references == 0) {
ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
get_buffer_state_.erase(object_id);
}
}

void ObjectBufferPool::AbortGet(const ObjectID &object_id) {
std::lock_guard<std::mutex> lock(pool_mutex_);
ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
get_buffer_state_.erase(object_id);
}

Expand Down Expand Up @@ -156,16 +157,16 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk
<< create_buffer_state_[object_id].num_seals_remaining;
if (create_buffer_state_[object_id].num_seals_remaining == 0) {
const plasma::ObjectID plasma_id = object_id.to_plasma_id();
ARROW_CHECK_OK(store_client_.Seal(plasma_id));
ARROW_CHECK_OK(store_client_.Release(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Seal(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id));
create_buffer_state_.erase(object_id);
}
}

void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
const plasma::ObjectID plasma_id = object_id.to_plasma_id();
ARROW_CHECK_OK(store_client_.Release(plasma_id));
ARROW_CHECK_OK(store_client_.Abort(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Abort(plasma_id));
create_buffer_state_.erase(object_id);
}

Expand Down Expand Up @@ -194,7 +195,7 @@ void ObjectBufferPool::FreeObjects(const std::vector<ObjectID> &object_ids) {
plasma_ids.push_back(id.to_plasma_id());
}
std::lock_guard<std::mutex> lock(pool_mutex_);
ARROW_CHECK_OK(store_client_.Delete(plasma_ids));
RAY_ARROW_CHECK_OK(store_client_.Delete(plasma_ids));
}

std::string ObjectBufferPool::DebugString() const {
Expand Down
8 changes: 4 additions & 4 deletions src/ray/object_manager/object_store_notification_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <boost/bind.hpp>
#include <boost/function.hpp>

#include "arrow/util/logging.h"
#include "ray/status.h"

#include "ray/common/common_protocol.h"
#include "ray/object_manager/object_store_notification_manager.h"
Expand All @@ -20,17 +20,17 @@ ObjectStoreNotificationManager::ObjectStoreNotificationManager(
num_adds_processed_(0),
num_removes_processed_(0),
socket_(io_service) {
ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str()));
RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str()));

ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_));
RAY_ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_));
boost::system::error_code ec;
socket_.assign(boost::asio::local::stream_protocol(), c_socket_, ec);
assert(!ec.value());
NotificationWait();
}

ObjectStoreNotificationManager::~ObjectStoreNotificationManager() {
ARROW_CHECK_OK(store_client_.Disconnect());
RAY_ARROW_CHECK_OK(store_client_.Disconnect());
}

void ObjectStoreNotificationManager::NotificationWait() {
Expand Down
16 changes: 8 additions & 8 deletions src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#include "gtest/gtest.h"

#include "arrow/util/logging.h"
#include "ray/status.h"

#include "ray/object_manager/object_manager.h"

Expand Down Expand Up @@ -157,8 +157,8 @@ class TestObjectManagerBase : public ::testing::Test {
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));

// connect to stores.
ARROW_CHECK_OK(client1.Connect(store_id_1));
ARROW_CHECK_OK(client2.Connect(store_id_2));
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1));
RAY_ARROW_CHECK_OK(client2.Connect(store_id_2));
}

void TearDown() {
Expand All @@ -179,9 +179,9 @@ class TestObjectManagerBase : public ::testing::Test {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
return object_id;
}

Expand Down Expand Up @@ -291,14 +291,14 @@ class StressTestObjectManager : public TestObjectManagerBase {
plasma::ObjectBuffer GetObject(plasma::PlasmaClient &client, ObjectID &object_id) {
plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.to_plasma_id();
ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer));
RAY_ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer));
return object_buffer;
}

static unsigned char *GetDigest(plasma::PlasmaClient &client, ObjectID &object_id) {
const int64_t size = sizeof(uint64_t);
static unsigned char digest_1[size];
ARROW_CHECK_OK(client.Hash(object_id.to_plasma_id(), &digest_1[0]));
RAY_ARROW_CHECK_OK(client.Hash(object_id.to_plasma_id(), &digest_1[0]));
return digest_1;
}

Expand Down
12 changes: 6 additions & 6 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "gtest/gtest.h"

#include "arrow/util/logging.h"
#include "ray/status.h"

#include "ray/object_manager/object_manager.h"

Expand Down Expand Up @@ -142,8 +142,8 @@ class TestObjectManagerBase : public ::testing::Test {
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));

// connect to stores.
ARROW_CHECK_OK(client1.Connect(store_id_1));
ARROW_CHECK_OK(client2.Connect(store_id_2));
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1));
RAY_ARROW_CHECK_OK(client2.Connect(store_id_2));
}

void TearDown() {
Expand All @@ -168,9 +168,9 @@ class TestObjectManagerBase : public ::testing::Test {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
return object_id;
}

Expand Down
8 changes: 4 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <fstream>

#include "arrow/util/logging.h"
#include "ray/status.h"

#include "ray/common/common_protocol.h"
#include "ray/id.h"
Expand Down Expand Up @@ -90,7 +90,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
[this](const ObjectID &object_id) { HandleObjectMissing(object_id); }));

ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str()));
RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str()));
}

ray::Status NodeManager::RegisterGcs() {
Expand Down Expand Up @@ -1148,8 +1148,8 @@ void NodeManager::TreatTaskAsFailed(const Task &task) {
if (!status.IsPlasmaObjectExists()) {
// TODO(rkn): We probably don't want this checks. E.g., if the object
// store is full, we don't want to kill the raylet.
ARROW_CHECK_OK(status);
ARROW_CHECK_OK(store_client_.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(status);
RAY_ARROW_CHECK_OK(store_client_.Seal(object_id.to_plasma_id()));
}
}
// A task failing is equivalent to assigning and finishing the task, so clean
Expand Down
12 changes: 6 additions & 6 deletions src/ray/raylet/object_manager_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "gtest/gtest.h"

#include "arrow/util/logging.h"
#include "ray/status.h"

#include "ray/raylet/raylet.h"

Expand Down Expand Up @@ -76,8 +76,8 @@ class TestObjectManagerBase : public ::testing::Test {
GetNodeManagerConfig("raylet_2", store_sock_2), om_config_2, gcs_client_2));

// connect to stores.
ARROW_CHECK_OK(client1.Connect(store_sock_1));
ARROW_CHECK_OK(client2.Connect(store_sock_2));
RAY_ARROW_CHECK_OK(client1.Connect(store_sock_1));
RAY_ARROW_CHECK_OK(client2.Connect(store_sock_2));
}

void TearDown() {
Expand All @@ -104,9 +104,9 @@ class TestObjectManagerBase : public ::testing::Test {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
return object_id;
}

Expand Down
10 changes: 10 additions & 0 deletions src/ray/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@
// logged message.
#define RAY_CHECK_OK(s) RAY_CHECK_OK_PREPEND(s, "Bad status")

// This macro is used to replace the "ARROW_CHECK_OK_PREPEND" macro.
#define RAY_ARROW_CHECK_OK_PREPEND(to_call, msg) \
do { \
::arrow::Status _s = (to_call); \
RAY_CHECK(_s.ok()) << (msg) << ": " << _s.ToString(); \
} while (0)

// This macro is used to replace the "ARROW_CHECK_OK" macro.
#define RAY_ARROW_CHECK_OK(s) RAY_ARROW_CHECK_OK_PREPEND(s, "Bad status")

namespace ray {

enum class StatusCode : char {
Expand Down