Skip to content

Commit bc681ca

Browse files
committed
port some python tests
1 parent f8e05f2 commit bc681ca

File tree

2 files changed

+204
-31
lines changed

2 files changed

+204
-31
lines changed

cpp/src/plasma/plasma.pyx

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,7 @@ from libcpp.vector cimport vector as c_vector
99
from libc.stdint cimport int64_t, uint8_t, uintptr_t
1010

1111
from pyarrow.lib cimport Buffer, NativeFile, check_status
12-
from pyarrow.includes.libarrow cimport MutableBuffer, CBuffer, CFixedSizeBufferWrite
13-
14-
cdef extern from "arrow/api.h" namespace "arrow" nogil:
15-
# We can later add more of the common status factory methods as needed
16-
cdef CStatus CStatus_OK "Status::OK"()
17-
18-
cdef cppclass CStatus "arrow::Status":
19-
CStatus()
20-
21-
c_string ToString()
22-
23-
c_bool ok()
24-
c_bool IsIOError()
25-
c_bool IsOutOfMemory()
26-
c_bool IsInvalid()
27-
c_bool IsKeyError()
28-
c_bool IsNotImplemented()
29-
c_bool IsTypeError()
30-
31-
12+
from pyarrow.includes.libarrow cimport MutableBuffer, CBuffer, CFixedSizeBufferWrite, CStatus
3213

3314
cdef class FixedSizeBufferOutputStream(NativeFile):
3415

@@ -55,7 +36,7 @@ cdef extern from "plasma/client.h" nogil:
5536

5637
CStatus Connect(const c_string& store_socket_name, const c_string& manager_socket_name, int release_delay)
5738

