Skip to content

Commit d9261b4

Browse files
committed
add documentation and license
1 parent db2d09a commit d9261b4

File tree

4 files changed

+168
-0
lines changed

4 files changed

+168
-0
lines changed

cpp/src/plasma/plasma.pyx

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
118
# cython: profile=False
219
# distutils: language = c++
320
# cython: embedsignature = True
@@ -86,19 +103,42 @@ cdef class ObjectID:
86103
return "ObjectID(" + self.data.hex().decode() + ")"
87104

88105
cdef class PlasmaBuffer(Buffer):
106+
"""This is the type of objects returned by calls to get with a PlasmaClient.
107+
108+
We define our own class instead of directly returning a buffer object so that
109+
we can add a custom destructor which notifies Plasma that the object is no
110+
longer being used, so the memory in the Plasma store backing the object can
111+
potentially be freed.
112+
113+
Attributes:
114+
object_id (ObjectID): The ID of the object in the buffer.
115+
client (PlasmaClient): The PlasmaClient that we use to communicate
116+
with the store and manager.
117+
"""
89118

90119
cdef:
91120
ObjectID object_id
92121
PlasmaClient client
93122

94123
def __cinit__(self, ObjectID object_id, PlasmaClient client):
124+
"""Initialize a PlasmaBuffer."""
95125
self.object_id = object_id
96126
self.client = client
97127

98128
def __dealloc__(self):
129+
"""Notify Plasma that the object is no longer needed.
130+
131+
If the plasma client has been shut down, then don't do anything.
132+
"""
99133
self.client.release(self.object_id)
100134

101135
cdef class PlasmaClient:
136+
"""The PlasmaClient is used to interface with a plasma store and manager.
137+
138+
The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a
139+
buffer, and get a buffer. Buffers are referred to by object IDs, which are
140+
strings.
141+
"""
102142

103143
cdef:
104144
shared_ptr[CPlasmaClient] client
@@ -124,14 +164,57 @@ cdef class PlasmaClient:
124164
return result
125165

126166
def connect(self, store_socket_name, manager_socket_name, release_delay):
167+
"""Conect the PlasmaClient to a plasma store and optionally a manager.
168+
169+
Args:
170+
store_socket_name (str): Name of the socket the plasma store is listening
171+
at.
172+
manager_socket_name (str): Name of the socket the plasma manager is
173+
listening at.
174+
release_delay (int): The maximum number of objects that the client will
175+
keep and delay releasing (for caching reasons).
176+
"""
127177
check_status(self.client.get().Connect(store_socket_name.encode(), manager_socket_name.encode(), release_delay))
128178

129179
def create(self, ObjectID object_id, data_size, c_string metadata=b""):
180+
"""Create a new buffer in the PlasmaStore for a particular object ID.
181+
182+
The returned buffer is mutable until seal is called.
183+
184+
Args:
185+
object_id (ObjectID): The object ID used to identify an object.
186+
size (int): The size in bytes of the created buffer.
187+
metadata (bytes): An optional string of bytes encoding whatever metadata
188+
the user wishes to encode.
189+
190+
Raises:
191+
PlasmaObjectExists: This exception is raised if the object could
192+
not be created because there already is an object with the same ID in
193+
the plasma store.
194+
PlasmaStoreFull: This exception is raised if the object could
195+
not be created because the plasma store is unable to evict enough
196+
objects to create room for it.
197+
"""
130198
cdef uint8_t* data
131199
check_status(self.client.get().Create(object_id.data, data_size, <uint8_t*>(metadata.data()), metadata.size(), &data))
132200
return self._make_plasma_buffer(object_id, data, data_size)
133201

134202
def get(self, object_ids, timeout_ms=-1):
203+
"""Returns data buffer from the PlasmaStore based on object ID.
204+
205+
If the object has not been sealed yet, this call will block. The retrieved
206+
buffer is immutable.
207+
208+
Args:
209+
object_ids (List[ObjectID]): A list of ObjectIDs used to identify some objects.
210+
timeout_ms (int): The number of milliseconds that the get call should
211+
block before timing out and returning. Pass -1 if the call should block
212+
and 0 if the call should return immediately.
213+
214+
Returns:
215+
List of PlasmaBuffers for the data associated with the object_ids and None
216+
if the object was not available.
217+
"""
135218
cdef c_vector[CObjectBuffer] object_buffers
136219
self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
137220
result = []
@@ -143,6 +226,21 @@ cdef class PlasmaClient:
143226
return result
144227

