diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index bb17685277a..8aaca9963c1 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -206,28 +206,11 @@ cdef class PlasmaClient: c_string store_socket_name c_string manager_socket_name - def __cinit__(self, store_socket_name, manager_socket_name, int release_delay): - """ - Create a new PlasmaClient that is connected to a plasma store - and optionally a plasma manager. - - Parameters - ---------- - store_socket_name : str - Name of the socket the plasma store is listening at. - manager_socket_name : str - Name of the socket the plasma manager is listening at. - release_delay : int - The maximum number of objects that the client will keep and - delay releasing (for caching reasons). - """ + def __cinit__(self): self.client.reset(new CPlasmaClient()) self.notification_fd = -1 - self.store_socket_name = store_socket_name.encode() - self.manager_socket_name = manager_socket_name.encode() - with nogil: - check_status(self.client.get().Connect(self.store_socket_name, - self.manager_socket_name, release_delay)) + self.store_socket_name = "" + self.manager_socket_name = "" cdef _get_object_buffers(self, object_ids, int64_t timeout_ms, c_vector[CObjectBuffer]* result): @@ -558,3 +541,26 @@ cdef class PlasmaClient: """ with nogil: check_status(self.client.get().Disconnect()) + +def connect(store_socket_name, manager_socket_name, int release_delay): + """ + Return a new PlasmaClient that is connected a plasma store and + optionally a manager. + + Parameters + ---------- + store_socket_name : str + Name of the socket the plasma store is listening at. + manager_socket_name : str + Name of the socket the plasma manager is listening at. + release_delay : int + The maximum number of objects that the client will keep and + delay releasing (for caching reasons). + """ + cdef PlasmaClient result = PlasmaClient() + result.store_socket_name = store_socket_name.encode() + result.manager_socket_name = manager_socket_name.encode() + with nogil: + check_status(result.client.get().Connect(result.store_socket_name, + result.manager_socket_name, release_delay)) + return result diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index ce684e3e41f..8f8d5b5ed60 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -154,9 +154,9 @@ def setup_method(self, test_method): plasma_store_name, self.p = start_plasma_store( use_valgrind=os.getenv("PLASMA_VALGRIND") == "1") # Connect to Plasma. - self.plasma_client = plasma.PlasmaClient(plasma_store_name, "", 64) + self.plasma_client = plasma.connect(plasma_store_name, "", 64) # For the eviction test - self.plasma_client2 = plasma.PlasmaClient(plasma_store_name, "", 0) + self.plasma_client2 = plasma.connect(plasma_store_name, "", 0) def teardown_method(self, test_method): # Check that the Plasma store is still alive. @@ -598,7 +598,7 @@ def test_evict(self): def test_subscribe(self): # Subscribe to notifications from the Plasma Store. self.plasma_client.subscribe() - for i in [1, 10, 100, 1000, 10000, 100000]: + for i in [1, 10, 100, 1000, 10000]: object_ids = [random_object_id() for _ in range(i)] metadata_sizes = [np.random.randint(1000) for _ in range(i)] data_sizes = [np.random.randint(1000) for _ in range(i)] @@ -620,7 +620,7 @@ def test_subscribe_deletions(self): # plasma_client2 to make sure that all used objects will get evicted # properly. self.plasma_client2.subscribe() - for i in [1, 10, 100, 1000, 10000, 100000]: + for i in [1, 10, 100, 1000, 10000]: object_ids = [random_object_id() for _ in range(i)] # Add 1 to the sizes to make sure we have nonzero object sizes. metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)]