58-
CStatus Create(const CUniqueID& object_id, int64_t data_size, uint8_t* metadata,
39+
CStatus Create(const CUniqueID& object_id, int64_t data_size, const uint8_t* metadata,
5940
int64_t metadata_size, uint8_t** data)
6041

6142
CStatus Get(const CUniqueID* object_ids, int64_t num_objects, int64_t timeout_ms, CObjectBuffer* object_buffers)
@@ -91,6 +72,14 @@ cdef class PlasmaClient:
9172
def __cinit__(self):
9273
self.client.reset(new CPlasmaClient())
9374

75+
cdef _get_object_buffers(self, object_ids, c_vector[CObjectBuffer]* result):
76+
cdef c_vector[CUniqueID] ids
77+
cdef ObjectID object_id
78+
for object_id in object_ids:
79+
ids.push_back(object_id.data)
80+
result[0].resize(ids.size())
81+
check_status(self.client.get().Get(ids.data(), ids.size(), 0, result[0].data()))
82+
9483
cdef _make_buffer(self, uint8_t* data, int64_t size):
9584
cdef shared_ptr[MutableBuffer] buffer
9685
buffer.reset(new MutableBuffer(data, size))
@@ -99,21 +88,22 @@ cdef class PlasmaClient:
9988
return result
10089

10190
def connect(self, store_socket_name, manager_socket_name, release_delay):
102-
check_status(self.client.get().Connect(store_socket_name, manager_socket_name, release_delay))
91+
check_status(self.client.get().Connect(store_socket_name.encode(), manager_socket_name.encode(), release_delay))
10392

104-
def create(self, ObjectID object_id, data_size):
93+
def create(self, ObjectID object_id, data_size, c_string metadata=b""):
10594
cdef uint8_t* data
106-
check_status(self.client.get().Create(object_id.data, data_size, NULL, 0, &data))
95+
check_status(self.client.get().Create(object_id.data, data_size, <uint8_t*>(metadata.data()), metadata.size(), &data))
10796
return self._make_buffer(data, data_size)
10897

10998
def get(self, object_ids):
110-
cdef c_vector[CUniqueID] ids
111-
cdef ObjectID object_id
112-
for object_id in object_ids:
113-
ids.push_back(object_id.data)
114-
cdef c_vector[CObjectBuffer] result = c_vector[CObjectBuffer](ids.size())
115-
check_status(self.client.get().Get(ids.data(), ids.size(), 0, result.data()))
116-
return [self._make_buffer(r.data, r.data_size) for r in result]
99+
cdef c_vector[CObjectBuffer] object_buffers
100+
self._get_object_buffers(object_ids, &object_buffers)
101+
return [self._make_buffer(b.data, b.data_size) for b in object_buffers]
102+
103+
def get_metadata(self, object_ids):
104+
cdef c_vector[CObjectBuffer] object_buffers
105+
self._get_object_buffers(object_ids, &object_buffers)
106+
return [self._make_buffer(b.metadata, b.metadata_size) for b in object_buffers]
117107

118108
def seal(self, ObjectID object_id):
119109
check_status(self.client.get().Seal(object_id.data))

cpp/src/plasma/test/test.py

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
from __future__ import absolute_import
2+
from __future__ import division
3+
from __future__ import print_function
4+
5+
import numpy as np
6+
import os
7+
import random
8+
import signal
9+
import subprocess
10+
import sys
11+
import threading
12+
import time
13+
import unittest
14+
15+
import plasma
16+
import pyarrow as pa
17+
18+
DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9
19+
20+
USE_VALGRIND = False
21+
22+
def random_name():
23+
return str(random.randint(0, 99999999))
24+
25+
def random_object_id():
26+
return plasma.ObjectID(np.random.bytes(20))
27+
28+
def generate_metadata(length):
29+
metadata_buffer = bytearray(length)
30+
if length > 0:
31+
metadata_buffer[0] = random.randint(0, 255)
32+
metadata_buffer[-1] = random.randint(0, 255)
33+
for _ in range(100):
34+
metadata_buffer[random.randint(0, length - 1)] = random.randint(0, 255)
35+
return metadata_buffer
36+
37+
def assert_get_object_equal(unit_test, client1, client2, object_id,
38+
memory_buffer=None, metadata=None):
39+
client1_buff = client1.get([object_id])[0]
40+
client2_buff = client2.get([object_id])[0]
41+
client1_metadata = client1.get_metadata([object_id])[0]
42+
client2_metadata = client2.get_metadata([object_id])[0]
43+
unit_test.assertEqual(len(client1_buff), len(client2_buff))
44+
unit_test.assertEqual(len(client1_metadata), len(client2_metadata))
45+
# Check that the buffers from the two clients are the same.
46+
unit_test.assertTrue(plasma.buffers_equal(client1_buff, client2_buff))
47+
# Check that the metadata buffers from the two clients are the same.
48+
unit_test.assertTrue(plasma.buffers_equal(client1_metadata,
49+
client2_metadata))
50+
# If a reference buffer was provided, check that it is the same as well.
51+
if memory_buffer is not None:
52+
unit_test.assertTrue(plasma.buffers_equal(memory_buffer, client1_buff))
53+
# If reference metadata was provided, check that it is the same as well.
54+
if metadata is not None:
55+
unit_test.assertTrue(plasma.buffers_equal(metadata, client1_metadata))
56+
57+
def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
58+
use_valgrind=False, use_profiler=False,
59+
stdout_file=None, stderr_file=None):
60+
"""Start a plasma store process.
61+
Args:
62+
use_valgrind (bool): True if the plasma store should be started inside of
63+
valgrind. If this is True, use_profiler must be False.
64+
use_profiler (bool): True if the plasma store should be started inside a
65+
profiler. If this is True, use_valgrind must be False.
66+
stdout_file: A file handle opened for writing to redirect stdout to. If no
67+
redirection should happen, then this should be None.
68+
stderr_file: A file handle opened for writing to redirect stderr to. If no
69+
redirection should happen, then this should be None.
70+
Return:
71+
A tuple of the name of the plasma store socket and the process ID of the
72+
plasma store process.
73+
"""
74+
if use_valgrind and use_profiler:
75+
raise Exception("Cannot use valgrind and profiler at the same time.")
76+
plasma_store_executable = os.path.join(os.path.abspath(
77+
os.path.dirname(__file__)),
78+
"../../../build/debug/plasma_store")
79+
plasma_store_name = "/tmp/plasma_store{}".format(random_name())
80+
command = [plasma_store_executable,
81+
"-s", plasma_store_name,
82+
"-m", str(plasma_store_memory)]
83+
if use_valgrind:
84+
pid = subprocess.Popen(["valgrind",
85+
"--track-origins=yes",
86+
"--leak-check=full",
87+
"--show-leak-kinds=all",
88+
"--error-exitcode=1"] + command,
89+
stdout=stdout_file, stderr=stderr_file)
90+
time.sleep(1.0)
91+
elif use_profiler:
92+
pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command,
93+
stdout=stdout_file, stderr=stderr_file)
94+
time.sleep(1.0)
95+
else:
96+
pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
97+
time.sleep(0.1)
98+
return plasma_store_name, pid
99+
100+
class TestPlasmaClient(unittest.TestCase):
101+
102+
def setUp(self):
103+
# Start Plasma store.
104+
plasma_store_name, self.p = start_plasma_store(
105+
use_valgrind=USE_VALGRIND)
106+
# Connect to Plasma.
107+
self.plasma_client = plasma.PlasmaClient()
108+
self.plasma_client.connect(plasma_store_name, "", 64)
109+
# For the eviction test
110+
self.plasma_client2 = plasma.PlasmaClient()
111+
self.plasma_client2.connect(plasma_store_name, "", 0)
112+
113+
def tearDown(self):
114+
# Check that the Plasma store is still alive.
115+
self.assertEqual(self.p.poll(), None)
116+
# Kill the plasma store process.
117+
if USE_VALGRIND:
118+
self.p.send_signal(signal.SIGTERM)
119+
self.p.wait()
120+
if self.p.returncode != 0:
121+
os._exit(-1)
122+
else:
123+
self.p.kill()
124+
125+
def test_create(self):
126+
# Create an object id string.
127+
object_id = random_object_id()
128+
# Create a new buffer and write to it.
129+
length = 50
130+
memory_buffer = np.frombuffer(self.plasma_client.create(object_id, length), dtype="uint8")
131+
for i in range(length):
132+
memory_buffer[i] = i % 256
133+
# Seal the object.
134+
self.plasma_client.seal(object_id)
135+
# Get the object.
136+
memory_buffer = np.frombuffer(self.plasma_client.get([object_id])[0], dtype="uint8")
137+
for i in range(length):
138+
self.assertEqual(memory_buffer[i], i % 256)
139+
140+
def test_create_with_metadata(self):
141+
for length in range(1000):
142+
# Create an object id string.
143+
object_id = random_object_id()
144+
# Create a random metadata string.
145+
metadata = generate_metadata(length)
146+
# Create a new buffer and write to it.
147+
memory_buffer = np.frombuffer(self.plasma_client.create(object_id, length, metadata), dtype="uint8")
148+
for i in range(length):
149+
memory_buffer[i] = i % 256
150+
# Seal the object.
151+
self.plasma_client.seal(object_id)
152+
# Get the object.
153+
memory_buffer = np.frombuffer(self.plasma_client.get([object_id])[0], dtype="uint8")
154+
for i in range(length):
155+
self.assertEqual(memory_buffer[i], i % 256)
156+
# Get the metadata.
157+
metadata_buffer = np.frombuffer(self.plasma_client.get_metadata([object_id])[0], dtype="uint8")
158+
self.assertEqual(len(metadata), len(metadata_buffer))
159+
for i in range(len(metadata)):
160+
self.assertEqual(metadata[i], metadata_buffer[i])
161+
162+
def test_create_existing(self):
163+
# This test is partially used to test the code path in which we create an
164+
# object with an ID that already exists
165+
length = 100
166+
for _ in range(1000):
167+
object_id = random_object_id()
168+
self.plasma_client.create(object_id, length, generate_metadata(length))
169+
try:
170+
self.plasma_client.create(object_id, length, generate_metadata(length))
171+
except pa.lib.ArrowException as e:
172+
pass
173+
else:
174+
self.assertTrue(False)
175+
176+
if __name__ == "__main__":
177+
if len(sys.argv) > 1:
178+
# Pop the argument so we don't mess with unittest's own argument parser.
179+
if sys.argv[-1] == "valgrind":
180+
arg = sys.argv.pop()
181+
USE_VALGRIND = True
182+
print("Using valgrind for tests")
183+
unittest.main(verbosity=2)

0 commit comments

Comments
 (0)