145228
def get_metadata(self, object_ids, timeout_ms=-1):
229+
"""Returns metadata buffer from the PlasmaStore based on object ID.
230+
231+
If the object has not been sealed yet, this call will block. The retrieved
232+
buffer is immutable.
233+
234+
Args:
235+
object_ids (List[ObjectID]): A list of ObjectIDs used to identify some objects.
236+
timeout_ms (int): The number of milliseconds that the get call should
237+
block before timing out and returning. Pass -1 if the call should block
238+
and 0 if the call should return immediately.
239+
240+
Returns:
241+
List of PlasmaBuffers for the metadata associated with the object_ids and None
242+
if the object was not available.
243+
"""
146244
cdef c_vector[CObjectBuffer] object_buffers
147245
self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
148246
result = []
@@ -151,30 +249,62 @@ cdef class PlasmaClient:
151249
return result
152250

153251
def seal(self, ObjectID object_id):
252+
"""Seal the buffer in the PlasmaStore for a particular object ID.
253+
254+
Once a buffer has been sealed, the buffer is immutable and can only be
255+
accessed through get.
256+
257+
Args:
258+
object_id (str): A string used to identify an object.
259+
"""
154260
check_status(self.client.get().Seal(object_id.data))
155261

156262
def release(self, ObjectID object_id):
263+
"""Notify Plasma that the object is no longer needed."""
157264
check_status(self.client.get().Release(object_id.data))
158265

159266
def contains(self, ObjectID object_id):
267+
"""Check if the object is present and has been sealed in the PlasmaStore.
268+
269+
Args:
270+
object_id (str): A string used to identify an object.
271+
"""
160272
cdef c_bool is_contained
161273
check_status(self.client.get().Contains(object_id.data, &is_contained))
162274
return is_contained
163275

164276
def hash(self, ObjectID object_id):
277+
"""Compute the checksum of an object in the object store.
278+
279+
Args:
280+
object_id (str): A string used to identify an object.
281+
282+
Returns:
283+
A digest string object's hash. If the object isn't in the object
284+
store, the string will have length zero.
285+
"""
165286
cdef c_vector[uint8_t] digest = c_vector[uint8_t](kDigestSize)
166287
check_status(self.client.get().Hash(object_id.data, digest.data()))
167288
return bytes(digest[:])
168289

169290
def evict(self, int64_t num_bytes):
291+
"""Evict some objects until to recover some bytes.
292+
293+
Recover at least num_bytes bytes if possible.
294+
295+
Args:
296+
num_bytes (int): The number of bytes to attempt to recover.
297+
"""
170298
cdef int64_t num_bytes_evicted
171299
check_status(self.client.get().Evict(num_bytes, num_bytes_evicted))
172300
return num_bytes_evicted
173301

174302
def subscribe(self):
303+
"""Subscribe to notifications about sealed objects."""
175304
check_status(self.client.get().Subscribe(&self.notification_fd))
176305

177306
def get_next_notification(self):
307+
"""Get the next notification from the notification socket."""
178308
cdef ObjectID object_id = ObjectID(20 * b"\0")
179309
cdef int64_t data_size
180310
cdef int64_t metadata_size

cpp/src/plasma/test/test.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
118
from __future__ import absolute_import
219
from __future__ import division
320
from __future__ import print_function

python/pyarrow/error.pxi

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,18 @@ class ArrowNotImplementedError(NotImplementedError, ArrowException):
4848
pass
4949

5050

51+
class PlasmaObjectExists(ArrowException):
52+
pass
53+
54+
55+
class PlasmaObjectNonexistent(ArrowException):
56+
pass
57+
58+
59+
class PlasmaStoreFull(ArrowException):
60+
pass
61+
62+
5163
cdef int check_status(const CStatus& status) nogil except -1:
5264
if status.ok():
5365
return 0
@@ -66,5 +78,11 @@ cdef int check_status(const CStatus& status) nogil except -1:
6678
raise ArrowNotImplementedError(message)
6779
elif status.IsTypeError():
6880
raise ArrowTypeError(message)
81+
elif status.IsPlasmaObjectExists():
82+
raise PlasmaObjectExists(message)
83+
elif status.IsPlasmaObjectNonexistent():
84+
raise PlasmaObjectNonexistent(message)
85+
elif status.IsPlasmaStoreFull():
86+
raise PlasmaStoreFull(message)
6987
else:
7088
raise ArrowException(message)

python/pyarrow/includes/common.pxd

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
5050
c_bool IsKeyError()
5151
c_bool IsNotImplemented()
5252
c_bool IsTypeError()
53+
c_bool IsPlasmaObjectExists()
54+
c_bool IsPlasmaObjectNonexistent()
55+
c_bool IsPlasmaStoreFull()
5356

5457

5558
cdef inline object PyObject_to_object(PyObject* o):

0 commit comments

Comments
 (0)