diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index bbbeb55813c..e14b3d9a46c 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -152,7 +152,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer)); ObjectID id; PlasmaObject object; - RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object)); + RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object)); // If the CreateReply included an error, then the store will not send a file // descriptor. int fd = recv_fd(store_conn_); @@ -227,7 +227,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, std::vector received_object_ids(num_objects); std::vector object_data(num_objects); PlasmaObject* object; - RETURN_NOT_OK(ReadGetReply(buffer.data(), received_object_ids.data(), + RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(), object_data.data(), num_objects)); for (int i = 0; i < num_objects; ++i) { @@ -356,7 +356,8 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) { std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer)); ObjectID object_id2; - RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object)); + RETURN_NOT_OK( + ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object)); } return Status::OK(); } @@ -451,7 +452,7 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { std::vector buffer; int64_t type; RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer)); - return ReadEvictReply(buffer.data(), num_bytes_evicted); + return ReadEvictReply(buffer.data(), buffer.size(), num_bytes_evicted); } Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) { @@ -524,7 +525,7 @@ Status PlasmaClient::Connect(const std::string& store_socket_name, RETURN_NOT_OK(SendConnectRequest(store_conn_)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &buffer)); - RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_)); + RETURN_NOT_OK(ReadConnectReply(buffer.data(), buffer.size(), &store_capacity_)); return Status::OK(); } @@ -564,7 +565,7 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) { std::vector buffer; RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer)); ObjectID id; - RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1)); + RETURN_NOT_OK(ReadStatusReply(buffer.data(), buffer.size(), &id, object_status, 1)); ARROW_CHECK(object_id == id); return Status::OK(); } @@ -586,7 +587,8 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req num_ready_objects, timeout_ms)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer)); - RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects)); + RETURN_NOT_OK( + ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects)); *num_objects_ready = 0; for (int i = 0; i < num_object_requests; ++i) { diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc index 19240bb4b8a..77bc8b7aae3 100644 --- a/cpp/src/plasma/protocol.cc +++ b/cpp/src/plasma/protocol.cc @@ -62,10 +62,11 @@ Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size, return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, message); } -Status ReadCreateRequest(uint8_t* data, ObjectID* object_id, int64_t* data_size, - int64_t* metadata_size) { +Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id, + int64_t* data_size, int64_t* metadata_size) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *data_size = message->data_size(); *metadata_size = message->metadata_size(); *object_id = ObjectID::from_binary(message->object_id()->str()); @@ -83,9 +84,11 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message); } -Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) { +Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, + PlasmaObject* object) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); object->handle.store_fd = message->plasma_object()->segment_index(); object->handle.mmap_size = message->plasma_object()->mmap_size(); @@ -106,9 +109,11 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) { return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message); } -Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) { +Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, + unsigned char* digest) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); ARROW_CHECK(message->digest()->size() == kDigestSize); memcpy(digest, message->digest()->data(), kDigestSize); @@ -122,9 +127,10 @@ Status SendSealReply(int sock, ObjectID object_id, int error) { return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message); } -Status ReadSealReply(uint8_t* data, ObjectID* object_id) { +Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return plasma_error_status(message->error()); } @@ -133,13 +139,14 @@ Status ReadSealReply(uint8_t* data, ObjectID* object_id) { Status SendReleaseRequest(int sock, ObjectID object_id) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary())); + auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary())); return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message); } -Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) { +Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return Status::OK(); } @@ -151,9 +158,10 @@ Status SendReleaseReply(int sock, ObjectID object_id, int error) { return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message); } -Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) { +Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return plasma_error_status(message->error()); } @@ -166,9 +174,10 @@ Status SendDeleteRequest(int sock, ObjectID object_id) { return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message); } -Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) { +Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return Status::OK(); } @@ -180,9 +189,10 @@ Status SendDeleteReply(int sock, ObjectID object_id, int error) { return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message); } -Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) { +Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return plasma_error_status(message->error()); } @@ -196,9 +206,11 @@ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objec return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message); } -Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) { +Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[], + int64_t num_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); for (uoffset_t i = 0; i < num_objects; ++i) { object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); } @@ -214,16 +226,18 @@ Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[], return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message); } -int64_t ReadStatusReply_num_objects(uint8_t* data) { +int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); return message->object_ids()->size(); } -Status ReadStatusReply(uint8_t* data, ObjectID object_ids[], int object_status[], - int64_t num_objects) { +Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[], + int object_status[], int64_t num_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); for (uoffset_t i = 0; i < num_objects; ++i) { object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); } @@ -241,9 +255,10 @@ Status SendContainsRequest(int sock, ObjectID object_id) { return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message); } -Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) { +Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return Status::OK(); } @@ -255,9 +270,11 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) { return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message); } -Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) { +Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, + bool* has_object) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); *has_object = message->has_object(); return Status::OK(); @@ -279,9 +296,10 @@ Status SendConnectReply(int sock, int64_t memory_capacity) { return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message); } -Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) { +Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *memory_capacity = message->memory_capacity(); return Status::OK(); } @@ -294,9 +312,10 @@ Status SendEvictRequest(int sock, int64_t num_bytes) { return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message); } -Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) { +Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *num_bytes = message->num_bytes(); return Status::OK(); } @@ -307,9 +326,10 @@ Status SendEvictReply(int sock, int64_t num_bytes) { return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message); } -Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) { +Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); num_bytes = message->num_bytes(); return Status::OK(); } @@ -324,10 +344,11 @@ Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects, return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, message); } -Status ReadGetRequest(uint8_t* data, std::vector& object_ids, +Status ReadGetRequest(uint8_t* data, size_t size, std::vector& object_ids, int64_t* timeout_ms) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { auto object_id = message->object_ids()->Get(i)->str(); object_ids.push_back(ObjectID::from_binary(object_id)); @@ -355,10 +376,11 @@ Status SendGetReply( return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message); } -Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[], - int64_t num_objects) { +Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], + PlasmaObject plasma_objects[], int64_t num_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); for (uoffset_t i = 0; i < num_objects; ++i) { object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); } @@ -383,9 +405,10 @@ Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_object return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message); } -Status ReadFetchRequest(uint8_t* data, std::vector& object_ids) { +Status ReadFetchRequest(uint8_t* data, size_t size, std::vector& object_ids) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str())); } @@ -410,10 +433,11 @@ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_re return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message); } -Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests, +Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests, int64_t* timeout_ms, int* num_ready_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *num_ready_objects = message->num_ready_objects(); *timeout_ms = message->timeout(); @@ -443,11 +467,12 @@ Status SendWaitReply(int sock, const ObjectRequestMap& object_requests, return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, message); } -Status ReadWaitReply(uint8_t* data, ObjectRequest object_requests[], +Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[], int* num_ready_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *num_ready_objects = message->num_ready_objects(); for (int i = 0; i < *num_ready_objects; i++) { object_requests[i].object_id = @@ -475,9 +500,11 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message); } -Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) { +Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, + int* port) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); DCHECK(message->object_id()->size() == sizeof(ObjectID)); *object_id = ObjectID::from_binary(message->object_id()->str()); *address = strdup(message->address()->c_str()); @@ -493,10 +520,11 @@ Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, return PlasmaSend(sock, MessageType_PlasmaDataReply, &fbb, message); } -Status ReadDataReply(uint8_t* data, ObjectID* object_id, int64_t* object_size, - int64_t* metadata_size) { +Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id, + int64_t* object_size, int64_t* metadata_size) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); *object_size = (int64_t)message->object_size(); *metadata_size = (int64_t)message->metadata_size(); diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h index bab08b6cbd8..af4b13978c6 100644 --- a/cpp/src/plasma/protocol.h +++ b/cpp/src/plasma/protocol.h @@ -28,6 +28,12 @@ namespace plasma { using arrow::Status; +template +bool verify_flatbuffer(T* object, uint8_t* data, size_t size) { + flatbuffers::Verifier verifier(data, size); + return object->Verify(verifier); +} + /* Plasma receive message. */ Status PlasmaReceive(int sock, int64_t message_type, std::vector* buffer); @@ -37,29 +43,31 @@ Status PlasmaReceive(int sock, int64_t message_type, std::vector* buffe Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size); -Status ReadCreateRequest(uint8_t* data, ObjectID* object_id, int64_t* data_size, - int64_t* metadata_size); +Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id, + int64_t* data_size, int64_t* metadata_size); Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error); -Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object); +Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, + PlasmaObject* object); /* Plasma Seal message functions. */ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest); -Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest); +Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, + unsigned char* digest); Status SendSealReply(int sock, ObjectID object_id, int error); -Status ReadSealReply(uint8_t* data, ObjectID* object_id); +Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id); /* Plasma Get message functions. */ Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms); -Status ReadGetRequest(uint8_t* data, std::vector& object_ids, +Status ReadGetRequest(uint8_t* data, size_t size, std::vector& object_ids, int64_t* timeout_ms); Status SendGetReply( @@ -67,91 +75,93 @@ Status SendGetReply( std::unordered_map& plasma_objects, int64_t num_objects); -Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[], - int64_t num_objects); +Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], + PlasmaObject plasma_objects[], int64_t num_objects); /* Plasma Release message functions. */ Status SendReleaseRequest(int sock, ObjectID object_id); -Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id); +Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id); Status SendReleaseReply(int sock, ObjectID object_id, int error); -Status ReadReleaseReply(uint8_t* data, ObjectID* object_id); +Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id); /* Plasma Delete message functions. */ Status SendDeleteRequest(int sock, ObjectID object_id); -Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id); +Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id); Status SendDeleteReply(int sock, ObjectID object_id, int error); -Status ReadDeleteReply(uint8_t* data, ObjectID* object_id); +Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id); /* Satus messages. */ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects); -Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects); +Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[], + int64_t num_objects); Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[], int64_t num_objects); -int64_t ReadStatusReply_num_objects(uint8_t* data); +int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size); -Status ReadStatusReply(uint8_t* data, ObjectID object_ids[], int object_status[], - int64_t num_objects); +Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[], + int object_status[], int64_t num_objects); /* Plasma Constains message functions. */ Status SendContainsRequest(int sock, ObjectID object_id); -Status ReadContainsRequest(uint8_t* data, ObjectID* object_id); +Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id); Status SendContainsReply(int sock, ObjectID object_id, bool has_object); -Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object); +Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, + bool* has_object); /* Plasma Connect message functions. */ Status SendConnectRequest(int sock); -Status ReadConnectRequest(uint8_t* data); +Status ReadConnectRequest(uint8_t* data, size_t size); Status SendConnectReply(int sock, int64_t memory_capacity); -Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity); +Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity); /* Plasma Evict message functions (no reply so far). */ Status SendEvictRequest(int sock, int64_t num_bytes); -Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes); +Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes); Status SendEvictReply(int sock, int64_t num_bytes); -Status ReadEvictReply(uint8_t* data, int64_t& num_bytes); +Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes); /* Plasma Fetch Remote message functions. */ Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects); -Status ReadFetchRequest(uint8_t* data, std::vector& object_ids); +Status ReadFetchRequest(uint8_t* data, size_t size, std::vector& object_ids); /* Plasma Wait message functions. */ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests, int num_ready_objects, int64_t timeout_ms); -Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests, +Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests, int64_t* timeout_ms, int* num_ready_objects); Status SendWaitReply(int sock, const ObjectRequestMap& object_requests, int num_ready_objects); -Status ReadWaitReply(uint8_t* data, ObjectRequest object_requests[], +Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[], int* num_ready_objects); /* Plasma Subscribe message functions. */ @@ -162,13 +172,14 @@ Status SendSubscribeRequest(int sock); Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port); -Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port); +Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, + int* port); Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size); -Status ReadDataReply(uint8_t* data, ObjectID* object_id, int64_t* object_size, - int64_t* metadata_size); +Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id, + int64_t* object_size, int64_t* metadata_size); } // namespace plasma diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 9ceecdceadc..34adc6261eb 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -553,6 +553,7 @@ Status PlasmaStore::process_message(Client* client) { ARROW_CHECK(s.ok() || s.IsIOError()); uint8_t* input = input_buffer_.data(); + size_t input_size = input_buffer_.size(); ObjectID object_id; PlasmaObject object; // TODO(pcm): Get rid of the following. @@ -563,7 +564,8 @@ Status PlasmaStore::process_message(Client* client) { case MessageType_PlasmaCreateRequest: { int64_t data_size; int64_t metadata_size; - RETURN_NOT_OK(ReadCreateRequest(input, &object_id, &data_size, &metadata_size)); + RETURN_NOT_OK( + ReadCreateRequest(input, input_size, &object_id, &data_size, &metadata_size)); int error_code = create_object(object_id, data_size, metadata_size, client, &object); HANDLE_SIGPIPE(SendCreateReply(client->fd, object_id, &object, error_code), @@ -575,15 +577,15 @@ Status PlasmaStore::process_message(Client* client) { case MessageType_PlasmaGetRequest: { std::vector object_ids_to_get; int64_t timeout_ms; - RETURN_NOT_OK(ReadGetRequest(input, object_ids_to_get, &timeout_ms)); + RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms)); process_get_request(client, object_ids_to_get, timeout_ms); } break; case MessageType_PlasmaReleaseRequest: - RETURN_NOT_OK(ReadReleaseRequest(input, &object_id)); + RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id)); release_object(object_id, client); break; case MessageType_PlasmaContainsRequest: - RETURN_NOT_OK(ReadContainsRequest(input, &object_id)); + RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id)); if (contains_object(object_id) == OBJECT_FOUND) { HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd); } else { @@ -592,13 +594,13 @@ Status PlasmaStore::process_message(Client* client) { break; case MessageType_PlasmaSealRequest: { unsigned char digest[kDigestSize]; - RETURN_NOT_OK(ReadSealRequest(input, &object_id, &digest[0])); + RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0])); seal_object(object_id, &digest[0]); } break; case MessageType_PlasmaEvictRequest: { // This code path should only be used for testing. int64_t num_bytes; - RETURN_NOT_OK(ReadEvictRequest(input, &num_bytes)); + RETURN_NOT_OK(ReadEvictRequest(input, input_size, &num_bytes)); std::vector objects_to_evict; int64_t num_bytes_evicted = eviction_policy_.choose_objects_to_evict(num_bytes, &objects_to_evict); @@ -688,9 +690,8 @@ int main(int argc, char* argv[]) { close(shm_fd); if (system_memory > shm_mem_avail) { ARROW_LOG(FATAL) << "System memory request exceeds memory available in /dev/shm. The " - "request is for " - << system_memory << " bytes, and the amount available is " - << shm_mem_avail + "request is for " << system_memory + << " bytes, and the amount available is " << shm_mem_avail << " bytes. You may be able to free up space by deleting files in " "/dev/shm. If you are inside a Docker container, you may need to " "pass " diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc index aca47d3d6f9..c76f5ce1092 100644 --- a/cpp/src/plasma/test/serialization_tests.cc +++ b/cpp/src/plasma/test/serialization_tests.cc @@ -82,8 +82,8 @@ TEST(PlasmaSerialization, CreateRequest) { ObjectID object_id2; int64_t data_size2; int64_t metadata_size2; - ARROW_CHECK_OK( - ReadCreateRequest(data.data(), &object_id2, &data_size2, &metadata_size2)); + ARROW_CHECK_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &data_size2, + &metadata_size2)); ASSERT_EQ(data_size1, data_size2); ASSERT_EQ(metadata_size1, metadata_size2); ASSERT_EQ(object_id1, object_id2); @@ -99,7 +99,7 @@ TEST(PlasmaSerialization, CreateReply) { ObjectID object_id2; PlasmaObject object2; memset(&object2, 0, sizeof(object2)); - ARROW_CHECK_OK(ReadCreateReply(data.data(), &object_id2, &object2)); + ARROW_CHECK_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2)); ASSERT_EQ(object_id1, object_id2); ASSERT_EQ(memcmp(&object1, &object2, sizeof(object1)), 0); close(fd); @@ -114,7 +114,7 @@ TEST(PlasmaSerialization, SealRequest) { std::vector data = read_message_from_file(fd, MessageType_PlasmaSealRequest); ObjectID object_id2; unsigned char digest2[kDigestSize]; - ARROW_CHECK_OK(ReadSealRequest(data.data(), &object_id2, &digest2[0])); + ARROW_CHECK_OK(ReadSealRequest(data.data(), data.size(), &object_id2, &digest2[0])); ASSERT_EQ(object_id1, object_id2); ASSERT_EQ(memcmp(&digest1[0], &digest2[0], kDigestSize), 0); close(fd); @@ -126,7 +126,7 @@ TEST(PlasmaSerialization, SealReply) { ARROW_CHECK_OK(SendSealReply(fd, object_id1, PlasmaError_ObjectExists)); std::vector data = read_message_from_file(fd, MessageType_PlasmaSealReply); ObjectID object_id2; - Status s = ReadSealReply(data.data(), &object_id2); + Status s = ReadSealReply(data.data(), data.size(), &object_id2); ASSERT_EQ(object_id1, object_id2); ASSERT_TRUE(s.IsPlasmaObjectExists()); close(fd); @@ -142,7 +142,8 @@ TEST(PlasmaSerialization, GetRequest) { std::vector data = read_message_from_file(fd, MessageType_PlasmaGetRequest); std::vector object_ids_return; int64_t timeout_ms_return; - ARROW_CHECK_OK(ReadGetRequest(data.data(), object_ids_return, &timeout_ms_return)); + ARROW_CHECK_OK( + ReadGetRequest(data.data(), data.size(), object_ids_return, &timeout_ms_return)); ASSERT_EQ(object_ids[0], object_ids_return[0]); ASSERT_EQ(object_ids[1], object_ids_return[1]); ASSERT_EQ(timeout_ms, timeout_ms_return); @@ -162,8 +163,8 @@ TEST(PlasmaSerialization, GetReply) { ObjectID object_ids_return[2]; PlasmaObject plasma_objects_return[2]; memset(&plasma_objects_return, 0, sizeof(plasma_objects_return)); - ARROW_CHECK_OK( - ReadGetReply(data.data(), object_ids_return, &plasma_objects_return[0], 2)); + ARROW_CHECK_OK(ReadGetReply(data.data(), data.size(), object_ids_return, + &plasma_objects_return[0], 2)); ASSERT_EQ(object_ids[0], object_ids_return[0]); ASSERT_EQ(object_ids[1], object_ids_return[1]); ASSERT_EQ(memcmp(&plasma_objects[object_ids[0]], &plasma_objects_return[0], @@ -182,7 +183,7 @@ TEST(PlasmaSerialization, ReleaseRequest) { std::vector data = read_message_from_file(fd, MessageType_PlasmaReleaseRequest); ObjectID object_id2; - ARROW_CHECK_OK(ReadReleaseRequest(data.data(), &object_id2)); + ARROW_CHECK_OK(ReadReleaseRequest(data.data(), data.size(), &object_id2)); ASSERT_EQ(object_id1, object_id2); close(fd); } @@ -193,7 +194,7 @@ TEST(PlasmaSerialization, ReleaseReply) { ARROW_CHECK_OK(SendReleaseReply(fd, object_id1, PlasmaError_ObjectExists)); std::vector data = read_message_from_file(fd, MessageType_PlasmaReleaseReply); ObjectID object_id2; - Status s = ReadReleaseReply(data.data(), &object_id2); + Status s = ReadReleaseReply(data.data(), data.size(), &object_id2); ASSERT_EQ(object_id1, object_id2); ASSERT_TRUE(s.IsPlasmaObjectExists()); close(fd); @@ -205,7 +206,7 @@ TEST(PlasmaSerialization, DeleteRequest) { ARROW_CHECK_OK(SendDeleteRequest(fd, object_id1)); std::vector data = read_message_from_file(fd, MessageType_PlasmaDeleteRequest); ObjectID object_id2; - ARROW_CHECK_OK(ReadDeleteRequest(data.data(), &object_id2)); + ARROW_CHECK_OK(ReadDeleteRequest(data.data(), data.size(), &object_id2)); ASSERT_EQ(object_id1, object_id2); close(fd); } @@ -217,7 +218,7 @@ TEST(PlasmaSerialization, DeleteReply) { ARROW_CHECK_OK(SendDeleteReply(fd, object_id1, error1)); std::vector data = read_message_from_file(fd, MessageType_PlasmaDeleteReply); ObjectID object_id2; - Status s = ReadDeleteReply(data.data(), &object_id2); + Status s = ReadDeleteReply(data.data(), data.size(), &object_id2); ASSERT_EQ(object_id1, object_id2); ASSERT_TRUE(s.IsPlasmaObjectExists()); close(fd); @@ -232,7 +233,8 @@ TEST(PlasmaSerialization, StatusRequest) { ARROW_CHECK_OK(SendStatusRequest(fd, object_ids, num_objects)); std::vector data = read_message_from_file(fd, MessageType_PlasmaStatusRequest); ObjectID object_ids_read[num_objects]; - ARROW_CHECK_OK(ReadStatusRequest(data.data(), object_ids_read, num_objects)); + ARROW_CHECK_OK( + ReadStatusRequest(data.data(), data.size(), object_ids_read, num_objects)); ASSERT_EQ(object_ids[0], object_ids_read[0]); ASSERT_EQ(object_ids[1], object_ids_read[1]); close(fd); @@ -246,11 +248,11 @@ TEST(PlasmaSerialization, StatusReply) { int object_statuses[2] = {42, 43}; ARROW_CHECK_OK(SendStatusReply(fd, object_ids, object_statuses, 2)); std::vector data = read_message_from_file(fd, MessageType_PlasmaStatusReply); - int64_t num_objects = ReadStatusReply_num_objects(data.data()); + int64_t num_objects = ReadStatusReply_num_objects(data.data(), data.size()); ObjectID object_ids_read[num_objects]; int object_statuses_read[num_objects]; - ARROW_CHECK_OK( - ReadStatusReply(data.data(), object_ids_read, object_statuses_read, num_objects)); + ARROW_CHECK_OK(ReadStatusReply(data.data(), data.size(), object_ids_read, + object_statuses_read, num_objects)); ASSERT_EQ(object_ids[0], object_ids_read[0]); ASSERT_EQ(object_ids[1], object_ids_read[1]); ASSERT_EQ(object_statuses[0], object_statuses_read[0]); @@ -264,7 +266,7 @@ TEST(PlasmaSerialization, EvictRequest) { ARROW_CHECK_OK(SendEvictRequest(fd, num_bytes)); std::vector data = read_message_from_file(fd, MessageType_PlasmaEvictRequest); int64_t num_bytes_received; - ARROW_CHECK_OK(ReadEvictRequest(data.data(), &num_bytes_received)); + ARROW_CHECK_OK(ReadEvictRequest(data.data(), data.size(), &num_bytes_received)); ASSERT_EQ(num_bytes, num_bytes_received); close(fd); } @@ -275,7 +277,7 @@ TEST(PlasmaSerialization, EvictReply) { ARROW_CHECK_OK(SendEvictReply(fd, num_bytes)); std::vector data = read_message_from_file(fd, MessageType_PlasmaEvictReply); int64_t num_bytes_received; - ARROW_CHECK_OK(ReadEvictReply(data.data(), num_bytes_received)); + ARROW_CHECK_OK(ReadEvictReply(data.data(), data.size(), num_bytes_received)); ASSERT_EQ(num_bytes, num_bytes_received); close(fd); } @@ -288,7 +290,7 @@ TEST(PlasmaSerialization, FetchRequest) { ARROW_CHECK_OK(SendFetchRequest(fd, object_ids, 2)); std::vector data = read_message_from_file(fd, MessageType_PlasmaFetchRequest); std::vector object_ids_read; - ARROW_CHECK_OK(ReadFetchRequest(data.data(), object_ids_read)); + ARROW_CHECK_OK(ReadFetchRequest(data.data(), data.size(), object_ids_read)); ASSERT_EQ(object_ids[0], object_ids_read[0]); ASSERT_EQ(object_ids[1], object_ids_read[1]); close(fd); @@ -310,8 +312,8 @@ TEST(PlasmaSerialization, WaitRequest) { int num_ready_objects_out; int64_t timeout_ms_read; ObjectRequestMap object_requests_out; - ARROW_CHECK_OK(ReadWaitRequest(data.data(), object_requests_out, &timeout_ms_read, - &num_ready_objects_out)); + ARROW_CHECK_OK(ReadWaitRequest(data.data(), data.size(), object_requests_out, + &timeout_ms_read, &num_ready_objects_out)); ASSERT_EQ(num_objects_in, object_requests_out.size()); ASSERT_EQ(num_ready_objects_out, num_ready_objects_in); for (int i = 0; i < num_objects_in; i++) { @@ -340,7 +342,8 @@ TEST(PlasmaSerialization, WaitReply) { std::vector data = read_message_from_file(fd, MessageType_PlasmaWaitReply); ObjectRequest objects_out[2]; int num_objects_out; - ARROW_CHECK_OK(ReadWaitReply(data.data(), &objects_out[0], &num_objects_out)); + ARROW_CHECK_OK( + ReadWaitReply(data.data(), data.size(), &objects_out[0], &num_objects_out)); ASSERT_EQ(num_objects_in, num_objects_out); for (int i = 0; i < num_objects_out; i++) { /* Each object request must appear exactly once. */ @@ -364,7 +367,8 @@ TEST(PlasmaSerialization, DataRequest) { ObjectID object_id2; char* address2; int port2; - ARROW_CHECK_OK(ReadDataRequest(data.data(), &object_id2, &address2, &port2)); + ARROW_CHECK_OK( + ReadDataRequest(data.data(), data.size(), &object_id2, &address2, &port2)); ASSERT_EQ(object_id1, object_id2); ASSERT_EQ(strcmp(address1, address2), 0); ASSERT_EQ(port1, port2); @@ -383,7 +387,8 @@ TEST(PlasmaSerialization, DataReply) { ObjectID object_id2; int64_t object_size2; int64_t metadata_size2; - ARROW_CHECK_OK(ReadDataReply(data.data(), &object_id2, &object_size2, &metadata_size2)); + ARROW_CHECK_OK(ReadDataReply(data.data(), data.size(), &object_id2, &object_size2, + &metadata_size2)); ASSERT_EQ(object_id1, object_id2); ASSERT_EQ(object_size1, object_size2); ASSERT_EQ(metadata_size1, metadata_size2);