Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions python/pyarrow/plasma.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -206,28 +206,11 @@ cdef class PlasmaClient:
c_string store_socket_name
c_string manager_socket_name

def __cinit__(self, store_socket_name, manager_socket_name, int release_delay):
"""
Create a new PlasmaClient that is connected to a plasma store
and optionally a plasma manager.

Parameters
----------
store_socket_name : str
Name of the socket the plasma store is listening at.
manager_socket_name : str
Name of the socket the plasma manager is listening at.
release_delay : int
The maximum number of objects that the client will keep and
delay releasing (for caching reasons).
"""
def __cinit__(self):
self.client.reset(new CPlasmaClient())
self.notification_fd = -1
self.store_socket_name = store_socket_name.encode()
self.manager_socket_name = manager_socket_name.encode()
with nogil:
check_status(self.client.get().Connect(self.store_socket_name,
self.manager_socket_name, release_delay))
self.store_socket_name = ""
self.manager_socket_name = ""

cdef _get_object_buffers(self, object_ids, int64_t timeout_ms,
c_vector[CObjectBuffer]* result):
Expand Down Expand Up @@ -558,3 +541,26 @@ cdef class PlasmaClient:
"""
with nogil:
check_status(self.client.get().Disconnect())

def connect(store_socket_name, manager_socket_name, int release_delay):
"""
Return a new PlasmaClient that is connected a plasma store and
optionally a manager.

Parameters
----------
store_socket_name : str
Name of the socket the plasma store is listening at.
manager_socket_name : str
Name of the socket the plasma manager is listening at.
release_delay : int
The maximum number of objects that the client will keep and
delay releasing (for caching reasons).
"""
cdef PlasmaClient result = PlasmaClient()
result.store_socket_name = store_socket_name.encode()
result.manager_socket_name = manager_socket_name.encode()
with nogil:
check_status(result.client.get().Connect(result.store_socket_name,
result.manager_socket_name, release_delay))
return result
8 changes: 4 additions & 4 deletions python/pyarrow/tests/test_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ def setup_method(self, test_method):
plasma_store_name, self.p = start_plasma_store(
use_valgrind=os.getenv("PLASMA_VALGRIND") == "1")
# Connect to Plasma.
self.plasma_client = plasma.PlasmaClient(plasma_store_name, "", 64)
self.plasma_client = plasma.connect(plasma_store_name, "", 64)
# For the eviction test
self.plasma_client2 = plasma.PlasmaClient(plasma_store_name, "", 0)
self.plasma_client2 = plasma.connect(plasma_store_name, "", 0)

def teardown_method(self, test_method):
# Check that the Plasma store is still alive.
Expand Down Expand Up @@ -598,7 +598,7 @@ def test_evict(self):
def test_subscribe(self):
# Subscribe to notifications from the Plasma Store.
self.plasma_client.subscribe()
for i in [1, 10, 100, 1000, 10000, 100000]:
for i in [1, 10, 100, 1000, 10000]:
object_ids = [random_object_id() for _ in range(i)]
metadata_sizes = [np.random.randint(1000) for _ in range(i)]
data_sizes = [np.random.randint(1000) for _ in range(i)]
Expand All @@ -620,7 +620,7 @@ def test_subscribe_deletions(self):
# plasma_client2 to make sure that all used objects will get evicted
# properly.
self.plasma_client2.subscribe()
for i in [1, 10, 100, 1000, 10000, 100000]:
for i in [1, 10, 100, 1000, 10000]:
object_ids = [random_object_id() for _ in range(i)]
# Add 1 to the sizes to make sure we have nonzero object sizes.
metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
Expand Down