Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
ObjectID id;
PlasmaObject object;
RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object));
RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object));
// If the CreateReply included an error, then the store will not send a file
// descriptor.
int fd = recv_fd(store_conn_);
Expand Down Expand Up @@ -204,7 +204,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
std::vector<PlasmaObject> object_data(num_objects);
PlasmaObject* object;
RETURN_NOT_OK(ReadGetReply(
buffer.data(), received_object_ids.data(), object_data.data(), num_objects));
buffer.data(), buffer.size(), received_object_ids.data(), object_data.data(), num_objects));

for (int i = 0; i < num_objects; ++i) {
DCHECK(received_object_ids[i] == object_ids[i]);
Expand Down Expand Up @@ -328,7 +328,7 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer));
ObjectID object_id2;
RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object));
RETURN_NOT_OK(ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object));
}
return Status::OK();
}
Expand Down Expand Up @@ -436,7 +436,7 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
std::vector<uint8_t> buffer;
int64_t type;
RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
return ReadEvictReply(buffer.data(), num_bytes_evicted);
return ReadEvictReply(buffer.data(), buffer.size(), num_bytes_evicted);
}

Status PlasmaClient::Subscribe(int* fd) {
Expand Down Expand Up @@ -473,7 +473,7 @@ Status PlasmaClient::Connect(const std::string& store_socket_name,
RETURN_NOT_OK(SendConnectRequest(store_conn_));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &buffer));
RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_));
RETURN_NOT_OK(ReadConnectReply(buffer.data(), buffer.size(), &store_capacity_));
return Status::OK();
}

Expand Down Expand Up @@ -511,7 +511,7 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer));
ObjectID id;
RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1));
RETURN_NOT_OK(ReadStatusReply(buffer.data(), buffer.size(), &id, object_status, 1));
ARROW_CHECK(object_id == id);
return Status::OK();
}
Expand All @@ -532,7 +532,7 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req
num_ready_objects, timeout_ms));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer));
RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects));
RETURN_NOT_OK(ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects));

*num_objects_ready = 0;
for (int i = 0; i < num_object_requests; ++i) {
Expand Down
71 changes: 47 additions & 24 deletions cpp/src/plasma/protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ Status SendCreateRequest(
}

Status ReadCreateRequest(
uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) {
uint8_t* data, size_t size, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*data_size = message->data_size();
*metadata_size = message->metadata_size();
*object_id = ObjectID::from_binary(message->object_id()->str());
Expand All @@ -81,9 +82,10 @@ Status SendCreateReply(
return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
}

Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) {
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, PlasmaObject* object) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
object->handle.store_fd = message->plasma_object()->segment_index();
object->handle.mmap_size = message->plasma_object()->mmap_size();
Expand All @@ -104,9 +106,10 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message);
}

Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) {
Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, unsigned char* digest) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
ARROW_CHECK(message->digest()->size() == kDigestSize);
memcpy(digest, message->digest()->data(), kDigestSize);
Expand All @@ -120,9 +123,10 @@ Status SendSealReply(int sock, ObjectID object_id, int error) {
return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message);
}

Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaSealReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return plasma_error_status(message->error());
}
Expand All @@ -131,13 +135,14 @@ Status ReadSealReply(uint8_t* data, ObjectID* object_id) {

Status SendReleaseRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()));
auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message);
}

Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) {
Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Expand All @@ -149,9 +154,10 @@ Status SendReleaseReply(int sock, ObjectID object_id, int error) {
return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message);
}

Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) {
Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return plasma_error_status(message->error());
}
Expand All @@ -164,9 +170,10 @@ Status SendDeleteRequest(int sock, ObjectID object_id) {
return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message);
}

Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) {
Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Expand All @@ -178,9 +185,10 @@ Status SendDeleteReply(int sock, ObjectID object_id, int error) {
return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message);
}

Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) {
Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return plasma_error_status(message->error());
}
Expand All @@ -194,9 +202,10 @@ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objec
return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message);
}

Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) {
Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[], int64_t num_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
Expand All @@ -212,16 +221,18 @@ Status SendStatusReply(
return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message);
}

int64_t ReadStatusReply_num_objects(uint8_t* data) {
int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
return message->object_ids()->size();
}

Status ReadStatusReply(
uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects) {
uint8_t* data, size_t size, ObjectID object_ids[], int object_status[], int64_t num_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
Expand All @@ -239,9 +250,10 @@ Status SendContainsRequest(int sock, ObjectID object_id) {
return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message);
}

Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) {
Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Expand All @@ -253,9 +265,10 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message);
}

Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) {
Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, bool* has_object) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaContainsReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
*has_object = message->has_object();
return Status::OK();
Expand All @@ -279,9 +292,10 @@ Status SendConnectReply(int sock, int64_t memory_capacity) {
return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message);
}

Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) {
Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaConnectReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*memory_capacity = message->memory_capacity();
return Status::OK();
}
Expand All @@ -294,9 +308,10 @@ Status SendEvictRequest(int sock, int64_t num_bytes) {
return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message);
}

Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) {
Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*num_bytes = message->num_bytes();
return Status::OK();
}
Expand All @@ -307,9 +322,10 @@ Status SendEvictReply(int sock, int64_t num_bytes) {
return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message);
}

Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) {
Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaEvictReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
num_bytes = message->num_bytes();
return Status::OK();
}
Expand All @@ -325,9 +341,10 @@ Status SendGetRequest(
}

Status ReadGetRequest(
uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms) {
uint8_t* data, size_t size, std::vector<ObjectID>& object_ids, int64_t* timeout_ms) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
auto object_id = message->object_ids()->Get(i)->str();
object_ids.push_back(ObjectID::from_binary(object_id));
Expand All @@ -353,10 +370,11 @@ Status SendGetReply(int sock, ObjectID object_ids[],
return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
}

Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], PlasmaObject plasma_objects[],
int64_t num_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
Expand All @@ -381,9 +399,10 @@ Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_object
return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message);
}

Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) {
Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str()));
}
Expand All @@ -408,10 +427,11 @@ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_re
return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message);
}

Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
int64_t* timeout_ms, int* num_ready_objects) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
*num_ready_objects = message->num_ready_objects();
*timeout_ms = message->timeout();

Expand Down Expand Up @@ -442,10 +462,11 @@ Status SendWaitReply(
}

Status ReadWaitReply(
uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects) {
uint8_t* data, size_t size, ObjectRequest object_requests[], int* num_ready_objects) {
DCHECK(data);

auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*num_ready_objects = message->num_ready_objects();
for (int i = 0; i < *num_ready_objects; i++) {
object_requests[i].object_id =
Expand Down Expand Up @@ -473,9 +494,10 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po
return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message);
}

Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) {
Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, int* port) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data);
DCHECK(verify_flatbuffer(message, data, size));
DCHECK(message->object_id()->size() == sizeof(ObjectID));
*object_id = ObjectID::from_binary(message->object_id()->str());
*address = strdup(message->address()->c_str());
Expand All @@ -492,9 +514,10 @@ Status SendDataReply(
}

Status ReadDataReply(
uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) {
uint8_t* data, size_t size, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) {
DCHECK(data);
auto message = flatbuffers::GetRoot<PlasmaDataReply>(data);
DCHECK(verify_flatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
*object_size = (int64_t)message->object_size();
*metadata_size = (int64_t)message->metadata_size();
Expand Down
Loading