From ebae611b3554b7cd365e767780c66f2d3941924d Mon Sep 17 00:00:00 2001 From: Yeolar Date: Tue, 25 Jul 2017 15:20:16 +0800 Subject: [PATCH 1/3] Fix typo in plasma protocol; add DCHECK for ReadXXX in plasma protocol. --- cpp/src/plasma/client.cc | 14 ++--- cpp/src/plasma/protocol.cc | 71 ++++++++++++++-------- cpp/src/plasma/protocol.h | 54 ++++++++-------- cpp/src/plasma/store.cc | 13 ++-- cpp/src/plasma/test/serialization_tests.cc | 38 ++++++------ 5 files changed, 110 insertions(+), 80 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index dcb78e7ec52..b7b0ff0a309 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -130,7 +130,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer)); ObjectID id; PlasmaObject object; - RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object)); + RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object)); // If the CreateReply included an error, then the store will not send a file // descriptor. int fd = recv_fd(store_conn_); @@ -204,7 +204,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, std::vector object_data(num_objects); PlasmaObject* object; RETURN_NOT_OK(ReadGetReply( - buffer.data(), received_object_ids.data(), object_data.data(), num_objects)); + buffer.data(), buffer.size(), received_object_ids.data(), object_data.data(), num_objects)); for (int i = 0; i < num_objects; ++i) { DCHECK(received_object_ids[i] == object_ids[i]); @@ -328,7 +328,7 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) { std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer)); ObjectID object_id2; - RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object)); + RETURN_NOT_OK(ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object)); } return Status::OK(); } @@ -436,7 +436,7 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { std::vector buffer; int64_t type; RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer)); - return ReadEvictReply(buffer.data(), num_bytes_evicted); + return ReadEvictReply(buffer.data(), buffer.size(), num_bytes_evicted); } Status PlasmaClient::Subscribe(int* fd) { @@ -473,7 +473,7 @@ Status PlasmaClient::Connect(const std::string& store_socket_name, RETURN_NOT_OK(SendConnectRequest(store_conn_)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &buffer)); - RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_)); + RETURN_NOT_OK(ReadConnectReply(buffer.data(), buffer.size(), &store_capacity_)); return Status::OK(); } @@ -511,7 +511,7 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) { std::vector buffer; RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer)); ObjectID id; - RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1)); + RETURN_NOT_OK(ReadStatusReply(buffer.data(), buffer.size(), &id, object_status, 1)); ARROW_CHECK(object_id == id); return Status::OK(); } @@ -532,7 +532,7 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req num_ready_objects, timeout_ms)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer)); - RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects)); + RETURN_NOT_OK(ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects)); *num_objects_ready = 0; for (int i = 0; i < num_object_requests; ++i) { diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc index 246aa297360..c51475dea29 100644 --- a/cpp/src/plasma/protocol.cc +++ b/cpp/src/plasma/protocol.cc @@ -61,9 +61,10 @@ Status SendCreateRequest( } Status ReadCreateRequest( - uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { + uint8_t* data, size_t size, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *data_size = message->data_size(); *metadata_size = message->metadata_size(); *object_id = ObjectID::from_binary(message->object_id()->str()); @@ -81,9 +82,10 @@ Status SendCreateReply( return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message); } -Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) { +Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, PlasmaObject* object) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); object->handle.store_fd = message->plasma_object()->segment_index(); object->handle.mmap_size = message->plasma_object()->mmap_size(); @@ -104,9 +106,10 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) { return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message); } -Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) { +Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, unsigned char* digest) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); ARROW_CHECK(message->digest()->size() == kDigestSize); memcpy(digest, message->digest()->data(), kDigestSize); @@ -120,9 +123,10 @@ Status SendSealReply(int sock, ObjectID object_id, int error) { return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message); } -Status ReadSealReply(uint8_t* data, ObjectID* object_id) { +Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return plasma_error_status(message->error()); } @@ -131,13 +135,14 @@ Status ReadSealReply(uint8_t* data, ObjectID* object_id) { Status SendReleaseRequest(int sock, ObjectID object_id) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary())); + auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary())); return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message); } -Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) { +Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return Status::OK(); } @@ -149,9 +154,10 @@ Status SendReleaseReply(int sock, ObjectID object_id, int error) { return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message); } -Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) { +Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return plasma_error_status(message->error()); } @@ -164,9 +170,10 @@ Status SendDeleteRequest(int sock, ObjectID object_id) { return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message); } -Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) { +Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return Status::OK(); } @@ -178,9 +185,10 @@ Status SendDeleteReply(int sock, ObjectID object_id, int error) { return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message); } -Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) { +Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return plasma_error_status(message->error()); } @@ -194,9 +202,10 @@ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objec return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message); } -Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) { +Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[], int64_t num_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); for (uoffset_t i = 0; i < num_objects; ++i) { object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); } @@ -212,16 +221,18 @@ Status SendStatusReply( return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message); } -int64_t ReadStatusReply_num_objects(uint8_t* data) { +int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); return message->object_ids()->size(); } Status ReadStatusReply( - uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects) { + uint8_t* data, size_t size, ObjectID object_ids[], int object_status[], int64_t num_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); for (uoffset_t i = 0; i < num_objects; ++i) { object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); } @@ -239,9 +250,10 @@ Status SendContainsRequest(int sock, ObjectID object_id) { return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message); } -Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) { +Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); return Status::OK(); } @@ -253,9 +265,10 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) { return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message); } -Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) { +Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, bool* has_object) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); *has_object = message->has_object(); return Status::OK(); @@ -279,9 +292,10 @@ Status SendConnectReply(int sock, int64_t memory_capacity) { return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message); } -Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) { +Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *memory_capacity = message->memory_capacity(); return Status::OK(); } @@ -294,9 +308,10 @@ Status SendEvictRequest(int sock, int64_t num_bytes) { return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message); } -Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) { +Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *num_bytes = message->num_bytes(); return Status::OK(); } @@ -307,9 +322,10 @@ Status SendEvictReply(int sock, int64_t num_bytes) { return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message); } -Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) { +Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); num_bytes = message->num_bytes(); return Status::OK(); } @@ -325,9 +341,10 @@ Status SendGetRequest( } Status ReadGetRequest( - uint8_t* data, std::vector& object_ids, int64_t* timeout_ms) { + uint8_t* data, size_t size, std::vector& object_ids, int64_t* timeout_ms) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { auto object_id = message->object_ids()->Get(i)->str(); object_ids.push_back(ObjectID::from_binary(object_id)); @@ -353,10 +370,11 @@ Status SendGetReply(int sock, ObjectID object_ids[], return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message); } -Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[], +Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], PlasmaObject plasma_objects[], int64_t num_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); for (uoffset_t i = 0; i < num_objects; ++i) { object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); } @@ -381,9 +399,10 @@ Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_object return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message); } -Status ReadFetchRequest(uint8_t* data, std::vector& object_ids) { +Status ReadFetchRequest(uint8_t* data, size_t size, std::vector& object_ids) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str())); } @@ -408,10 +427,11 @@ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_re return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message); } -Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests, +Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests, int64_t* timeout_ms, int* num_ready_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *num_ready_objects = message->num_ready_objects(); *timeout_ms = message->timeout(); @@ -442,10 +462,11 @@ Status SendWaitReply( } Status ReadWaitReply( - uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects) { + uint8_t* data, size_t size, ObjectRequest object_requests[], int* num_ready_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *num_ready_objects = message->num_ready_objects(); for (int i = 0; i < *num_ready_objects; i++) { object_requests[i].object_id = @@ -473,9 +494,10 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message); } -Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) { +Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, int* port) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); DCHECK(message->object_id()->size() == sizeof(ObjectID)); *object_id = ObjectID::from_binary(message->object_id()->str()); *address = strdup(message->address()->c_str()); @@ -492,9 +514,10 @@ Status SendDataReply( } Status ReadDataReply( - uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) { + uint8_t* data, size_t size, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) { DCHECK(data); auto message = flatbuffers::GetRoot(data); + DCHECK(verify_flatbuffer(message, data, size)); *object_id = ObjectID::from_binary(message->object_id()->str()); *object_size = (int64_t)message->object_size(); *metadata_size = (int64_t)message->metadata_size(); diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h index 5d9d1367514..954a0ae8e18 100644 --- a/cpp/src/plasma/protocol.h +++ b/cpp/src/plasma/protocol.h @@ -26,6 +26,12 @@ using arrow::Status; +template +bool verify_flatbuffer(T* object, uint8_t* data, size_t size) { + flatbuffers::Verifier verifier(data, size); + return object->Verify(verifier); +} + /* Plasma receive message. */ Status PlasmaReceive(int sock, int64_t message_type, std::vector* buffer); @@ -36,21 +42,21 @@ Status SendCreateRequest( int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size); Status ReadCreateRequest( - uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size); + uint8_t* data, size_t size, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size); Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error); -Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object); +Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, PlasmaObject* object); /* Plasma Seal message functions. */ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest); -Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest); +Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, unsigned char* digest); Status SendSealReply(int sock, ObjectID object_id, int error); -Status ReadSealReply(uint8_t* data, ObjectID* object_id); +Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id); /* Plasma Get message functions. */ @@ -58,98 +64,98 @@ Status SendGetRequest( int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms); Status ReadGetRequest( - uint8_t* data, std::vector& object_ids, int64_t* timeout_ms); + uint8_t* data, size_t size, std::vector& object_ids, int64_t* timeout_ms); Status SendGetReply(int sock, ObjectID object_ids[], std::unordered_map& plasma_objects, int64_t num_objects); -Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[], +Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], PlasmaObject plasma_objects[], int64_t num_objects); /* Plasma Release message functions. */ Status SendReleaseRequest(int sock, ObjectID object_id); -Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id); +Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id); Status SendReleaseReply(int sock, ObjectID object_id, int error); -Status ReadReleaseReply(uint8_t* data, ObjectID* object_id); +Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id); /* Plasma Delete message functions. */ Status SendDeleteRequest(int sock, ObjectID object_id); -Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id); +Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id); Status SendDeleteReply(int sock, ObjectID object_id, int error); -Status ReadDeleteReply(uint8_t* data, ObjectID* object_id); +Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id); /* Satus messages. */ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects); -Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects); +Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[], int64_t num_objects); Status SendStatusReply( int sock, ObjectID object_ids[], int object_status[], int64_t num_objects); -int64_t ReadStatusReply_num_objects(uint8_t* data); +int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size); Status ReadStatusReply( - uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects); + uint8_t* data, size_t size, ObjectID object_ids[], int object_status[], int64_t num_objects); /* Plasma Constains message functions. */ Status SendContainsRequest(int sock, ObjectID object_id); -Status ReadContainsRequest(uint8_t* data, ObjectID* object_id); +Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id); Status SendContainsReply(int sock, ObjectID object_id, bool has_object); -Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object); +Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, bool* has_object); /* Plasma Connect message functions. */ Status SendConnectRequest(int sock); -Status ReadConnectRequest(uint8_t* data); +Status ReadConnectRequest(uint8_t* data, size_t size); Status SendConnectReply(int sock, int64_t memory_capacity); -Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity); +Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity); /* Plasma Evict message functions (no reply so far). */ Status SendEvictRequest(int sock, int64_t num_bytes); -Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes); +Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes); Status SendEvictReply(int sock, int64_t num_bytes); -Status ReadEvictReply(uint8_t* data, int64_t& num_bytes); +Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes); /* Plasma Fetch Remote message functions. */ Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects); -Status ReadFetchRequest(uint8_t* data, std::vector& object_ids); +Status ReadFetchRequest(uint8_t* data, size_t size, std::vector& object_ids); /* Plasma Wait message functions. */ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests, int num_ready_objects, int64_t timeout_ms); -Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests, +Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests, int64_t* timeout_ms, int* num_ready_objects); Status SendWaitReply( int sock, const ObjectRequestMap& object_requests, int num_ready_objects); Status ReadWaitReply( - uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects); + uint8_t* data, size_t size, ObjectRequest object_requests[], int* num_ready_objects); /* Plasma Subscribe message functions. */ @@ -159,12 +165,12 @@ Status SendSubscribeRequest(int sock); Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port); -Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port); +Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, int* port); Status SendDataReply( int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size); Status ReadDataReply( - uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size); + uint8_t* data, size_t size, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size); #endif /* PLASMA_PROTOCOL */ diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 9394e3de310..5285ba1c4c5 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -535,6 +535,7 @@ Status PlasmaStore::process_message(Client* client) { ARROW_CHECK(s.ok() || s.IsIOError()); uint8_t* input = input_buffer_.data(); + size_t input_size = input_buffer_.size(); ObjectID object_id; PlasmaObject object; // TODO(pcm): Get rid of the following. @@ -545,7 +546,7 @@ Status PlasmaStore::process_message(Client* client) { case MessageType_PlasmaCreateRequest: { int64_t data_size; int64_t metadata_size; - RETURN_NOT_OK(ReadCreateRequest(input, &object_id, &data_size, &metadata_size)); + RETURN_NOT_OK(ReadCreateRequest(input, input_size, &object_id, &data_size, &metadata_size)); int error_code = create_object(object_id, data_size, metadata_size, client, &object); HANDLE_SIGPIPE( @@ -557,15 +558,15 @@ Status PlasmaStore::process_message(Client* client) { case MessageType_PlasmaGetRequest: { std::vector object_ids_to_get; int64_t timeout_ms; - RETURN_NOT_OK(ReadGetRequest(input, object_ids_to_get, &timeout_ms)); + RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms)); process_get_request(client, object_ids_to_get, timeout_ms); } break; case MessageType_PlasmaReleaseRequest: - RETURN_NOT_OK(ReadReleaseRequest(input, &object_id)); + RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id)); release_object(object_id, client); break; case MessageType_PlasmaContainsRequest: - RETURN_NOT_OK(ReadContainsRequest(input, &object_id)); + RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id)); if (contains_object(object_id) == OBJECT_FOUND) { HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd); } else { @@ -574,13 +575,13 @@ Status PlasmaStore::process_message(Client* client) { break; case MessageType_PlasmaSealRequest: { unsigned char digest[kDigestSize]; - RETURN_NOT_OK(ReadSealRequest(input, &object_id, &digest[0])); + RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0])); seal_object(object_id, &digest[0]); } break; case MessageType_PlasmaEvictRequest: { // This code path should only be used for testing. int64_t num_bytes; - RETURN_NOT_OK(ReadEvictRequest(input, &num_bytes)); + RETURN_NOT_OK(ReadEvictRequest(input, input_size, &num_bytes)); std::vector objects_to_evict; int64_t num_bytes_evicted = eviction_policy_.choose_objects_to_evict(num_bytes, &objects_to_evict); diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc index 325cead06e7..22916a246e4 100644 --- a/cpp/src/plasma/test/serialization_tests.cc +++ b/cpp/src/plasma/test/serialization_tests.cc @@ -81,7 +81,7 @@ TEST(PlasmaSerialization, CreateRequest) { int64_t data_size2; int64_t metadata_size2; ARROW_CHECK_OK( - ReadCreateRequest(data.data(), &object_id2, &data_size2, &metadata_size2)); + ReadCreateRequest(data.data(), data.size(), &object_id2, &data_size2, &metadata_size2)); ASSERT_EQ(data_size1, data_size2); ASSERT_EQ(metadata_size1, metadata_size2); ASSERT_EQ(object_id1, object_id2); @@ -97,7 +97,7 @@ TEST(PlasmaSerialization, CreateReply) { ObjectID object_id2; PlasmaObject object2; memset(&object2, 0, sizeof(object2)); - ARROW_CHECK_OK(ReadCreateReply(data.data(), &object_id2, &object2)); + ARROW_CHECK_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2)); ASSERT_EQ(object_id1, object_id2); ASSERT_EQ(memcmp(&object1, &object2, sizeof(object1)), 0); close(fd); @@ -112,7 +112,7 @@ TEST(PlasmaSerialization, SealRequest) { std::vector data = read_message_from_file(fd, MessageType_PlasmaSealRequest); ObjectID object_id2; unsigned char digest2[kDigestSize]; - ARROW_CHECK_OK(ReadSealRequest(data.data(), &object_id2, &digest2[0])); + ARROW_CHECK_OK(ReadSealRequest(data.data(), data.size(), &object_id2, &digest2[0])); ASSERT_EQ(object_id1, object_id2); ASSERT_EQ(memcmp(&digest1[0], &digest2[0], kDigestSize), 0); close(fd); @@ -124,7 +124,7 @@ TEST(PlasmaSerialization, SealReply) { ARROW_CHECK_OK(SendSealReply(fd, object_id1, PlasmaError_ObjectExists)); std::vector data = read_message_from_file(fd, MessageType_PlasmaSealReply); ObjectID object_id2; - Status s = ReadSealReply(data.data(), &object_id2); + Status s = ReadSealReply(data.data(), data.size(), &object_id2); ASSERT_EQ(object_id1, object_id2); ASSERT_TRUE(s.IsPlasmaObjectExists()); close(fd); @@ -140,7 +140,7 @@ TEST(PlasmaSerialization, GetRequest) { std::vector data = read_message_from_file(fd, MessageType_PlasmaGetRequest); std::vector object_ids_return; int64_t timeout_ms_return; - ARROW_CHECK_OK(ReadGetRequest(data.data(), object_ids_return, &timeout_ms_return)); + ARROW_CHECK_OK(ReadGetRequest(data.data(), data.size(), object_ids_return, &timeout_ms_return)); ASSERT_EQ(object_ids[0], object_ids_return[0]); ASSERT_EQ(object_ids[1], object_ids_return[1]); ASSERT_EQ(timeout_ms, timeout_ms_return); @@ -161,7 +161,7 @@ TEST(PlasmaSerialization, GetReply) { PlasmaObject plasma_objects_return[2]; memset(&plasma_objects_return, 0, sizeof(plasma_objects_return)); ARROW_CHECK_OK( - ReadGetReply(data.data(), object_ids_return, &plasma_objects_return[0], 2)); + ReadGetReply(data.data(), data.size(), object_ids_return, &plasma_objects_return[0], 2)); ASSERT_EQ(object_ids[0], object_ids_return[0]); ASSERT_EQ(object_ids[1], object_ids_return[1]); ASSERT_EQ(memcmp(&plasma_objects[object_ids[0]], &plasma_objects_return[0], @@ -180,7 +180,7 @@ TEST(PlasmaSerialization, ReleaseRequest) { std::vector data = read_message_from_file(fd, MessageType_PlasmaReleaseRequest); ObjectID object_id2; - ARROW_CHECK_OK(ReadReleaseRequest(data.data(), &object_id2)); + ARROW_CHECK_OK(ReadReleaseRequest(data.data(), data.size(), &object_id2)); ASSERT_EQ(object_id1, object_id2); close(fd); } @@ -191,7 +191,7 @@ TEST(PlasmaSerialization, ReleaseReply) { ARROW_CHECK_OK(SendReleaseReply(fd, object_id1, PlasmaError_ObjectExists)); std::vector data = read_message_from_file(fd, MessageType_PlasmaReleaseReply); ObjectID object_id2; - Status s = ReadReleaseReply(data.data(), &object_id2); + Status s = ReadReleaseReply(data.data(), data.size(), &object_id2); ASSERT_EQ(object_id1, object_id2); ASSERT_TRUE(s.IsPlasmaObjectExists()); close(fd); @@ -203,7 +203,7 @@ TEST(PlasmaSerialization, DeleteRequest) { ARROW_CHECK_OK(SendDeleteRequest(fd, object_id1)); std::vector data = read_message_from_file(fd, MessageType_PlasmaDeleteRequest); ObjectID object_id2; - ARROW_CHECK_OK(ReadDeleteRequest(data.data(), &object_id2)); + ARROW_CHECK_OK(ReadDeleteRequest(data.data(), data.size(), &object_id2)); ASSERT_EQ(object_id1, object_id2); close(fd); } @@ -215,7 +215,7 @@ TEST(PlasmaSerialization, DeleteReply) { ARROW_CHECK_OK(SendDeleteReply(fd, object_id1, error1)); std::vector data = read_message_from_file(fd, MessageType_PlasmaDeleteReply); ObjectID object_id2; - Status s = ReadDeleteReply(data.data(), &object_id2); + Status s = ReadDeleteReply(data.data(), data.size(), &object_id2); ASSERT_EQ(object_id1, object_id2); ASSERT_TRUE(s.IsPlasmaObjectExists()); close(fd); @@ -230,7 +230,7 @@ TEST(PlasmaSerialization, StatusRequest) { ARROW_CHECK_OK(SendStatusRequest(fd, object_ids, num_objects)); std::vector data = read_message_from_file(fd, MessageType_PlasmaStatusRequest); ObjectID object_ids_read[num_objects]; - ARROW_CHECK_OK(ReadStatusRequest(data.data(), object_ids_read, num_objects)); + ARROW_CHECK_OK(ReadStatusRequest(data.data(), data.size(), object_ids_read, num_objects)); ASSERT_EQ(object_ids[0], object_ids_read[0]); ASSERT_EQ(object_ids[1], object_ids_read[1]); close(fd); @@ -248,7 +248,7 @@ TEST(PlasmaSerialization, StatusReply) { ObjectID object_ids_read[num_objects]; int object_statuses_read[num_objects]; ARROW_CHECK_OK( - ReadStatusReply(data.data(), object_ids_read, object_statuses_read, num_objects)); + ReadStatusReply(data.data(), data.size(), object_ids_read, object_statuses_read, num_objects)); ASSERT_EQ(object_ids[0], object_ids_read[0]); ASSERT_EQ(object_ids[1], object_ids_read[1]); ASSERT_EQ(object_statuses[0], object_statuses_read[0]); @@ -262,7 +262,7 @@ TEST(PlasmaSerialization, EvictRequest) { ARROW_CHECK_OK(SendEvictRequest(fd, num_bytes)); std::vector data = read_message_from_file(fd, MessageType_PlasmaEvictRequest); int64_t num_bytes_received; - ARROW_CHECK_OK(ReadEvictRequest(data.data(), &num_bytes_received)); + ARROW_CHECK_OK(ReadEvictRequest(data.data(), data.size(), &num_bytes_received)); ASSERT_EQ(num_bytes, num_bytes_received); close(fd); } @@ -273,7 +273,7 @@ TEST(PlasmaSerialization, EvictReply) { ARROW_CHECK_OK(SendEvictReply(fd, num_bytes)); std::vector data = read_message_from_file(fd, MessageType_PlasmaEvictReply); int64_t num_bytes_received; - ARROW_CHECK_OK(ReadEvictReply(data.data(), num_bytes_received)); + ARROW_CHECK_OK(ReadEvictReply(data.data(), data.size(), num_bytes_received)); ASSERT_EQ(num_bytes, num_bytes_received); close(fd); } @@ -286,7 +286,7 @@ TEST(PlasmaSerialization, FetchRequest) { ARROW_CHECK_OK(SendFetchRequest(fd, object_ids, 2)); std::vector data = read_message_from_file(fd, MessageType_PlasmaFetchRequest); std::vector object_ids_read; - ARROW_CHECK_OK(ReadFetchRequest(data.data(), object_ids_read)); + ARROW_CHECK_OK(ReadFetchRequest(data.data(), data.size(), object_ids_read)); ASSERT_EQ(object_ids[0], object_ids_read[0]); ASSERT_EQ(object_ids[1], object_ids_read[1]); close(fd); @@ -309,7 +309,7 @@ TEST(PlasmaSerialization, WaitRequest) { int64_t timeout_ms_read; ObjectRequestMap object_requests_out; ARROW_CHECK_OK(ReadWaitRequest( - data.data(), object_requests_out, &timeout_ms_read, &num_ready_objects_out)); + data.data(), data.size(), object_requests_out, &timeout_ms_read, &num_ready_objects_out)); ASSERT_EQ(num_objects_in, object_requests_out.size()); ASSERT_EQ(num_ready_objects_out, num_ready_objects_in); for (int i = 0; i < num_objects_in; i++) { @@ -338,7 +338,7 @@ TEST(PlasmaSerialization, WaitReply) { std::vector data = read_message_from_file(fd, MessageType_PlasmaWaitReply); ObjectRequest objects_out[2]; int num_objects_out; - ARROW_CHECK_OK(ReadWaitReply(data.data(), &objects_out[0], &num_objects_out)); + ARROW_CHECK_OK(ReadWaitReply(data.data(), data.size(), &objects_out[0], &num_objects_out)); ASSERT_EQ(num_objects_in, num_objects_out); for (int i = 0; i < num_objects_out; i++) { /* Each object request must appear exactly once. */ @@ -362,7 +362,7 @@ TEST(PlasmaSerialization, DataRequest) { ObjectID object_id2; char* address2; int port2; - ARROW_CHECK_OK(ReadDataRequest(data.data(), &object_id2, &address2, &port2)); + ARROW_CHECK_OK(ReadDataRequest(data.data(), data.size(), &object_id2, &address2, &port2)); ASSERT_EQ(object_id1, object_id2); ASSERT_EQ(strcmp(address1, address2), 0); ASSERT_EQ(port1, port2); @@ -381,7 +381,7 @@ TEST(PlasmaSerialization, DataReply) { ObjectID object_id2; int64_t object_size2; int64_t metadata_size2; - ARROW_CHECK_OK(ReadDataReply(data.data(), &object_id2, &object_size2, &metadata_size2)); + ARROW_CHECK_OK(ReadDataReply(data.data(), data.size(), &object_id2, &object_size2, &metadata_size2)); ASSERT_EQ(object_id1, object_id2); ASSERT_EQ(object_size1, object_size2); ASSERT_EQ(metadata_size1, metadata_size2); From 143d2546efe7440a21f40b688e7931825ec858db Mon Sep 17 00:00:00 2001 From: Yeolar Date: Tue, 25 Jul 2017 23:56:46 +0800 Subject: [PATCH 2/3] Update, compile passed. --- cpp/src/plasma/test/serialization_tests.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc index 7fa8531e699..65627954b8e 100644 --- a/cpp/src/plasma/test/serialization_tests.cc +++ b/cpp/src/plasma/test/serialization_tests.cc @@ -246,7 +246,7 @@ TEST(PlasmaSerialization, StatusReply) { int object_statuses[2] = {42, 43}; ARROW_CHECK_OK(SendStatusReply(fd, object_ids, object_statuses, 2)); std::vector data = read_message_from_file(fd, MessageType_PlasmaStatusReply); - int64_t num_objects = ReadStatusReply_num_objects(data.data()); + int64_t num_objects = ReadStatusReply_num_objects(data.data(), data.size()); ObjectID object_ids_read[num_objects]; int object_statuses_read[num_objects]; ARROW_CHECK_OK( From 4df63bcbfbd8217b671e3f88f1bb2fcdedf49d65 Mon Sep 17 00:00:00 2001 From: Yeolar Date: Wed, 26 Jul 2017 10:38:00 +0800 Subject: [PATCH 3/3] clang-format for too long lines. --- cpp/src/plasma/client.cc | 11 ++++---- cpp/src/plasma/protocol.cc | 28 ++++++++++--------- cpp/src/plasma/protocol.h | 31 ++++++++++++---------- cpp/src/plasma/store.cc | 8 +++--- cpp/src/plasma/test/serialization_tests.cc | 27 +++++++++++-------- 5 files changed, 59 insertions(+), 46 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 44b94d6c5cf..e14b3d9a46c 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -227,9 +227,8 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects, std::vector received_object_ids(num_objects); std::vector object_data(num_objects); PlasmaObject* object; - RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), - received_object_ids.data(), object_data.data(), - num_objects)); + RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(), + object_data.data(), num_objects)); for (int i = 0; i < num_objects; ++i) { DCHECK(received_object_ids[i] == object_ids[i]); @@ -357,7 +356,8 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) { std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer)); ObjectID object_id2; - RETURN_NOT_OK(ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object)); + RETURN_NOT_OK( + ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object)); } return Status::OK(); } @@ -587,7 +587,8 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req num_ready_objects, timeout_ms)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer)); - RETURN_NOT_OK(ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects)); + RETURN_NOT_OK( + ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects)); *num_objects_ready = 0; for (int i = 0; i < num_object_requests; ++i) { diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc index 91a279d5d14..77bc8b7aae3 100644 --- a/cpp/src/plasma/protocol.cc +++ b/cpp/src/plasma/protocol.cc @@ -84,7 +84,8 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message); } -Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, PlasmaObject* object) { +Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, + PlasmaObject* object) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(verify_flatbuffer(message, data, size)); @@ -108,7 +109,8 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) { return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message); } -Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, unsigned char* digest) { +Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, + unsigned char* digest) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(verify_flatbuffer(message, data, size)); @@ -204,7 +206,8 @@ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objec return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message); } -Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[], int64_t num_objects) { +Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[], + int64_t num_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(verify_flatbuffer(message, data, size)); @@ -267,7 +270,8 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) { return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message); } -Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, bool* has_object) { +Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, + bool* has_object) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(verify_flatbuffer(message, data, size)); @@ -340,8 +344,8 @@ Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects, return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, message); } -Status ReadGetRequest(uint8_t* data, size_t size, - std::vector& object_ids, int64_t* timeout_ms) { +Status ReadGetRequest(uint8_t* data, size_t size, std::vector& object_ids, + int64_t* timeout_ms) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(verify_flatbuffer(message, data, size)); @@ -429,9 +433,8 @@ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_re return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message); } -Status ReadWaitRequest(uint8_t* data, size_t size, - ObjectRequestMap& object_requests, int64_t* timeout_ms, - int* num_ready_objects) { +Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests, + int64_t* timeout_ms, int* num_ready_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(verify_flatbuffer(message, data, size)); @@ -464,8 +467,8 @@ Status SendWaitReply(int sock, const ObjectRequestMap& object_requests, return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, message); } -Status ReadWaitReply(uint8_t* data, size_t size, - ObjectRequest object_requests[], int* num_ready_objects) { +Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[], + int* num_ready_objects) { DCHECK(data); auto message = flatbuffers::GetRoot(data); @@ -497,7 +500,8 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message); } -Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, int* port) { +Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, + int* port) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(verify_flatbuffer(message, data, size)); diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h index c0c2fc3d90a..af4b13978c6 100644 --- a/cpp/src/plasma/protocol.h +++ b/cpp/src/plasma/protocol.h @@ -48,13 +48,15 @@ Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id, Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error); -Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, PlasmaObject* object); +Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, + PlasmaObject* object); /* Plasma Seal message functions. */ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest); -Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, unsigned char* digest); +Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, + unsigned char* digest); Status SendSealReply(int sock, ObjectID object_id, int error); @@ -65,8 +67,8 @@ Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id); Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms); -Status ReadGetRequest(uint8_t* data, size_t size, - std::vector& object_ids, int64_t* timeout_ms); +Status ReadGetRequest(uint8_t* data, size_t size, std::vector& object_ids, + int64_t* timeout_ms); Status SendGetReply( int sock, ObjectID object_ids[], @@ -100,7 +102,8 @@ Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id); Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects); -Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[], int64_t num_objects); +Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[], + int64_t num_objects); Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[], int64_t num_objects); @@ -118,7 +121,8 @@ Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id); Status SendContainsReply(int sock, ObjectID object_id, bool has_object); -Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, bool* has_object); +Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, + bool* has_object); /* Plasma Connect message functions. */ @@ -151,15 +155,14 @@ Status ReadFetchRequest(uint8_t* data, size_t size, std::vector& objec Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests, int num_ready_objects, int64_t timeout_ms); -Status ReadWaitRequest(uint8_t* data, size_t size, - ObjectRequestMap& object_requests, +Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests, int64_t* timeout_ms, int* num_ready_objects); Status SendWaitReply(int sock, const ObjectRequestMap& object_requests, int num_ready_objects); -Status ReadWaitReply(uint8_t* data, size_t size, - ObjectRequest object_requests[], int* num_ready_objects); +Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[], + int* num_ready_objects); /* Plasma Subscribe message functions. */ @@ -169,14 +172,14 @@ Status SendSubscribeRequest(int sock); Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port); -Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, int* port); +Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, + int* port); Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size); -Status ReadDataReply(uint8_t* data, size_t size, - ObjectID* object_id, int64_t* object_size, - int64_t* metadata_size); +Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id, + int64_t* object_size, int64_t* metadata_size); } // namespace plasma diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 6bc65079533..34adc6261eb 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -564,7 +564,8 @@ Status PlasmaStore::process_message(Client* client) { case MessageType_PlasmaCreateRequest: { int64_t data_size; int64_t metadata_size; - RETURN_NOT_OK(ReadCreateRequest(input, input_size, &object_id, &data_size, &metadata_size)); + RETURN_NOT_OK( + ReadCreateRequest(input, input_size, &object_id, &data_size, &metadata_size)); int error_code = create_object(object_id, data_size, metadata_size, client, &object); HANDLE_SIGPIPE(SendCreateReply(client->fd, object_id, &object, error_code), @@ -689,9 +690,8 @@ int main(int argc, char* argv[]) { close(shm_fd); if (system_memory > shm_mem_avail) { ARROW_LOG(FATAL) << "System memory request exceeds memory available in /dev/shm. The " - "request is for " - << system_memory << " bytes, and the amount available is " - << shm_mem_avail + "request is for " << system_memory + << " bytes, and the amount available is " << shm_mem_avail << " bytes. You may be able to free up space by deleting files in " "/dev/shm. If you are inside a Docker container, you may need to " "pass " diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc index 65627954b8e..c76f5ce1092 100644 --- a/cpp/src/plasma/test/serialization_tests.cc +++ b/cpp/src/plasma/test/serialization_tests.cc @@ -82,8 +82,8 @@ TEST(PlasmaSerialization, CreateRequest) { ObjectID object_id2; int64_t data_size2; int64_t metadata_size2; - ARROW_CHECK_OK( - ReadCreateRequest(data.data(), data.size(), &object_id2, &data_size2, &metadata_size2)); + ARROW_CHECK_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &data_size2, + &metadata_size2)); ASSERT_EQ(data_size1, data_size2); ASSERT_EQ(metadata_size1, metadata_size2); ASSERT_EQ(object_id1, object_id2); @@ -142,7 +142,8 @@ TEST(PlasmaSerialization, GetRequest) { std::vector data = read_message_from_file(fd, MessageType_PlasmaGetRequest); std::vector object_ids_return; int64_t timeout_ms_return; - ARROW_CHECK_OK(ReadGetRequest(data.data(), data.size(), object_ids_return, &timeout_ms_return)); + ARROW_CHECK_OK( + ReadGetRequest(data.data(), data.size(), object_ids_return, &timeout_ms_return)); ASSERT_EQ(object_ids[0], object_ids_return[0]); ASSERT_EQ(object_ids[1], object_ids_return[1]); ASSERT_EQ(timeout_ms, timeout_ms_return); @@ -162,8 +163,8 @@ TEST(PlasmaSerialization, GetReply) { ObjectID object_ids_return[2]; PlasmaObject plasma_objects_return[2]; memset(&plasma_objects_return, 0, sizeof(plasma_objects_return)); - ARROW_CHECK_OK( - ReadGetReply(data.data(), data.size(), object_ids_return, &plasma_objects_return[0], 2)); + ARROW_CHECK_OK(ReadGetReply(data.data(), data.size(), object_ids_return, + &plasma_objects_return[0], 2)); ASSERT_EQ(object_ids[0], object_ids_return[0]); ASSERT_EQ(object_ids[1], object_ids_return[1]); ASSERT_EQ(memcmp(&plasma_objects[object_ids[0]], &plasma_objects_return[0], @@ -232,7 +233,8 @@ TEST(PlasmaSerialization, StatusRequest) { ARROW_CHECK_OK(SendStatusRequest(fd, object_ids, num_objects)); std::vector data = read_message_from_file(fd, MessageType_PlasmaStatusRequest); ObjectID object_ids_read[num_objects]; - ARROW_CHECK_OK(ReadStatusRequest(data.data(), data.size(), object_ids_read, num_objects)); + ARROW_CHECK_OK( + ReadStatusRequest(data.data(), data.size(), object_ids_read, num_objects)); ASSERT_EQ(object_ids[0], object_ids_read[0]); ASSERT_EQ(object_ids[1], object_ids_read[1]); close(fd); @@ -249,8 +251,8 @@ TEST(PlasmaSerialization, StatusReply) { int64_t num_objects = ReadStatusReply_num_objects(data.data(), data.size()); ObjectID object_ids_read[num_objects]; int object_statuses_read[num_objects]; - ARROW_CHECK_OK( - ReadStatusReply(data.data(), data.size(), object_ids_read, object_statuses_read, num_objects)); + ARROW_CHECK_OK(ReadStatusReply(data.data(), data.size(), object_ids_read, + object_statuses_read, num_objects)); ASSERT_EQ(object_ids[0], object_ids_read[0]); ASSERT_EQ(object_ids[1], object_ids_read[1]); ASSERT_EQ(object_statuses[0], object_statuses_read[0]); @@ -340,7 +342,8 @@ TEST(PlasmaSerialization, WaitReply) { std::vector data = read_message_from_file(fd, MessageType_PlasmaWaitReply); ObjectRequest objects_out[2]; int num_objects_out; - ARROW_CHECK_OK(ReadWaitReply(data.data(), data.size(), &objects_out[0], &num_objects_out)); + ARROW_CHECK_OK( + ReadWaitReply(data.data(), data.size(), &objects_out[0], &num_objects_out)); ASSERT_EQ(num_objects_in, num_objects_out); for (int i = 0; i < num_objects_out; i++) { /* Each object request must appear exactly once. */ @@ -364,7 +367,8 @@ TEST(PlasmaSerialization, DataRequest) { ObjectID object_id2; char* address2; int port2; - ARROW_CHECK_OK(ReadDataRequest(data.data(), data.size(), &object_id2, &address2, &port2)); + ARROW_CHECK_OK( + ReadDataRequest(data.data(), data.size(), &object_id2, &address2, &port2)); ASSERT_EQ(object_id1, object_id2); ASSERT_EQ(strcmp(address1, address2), 0); ASSERT_EQ(port1, port2); @@ -383,7 +387,8 @@ TEST(PlasmaSerialization, DataReply) { ObjectID object_id2; int64_t object_size2; int64_t metadata_size2; - ARROW_CHECK_OK(ReadDataReply(data.data(), data.size(), &object_id2, &object_size2, &metadata_size2)); + ARROW_CHECK_OK(ReadDataReply(data.data(), data.size(), &object_id2, &object_size2, + &metadata_size2)); ASSERT_EQ(object_id1, object_id2); ASSERT_EQ(object_size1, object_size2); ASSERT_EQ(metadata_size1, metadata_size2);