Skip to content

Commit db2d09a

Browse files
committed
get all python tests in place
1 parent 78d08ac commit db2d09a

File tree

4 files changed

+393
-29
lines changed

4 files changed

+393
-29
lines changed

cpp/src/plasma/client.cc

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -388,22 +388,6 @@ static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
388388
return XXH64_digest(&hash_state);
389389
}
390390

391-
bool plasma_compute_object_hash(
392-
PlasmaClient* conn, ObjectID object_id, unsigned char* digest) {
393-
// Get the plasma object data. We pass in a timeout of 0 to indicate that
394-
// the operation should timeout immediately.
395-
ObjectBuffer object_buffer;
396-
ARROW_CHECK_OK(conn->Get(&object_id, 1, 0, &object_buffer));
397-
// If the object was not retrieved, return false.
398-
if (object_buffer.data_size == -1) { return false; }
399-
// Compute the hash.
400-
uint64_t hash = compute_object_hash(object_buffer);
401-
memcpy(digest, &hash, sizeof(hash));
402-
// Release the plasma object.
403-
ARROW_CHECK_OK(conn->Release(object_id));
404-
return true;
405-
}
406-
407391
Status PlasmaClient::Seal(const ObjectID& object_id) {
408392
// Make sure this client has a reference to the object before sending the
409393
// request to Plasma.
@@ -415,7 +399,7 @@ Status PlasmaClient::Seal(const ObjectID& object_id) {
415399
object_entry->second->is_sealed = true;
416400
/// Send the seal request to Plasma.
417401
static unsigned char digest[kDigestSize];
418-
ARROW_CHECK(plasma_compute_object_hash(this, object_id, &digest[0]));
402+
RETURN_NOT_OK(Hash(object_id, &digest[0]));
419403
RETURN_NOT_OK(SendSealRequest(store_conn_, object_id, &digest[0]));
420404
// We call PlasmaClient::Release to decrement the number of instances of this
421405
// object
@@ -441,6 +425,22 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
441425
return ReadEvictReply(buffer.data(), num_bytes_evicted);
442426
}
443427

428+
Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
429+
// Get the plasma object data. We pass in a timeout of 0 to indicate that
430+
// the operation should timeout immediately.
431+
ObjectBuffer object_buffer;
432+
RETURN_NOT_OK(Get(&object_id, 1, 0, &object_buffer));
433+
// If the object was not retrieved, return false.
434+
if (object_buffer.data_size == -1) {
435+
return Status::PlasmaObjectNonexistent("Object not found");
436+
}
437+
// Compute the hash.
438+
uint64_t hash = compute_object_hash(object_buffer);
439+
memcpy(digest, &hash, sizeof(hash));
440+
// Release the plasma object.
441+
return Release(object_id);
442+
}
443+
444444
Status PlasmaClient::Subscribe(int* fd) {
445445
int sock[2];
446446
// Create a non-blocking socket pair. This will only be used to send
@@ -461,6 +461,25 @@ Status PlasmaClient::Subscribe(int* fd) {
461461
return Status::OK();
462462
}
463463

464+
Status PlasmaClient::GetNotification(int fd, ObjectID *object_id, int64_t* data_size, int64_t* metadata_size) {
465+
uint8_t* notification = read_message_async(fd);
466+
if (notification == NULL) {
467+
return Status::IOError("Failed to read object notification from Plasma socket");
468+
}
469+
auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
470+
ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
471+
memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
472+
if (object_info->is_deletion()) {
473+
*data_size = -1;
474+
*metadata_size = -1;
475+
} else {
476+
*data_size = object_info->data_size();
477+
*metadata_size = object_info->metadata_size();
478+
}
479+
delete[] notification;
480+
return Status::OK();
481+
}
482+
464483
Status PlasmaClient::Connect(const std::string& store_socket_name,
465484
const std::string& manager_socket_name, int release_delay) {
466485
store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);

cpp/src/plasma/client.h

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -179,18 +179,35 @@ class ARROW_EXPORT PlasmaClient {
179179
/// retrieved.
180180
/// @return The return status.
181181
Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted);
182+
183+
/// Compute the hash of an object in the object store.
184+
///
185+
/// @param conn The object containing the connection state.
186+
/// @param object_id The ID of the object we want to hash.
187+
/// @param digest A pointer at which to return the hash digest of the object.
188+
/// The pointer must have at least kDigestSize bytes allocated.
189+
/// @return The return status.
190+
Status Hash(const ObjectID& object_id, uint8_t* digest);
182191

183192
/// Subscribe to notifications when objects are sealed in the object store.
184193
/// Whenever an object is sealed, a message will be written to the client
185-
/// socket
186-
/// that is returned by this method.
194+
/// socket that is returned by this method.
187195
///
188196
/// @param fd Out parameter for the file descriptor the client should use to
189197
/// read notifications
190198
/// from the object store about sealed objects.
191199
/// @return The return status.
192200
Status Subscribe(int* fd);
193201

202+
/// Receive next object notification for this client if Subscribe has been called.
203+
///
204+
/// @param fd The file descriptor we are reading the notification from.
205+
/// @param object_id Out parameter, the object_id of the object that was sealed.
206+
/// @param data_size Out parameter, the data size of the object that was sealed.
207+
/// @param metadata_size Out parameter, the metadata size of the object that was sealed.
208+
/// @return The return status.
209+
Status GetNotification(int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size);
210+
194211
/// Disconnect from the local plasma instance, including the local store and
195212
/// manager.
196213
///
@@ -333,14 +350,4 @@ class ARROW_EXPORT PlasmaClient {
333350
int64_t store_capacity_;
334351
};
335352

336-
/// Compute the hash of an object in the object store.
337-
///
338-
/// @param conn The object containing the connection state.
339-
/// @param object_id The ID of the object we want to hash.
340-
/// @param digest A pointer at which to return the hash digest of the object.
341-
/// The pointer must have at least DIGEST_SIZE bytes allocated.
342-
/// @return A boolean representing whether the hash operation succeeded.
343-
bool plasma_compute_object_hash(
344-
PlasmaClient* conn, ObjectID object_id, unsigned char* digest);
345-
346353
#endif // PLASMA_CLIENT_H

cpp/src/plasma/plasma.pyx

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,13 @@ cdef extern from "plasma/common.h" nogil:
2626
@staticmethod
2727
CUniqueID from_binary(const c_string& binary)
2828

29+
c_bool operator==(const CUniqueID& rhs) const
30+
2931
c_string hex()
3032

33+
cdef extern from "plasma/plasma.h":
34+
cdef int64_t kDigestSize
35+
3136
cdef extern from "plasma/client.h" nogil:
3237

3338
cdef cppclass CPlasmaClient" PlasmaClient":
@@ -43,8 +48,18 @@ cdef extern from "plasma/client.h" nogil:
4348

4449
CStatus Seal(const CUniqueID& object_id)
4550

51+
CStatus Evict(int64_t num_bytes, int64_t& num_bytes_evicted)
52+
53+
CStatus Hash(const CUniqueID& object_id, uint8_t* digest)
54+
4655
CStatus Release(const CUniqueID& object_id)
4756

57+
CStatus Contains(const CUniqueID& object_id, c_bool* has_object)
58+
59+
CStatus Subscribe(int* fd)
60+
61+
CStatus GetNotification(int fd, CUniqueID* object_id, int64_t* data_size, int64_t* metadata_size)
62+
4863
CStatus Disconnect()
4964

5065
cdef extern from "plasma/client.h" nogil:
@@ -63,6 +78,10 @@ cdef class ObjectID:
6378
def __cinit__(self, object_id):
6479
self.data = CUniqueID.from_binary(object_id)
6580

81+
def __richcmp__(ObjectID self, ObjectID object_id, operation):
82+
assert operation == 2, "only equality implemented so far"
83+
return self.data == object_id.data
84+
6685
def __repr__(self):
6786
return "ObjectID(" + self.data.hex().decode() + ")"
6887

@@ -83,9 +102,11 @@ cdef class PlasmaClient:
83102

84103
cdef:
85104
shared_ptr[CPlasmaClient] client
105+
int notification_fd
86106

87107
def __cinit__(self):
88108
self.client.reset(new CPlasmaClient())
109+
self.notification_fd = -1
89110

90111
cdef _get_object_buffers(self, object_ids, int64_t timeout_ms, c_vector[CObjectBuffer]* result):
91112
cdef c_vector[CUniqueID] ids
@@ -135,5 +156,30 @@ cdef class PlasmaClient:
135156
def release(self, ObjectID object_id):
136157
check_status(self.client.get().Release(object_id.data))
137158

159+
def contains(self, ObjectID object_id):
160+
cdef c_bool is_contained
161+
check_status(self.client.get().Contains(object_id.data, &is_contained))
162+
return is_contained
163+
164+
def hash(self, ObjectID object_id):
165+
cdef c_vector[uint8_t] digest = c_vector[uint8_t](kDigestSize)
166+
check_status(self.client.get().Hash(object_id.data, digest.data()))
167+
return bytes(digest[:])
168+
169+
def evict(self, int64_t num_bytes):
170+
cdef int64_t num_bytes_evicted
171+
check_status(self.client.get().Evict(num_bytes, num_bytes_evicted))
172+
return num_bytes_evicted
173+
174+
def subscribe(self):
175+
check_status(self.client.get().Subscribe(&self.notification_fd))
176+
177+
def get_next_notification(self):
178+
cdef ObjectID object_id = ObjectID(20 * b"\0")
179+
cdef int64_t data_size
180+
cdef int64_t metadata_size
181+
check_status(self.client.get().GetNotification(self.notification_fd, &object_id.data, &data_size, &metadata_size))
182+
return object_id, data_size, metadata_size
183+
138184
def disconnect(self):
139185
check_status(self.client.get().Disconnect())

0 commit comments

Comments
 (0)