From 990700f96ffe2182b0c6cfd62b5fe655c1bddef6 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 27 Jan 2019 18:25:00 -0800 Subject: [PATCH 1/4] use only one mmapped file --- cpp/src/plasma/store.cc | 42 +++++++++++++---------------- python/pyarrow/plasma.py | 5 ---- python/pyarrow/tests/test_plasma.py | 25 ++++++++--------- 3 files changed, 32 insertions(+), 40 deletions(-) diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index ca49d010aa5..343ccf5b886 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -905,21 +905,22 @@ class PlasmaStoreRunner { PlasmaStoreRunner() {} void Start(char* socket_name, int64_t system_memory, std::string directory, - bool hugepages_enabled, bool use_one_memory_mapped_file) { + bool hugepages_enabled) { // Create the event loop. loop_.reset(new EventLoop); store_.reset( new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled)); plasma_config = store_->GetPlasmaStoreInfo(); - // If the store is configured to use a single memory-mapped file, then we - // achieve that by mallocing and freeing a single large amount of space. - // that maximum allowed size up front. - if (use_one_memory_mapped_file) { - void* pointer = plasma::dlmemalign(kBlockSize, system_memory); - ARROW_CHECK(pointer != nullptr); - plasma::dlfree(pointer); - } + // We are using a single memory-mapped file by mallocing and freeing a single + // large amount of space up front. According to the documentation, + // dlmalloc might need up to 128*sizeof(size_t) bytes for internal + // bookkeeping. + void* pointer = plasma::dlmemalign(kBlockSize, system_memory - 256 * sizeof(size_t)); + ARROW_CHECK(pointer != nullptr); + // This will unmap the file, but the next one created will be as large + // as this one (this is an implementation detail of dlmalloc). + plasma::dlfree(pointer); int socket = BindIpcSock(socket_name, true); // TODO(pcm): Check return value. 
@@ -955,15 +956,14 @@ void HandleSignal(int signal) { } void StartServer(char* socket_name, int64_t system_memory, std::string plasma_directory, - bool hugepages_enabled, bool use_one_memory_mapped_file) { + bool hugepages_enabled) { // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write // to a client that has already died, the store could die. signal(SIGPIPE, SIG_IGN); g_runner.reset(new PlasmaStoreRunner()); signal(SIGTERM, HandleSignal); - g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled, - use_one_memory_mapped_file); + g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled); } } // namespace plasma @@ -975,11 +975,9 @@ int main(int argc, char* argv[]) { // Directory where plasma memory mapped files are stored. std::string plasma_directory; bool hugepages_enabled = false; - // True if a single large memory-mapped file should be created at startup. - bool use_one_memory_mapped_file = false; int64_t system_memory = -1; int c; - while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) { + while ((c = getopt(argc, argv, "s:m:d:h")) != -1) { switch (c) { case 'd': plasma_directory = std::string(optarg); @@ -994,14 +992,16 @@ int main(int argc, char* argv[]) { char extra; int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra); ARROW_CHECK(scanned == 1); + // Set system memory, potentially rounding it to a page size + // Also make it so dlmalloc fails if we try to request more memory than + // is available. 
+ system_memory = + plasma::dlmalloc_set_footprint_limit(static_cast<size_t>(system_memory)); ARROW_LOG(INFO) << "Allowing the Plasma store to use up to " << static_cast<double>(system_memory) / 1000000000 << "GB of memory."; break; } - case 'f': - use_one_memory_mapped_file = true; - break; default: exit(-1); } @@ -1051,12 +1051,8 @@ int main(int argc, char* argv[]) { SetMallocGranularity(1024 * 1024 * 1024); // 1 GB } #endif - // Make it so dlmalloc fails if we try to request more memory than is - // available. - plasma::dlmalloc_set_footprint_limit((size_t)system_memory); ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; - plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled, - use_one_memory_mapped_file); + plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled); plasma::g_runner->Shutdown(); plasma::g_runner = nullptr; diff --git a/python/pyarrow/plasma.py b/python/pyarrow/plasma.py index 056172c9800..a6ab362536d 100644 --- a/python/pyarrow/plasma.py +++ b/python/pyarrow/plasma.py @@ -78,7 +78,6 @@ def build_plasma_tensorflow_op(): @contextlib.contextmanager def start_plasma_store(plasma_store_memory, use_valgrind=False, use_profiler=False, - use_one_memory_mapped_file=False, plasma_directory=None, use_hugepages=False): """Start a plasma store process. Args: @@ -87,8 +86,6 @@ def start_plasma_store(plasma_store_memory, of valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the plasma store should be started inside a profiler. If this is True, use_valgrind must be False. - use_one_memory_mapped_file: If True, then the store will use only a - single memory-mapped file. plasma_directory (str): Directory where plasma memory mapped files will be stored. use_hugepages (bool): True if the plasma store should use huge pages. 
@@ -107,8 +104,6 @@ def start_plasma_store(plasma_store_memory, command = [plasma_store_executable, "-s", plasma_store_name, "-m", str(plasma_store_memory)] - if use_one_memory_mapped_file: - command += ["-f"] if plasma_directory: command += ["-d", plasma_directory] if use_hugepages: diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 05375d7b65a..bcb467aab8e 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -37,6 +37,7 @@ DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8 USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1" +SMALL_OBJECT_SIZE = 9000 def random_name(): @@ -110,15 +111,11 @@ def assert_get_object_equal(unit_test, client1, client2, object_id, class TestPlasmaClient(object): def setup_method(self, test_method): - use_one_memory_mapped_file = (test_method == - self.test_use_one_memory_mapped_file) - import pyarrow.plasma as plasma # Start Plasma store. self.plasma_store_ctx = plasma.start_plasma_store( plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, - use_valgrind=USE_VALGRIND, - use_one_memory_mapped_file=use_one_memory_mapped_file) + use_valgrind=USE_VALGRIND) self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__() # Connect to Plasma. self.plasma_client = plasma.connect(self.plasma_store_name) @@ -471,22 +468,26 @@ def assert_create_raises_plasma_full(unit_test, size): memory_buffers.append(memory_buffer) # Remaining space is 50%. Make sure that we can't create an # object of size 50% + 1, but we can create one of size 20%. 
- assert_create_raises_plasma_full(self, 50 * PERCENT + 1) + assert_create_raises_plasma_full( + self, 50 * PERCENT + SMALL_OBJECT_SIZE) _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT) del memory_buffer _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT) del memory_buffer - assert_create_raises_plasma_full(self, 50 * PERCENT + 1) + assert_create_raises_plasma_full( + self, 50 * PERCENT + SMALL_OBJECT_SIZE) _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT) memory_buffers.append(memory_buffer) # Remaining space is 30%. - assert_create_raises_plasma_full(self, 30 * PERCENT + 1) + assert_create_raises_plasma_full( + self, 30 * PERCENT + SMALL_OBJECT_SIZE) _, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT) memory_buffers.append(memory_buffer) # Remaining space is 20%. - assert_create_raises_plasma_full(self, 20 * PERCENT + 1) + assert_create_raises_plasma_full( + self, 20 * PERCENT + SMALL_OBJECT_SIZE) def test_contains(self): fake_object_ids = [random_object_id() for _ in range(100)] @@ -838,7 +839,7 @@ def test_subscribe_deletions(self): assert -1 == recv_dsize assert -1 == recv_msize - def test_use_one_memory_mapped_file(self): + def test_use_full_memory(self): # Fill the object store up with a large number of small objects and let # them go out of scope. for _ in range(100): @@ -851,8 +852,8 @@ def test_use_one_memory_mapped_file(self): create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0) # Verify that an object that is too large does not fit. 
with pytest.raises(pa.lib.PlasmaStoreFull): - create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1, - 0) + create_object(self.plasma_client2, + DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0) def test_client_death_during_get(self): import pyarrow.plasma as plasma From 6072e8fd202ce37cc2503c2036963f260d109ecd Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 28 Jan 2019 16:03:32 -0800 Subject: [PATCH 2/4] add verbose flag --- ci/travis_script_cpp.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh index 14529b03160..74783e20480 100755 --- a/ci/travis_script_cpp.sh +++ b/ci/travis_script_cpp.sh @@ -23,7 +23,7 @@ source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh pushd $CPP_BUILD_DIR -PATH=$ARROW_BUILD_TYPE:$PATH ctest -j2 --output-on-failure -L unittest +PATH=$ARROW_BUILD_TYPE:$PATH ctest -j2 --verbose --output-on-failure -L unittest popd From f885f499ec4f465eb2e960225a1223fb3b8d5f86 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 29 Jan 2019 00:01:56 -0800 Subject: [PATCH 3/4] reduce plasma store size for test --- cpp/src/plasma/test/client_tests.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 30dc6850cd0..1678e27f90f 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -60,7 +60,7 @@ class TestPlasmaStore : public ::testing::Test { std::string plasma_directory = test_executable.substr(0, test_executable.find_last_of("/")); std::string plasma_command = plasma_directory + - "/plasma_store_server -m 1000000000 -s " + + "/plasma_store_server -m 10000000 -s " + store_socket_name_ + " 1> /dev/null 2> /dev/null &"; system(plasma_command.c_str()); ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); From c447af570d21bfeff0d2d6c47fef56a33208fe85 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 29 Jan 2019 00:40:13 -0800 
Subject: [PATCH 4/4] remove --verbose --- ci/travis_script_cpp.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh index 74783e20480..14529b03160 100755 --- a/ci/travis_script_cpp.sh +++ b/ci/travis_script_cpp.sh @@ -23,7 +23,7 @@ source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh pushd $CPP_BUILD_DIR -PATH=$ARROW_BUILD_TYPE:$PATH ctest -j2 --verbose --output-on-failure -L unittest +PATH=$ARROW_BUILD_TYPE:$PATH ctest -j2 --output-on-failure -L unittest popd