diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 2a93a6303ef..729fba7e944 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -31,9 +31,6 @@ find_package(Threads) set(PLASMA_SO_VERSION "${ARROW_SO_VERSION}") set(PLASMA_FULL_SO_VERSION "${ARROW_FULL_SO_VERSION}") -include_directories("${CMAKE_CURRENT_LIST_DIR}/" "${CMAKE_CURRENT_LIST_DIR}/thirdparty/" - "${CMAKE_CURRENT_LIST_DIR}/../") - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion") # Compile flatbuffers @@ -64,17 +61,14 @@ add_custom_command( set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") -set(PLASMA_SRCS - client.cc - common.cc - eviction_policy.cc +set(PLASMA_SRCS client.cc common.cc fling.cc io.cc malloc.cc plasma.cc protocol.cc) + +set(PLASMA_STORE_SRCS + dlmalloc.cc events.cc - fling.cc - io.cc - malloc.cc - plasma.cc + eviction_policy.cc plasma_allocator.cc - protocol.cc + store.cc thirdparty/ae/ae.c) set(PLASMA_LINK_LIBS arrow_shared) @@ -106,10 +100,10 @@ endforeach() # The optimization flag -O3 is suggested by dlmalloc.c, which is #included in # malloc.cc; we set it here regardless of whether we do a debug or release build. -set_source_files_properties(malloc.cc PROPERTIES COMPILE_FLAGS "-O3") +set_source_files_properties(dlmalloc.cc PROPERTIES COMPILE_FLAGS "-O3") if("${COMPILER_FAMILY}" STREQUAL "clang") - set_property(SOURCE malloc.cc + set_property(SOURCE dlmalloc.cc APPEND_STRING PROPERTY COMPILE_FLAGS " -Wno-parentheses-equality \ -Wno-null-pointer-arithmetic \ @@ -118,14 +112,14 @@ if("${COMPILER_FAMILY}" STREQUAL "clang") endif() if("${COMPILER_FAMILY}" STREQUAL "gcc") - set_property(SOURCE malloc.cc APPEND_STRING PROPERTY COMPILE_FLAGS " -Wno-conversion") + set_property(SOURCE dlmalloc.cc APPEND_STRING PROPERTY COMPILE_FLAGS " -Wno-conversion") endif() list(APPEND PLASMA_EXTERNAL_STORE_SOURCES "external_store.cc" "hash_table_store.cc") # We use static libraries for the plasma_store_server executable so that it can # be copied around and used in different locations. -add_executable(plasma_store_server ${PLASMA_EXTERNAL_STORE_SOURCES} store.cc) +add_executable(plasma_store_server ${PLASMA_EXTERNAL_STORE_SOURCES} ${PLASMA_STORE_SRCS}) if(ARROW_BUILD_STATIC) target_link_libraries(plasma_store_server plasma_static ${PLASMA_STATIC_LINK_LIBS}) else() diff --git a/cpp/src/plasma/dlmalloc.cc b/cpp/src/plasma/dlmalloc.cc new file mode 100644 index 00000000000..463e967e036 --- /dev/null +++ b/cpp/src/plasma/dlmalloc.cc @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "plasma/malloc.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "plasma/common.h" +#include "plasma/plasma.h" + +namespace plasma { + +void* fake_mmap(size_t); +int fake_munmap(void*, int64_t); + +#define MMAP(s) fake_mmap(s) +#define MUNMAP(a, s) fake_munmap(a, s) +#define DIRECT_MMAP(s) fake_mmap(s) +#define DIRECT_MUNMAP(a, s) fake_munmap(a, s) +#define USE_DL_PREFIX +#define HAVE_MORECORE 0 +#define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T +#define DEFAULT_GRANULARITY ((size_t)128U * 1024U) + +#include "plasma/thirdparty/dlmalloc.c" // NOLINT + +#undef MMAP +#undef MUNMAP +#undef DIRECT_MMAP +#undef DIRECT_MUNMAP +#undef USE_DL_PREFIX +#undef HAVE_MORECORE +#undef DEFAULT_GRANULARITY + +// dlmalloc.c defined DEBUG which will conflict with ARROW_LOG(DEBUG). +#ifdef DEBUG +#undef DEBUG +#endif + +constexpr int GRANULARITY_MULTIPLIER = 2; + +static void* pointer_advance(void* p, ptrdiff_t n) { return (unsigned char*)p + n; } + +static void* pointer_retreat(void* p, ptrdiff_t n) { return (unsigned char*)p - n; } + +// Create a buffer. This is creating a temporary file and then +// immediately unlinking it so we do not leave traces in the system. +int create_buffer(int64_t size) { + int fd; + std::string file_template = plasma_config->directory; +#ifdef _WIN32 + if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, + (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), + (DWORD)(uint64_t)size, NULL)) { + fd = -1; + } +#else + file_template += "/plasmaXXXXXX"; + std::vector file_name(file_template.begin(), file_template.end()); + file_name.push_back('\0'); + fd = mkstemp(&file_name[0]); + if (fd < 0) { + ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0]; + return -1; + } + // Immediately unlink the file so we do not leave traces in the system. + if (unlink(&file_name[0]) != 0) { + ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0]; + return -1; + } + if (!plasma_config->hugepages_enabled) { + // Increase the size of the file to the desired size. This seems not to be + // needed for files that are backed by the huge page fs, see also + // http://www.mail-archive.com/kvm-devel@lists.sourceforge.net/msg14737.html + if (ftruncate(fd, (off_t)size) != 0) { + ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0]; + return -1; + } + } +#endif + return fd; +} + +void* fake_mmap(size_t size) { + // Add kMmapRegionsGap so that the returned pointer is deliberately not + // page-aligned. This ensures that the segments of memory returned by + // fake_mmap are never contiguous. + size += kMmapRegionsGap; + + int fd = create_buffer(size); + ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; + // MAP_POPULATE can be used to pre-populate the page tables for this memory region + // which avoids work when accessing the pages later. However it causes long pauses + // when mmapping the files. Only supported on Linux. + void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (pointer == MAP_FAILED) { + ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); + if (errno == ENOMEM && plasma_config->hugepages_enabled) { + ARROW_LOG(ERROR) + << " (this probably means you have to increase /proc/sys/vm/nr_hugepages)"; + } + return pointer; + } + + // Increase dlmalloc's allocation granularity directly. + mparams.granularity *= GRANULARITY_MULTIPLIER; + + MmapRecord& record = mmap_records[pointer]; + record.fd = fd; + record.size = size; + + // We lie to dlmalloc about where mapped memory actually lives. + pointer = pointer_advance(pointer, kMmapRegionsGap); + ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")"; + return pointer; +} + +int fake_munmap(void* addr, int64_t size) { + ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")"; + addr = pointer_retreat(addr, kMmapRegionsGap); + size += kMmapRegionsGap; + + auto entry = mmap_records.find(addr); + + if (entry == mmap_records.end() || entry->second.size != size) { + // Reject requests to munmap that don't directly match previous + // calls to mmap, to prevent dlmalloc from trimming. + return -1; + } + + int r = munmap(addr, size); + if (r == 0) { + close(entry->second.fd); + } + + mmap_records.erase(entry); + return r; +} + +void SetMallocGranularity(int value) { change_mparam(M_GRANULARITY, value); } + +} // namespace plasma diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc index d49c577fdf1..28ff1267545 100644 --- a/cpp/src/plasma/events.cc +++ b/cpp/src/plasma/events.cc @@ -22,7 +22,7 @@ #include extern "C" { -#include "ae/ae.h" +#include "plasma/thirdparty/ae/ae.h" } namespace plasma { diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc index 9ba23e552c2..ba5f2551919 100644 --- a/cpp/src/plasma/io.cc +++ b/cpp/src/plasma/io.cc @@ -30,9 +30,9 @@ using arrow::Status; /// Number of times we try connecting to a socket. -constexpr int64_t kNumConnectAttempts = 50; +constexpr int64_t kNumConnectAttempts = 20; /// Time to wait between connection attempts to a socket. -constexpr int64_t kConnectTimeoutMs = 100; +constexpr int64_t kConnectTimeoutMs = 400; namespace plasma { diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc index a03e3fa6127..bb027a6cb90 100644 --- a/cpp/src/plasma/malloc.cc +++ b/cpp/src/plasma/malloc.cc @@ -27,173 +27,21 @@ #include #include -#include #include #include "plasma/common.h" #include "plasma/plasma.h" -extern "C" { -void* fake_mmap(size_t); -int fake_munmap(void*, int64_t); +namespace plasma { -#define MMAP(s) fake_mmap(s) -#define MUNMAP(a, s) fake_munmap(a, s) -#define DIRECT_MMAP(s) fake_mmap(s) -#define DIRECT_MUNMAP(a, s) fake_munmap(a, s) -#define USE_DL_PREFIX -#define HAVE_MORECORE 0 -#define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T -#define DEFAULT_GRANULARITY ((size_t)128U * 1024U) - -#include "thirdparty/dlmalloc.c" // NOLINT - -#undef MMAP -#undef MUNMAP -#undef DIRECT_MMAP -#undef DIRECT_MUNMAP -#undef USE_DL_PREFIX -#undef HAVE_MORECORE -#undef DEFAULT_GRANULARITY -} - -// dlmalloc.c defined DEBUG which will conflict with ARROW_LOG(DEBUG). -#ifdef DEBUG -#undef DEBUG -#endif - -struct mmap_record { - int fd; - int64_t size; -}; - -namespace { - -/// Hashtable that contains one entry per segment that we got from the OS -/// via mmap. Associates the address of that segment with its file descriptor -/// and size. -std::unordered_map mmap_records; - -} // namespace - -constexpr int GRANULARITY_MULTIPLIER = 2; +std::unordered_map mmap_records; static void* pointer_advance(void* p, ptrdiff_t n) { return (unsigned char*)p + n; } -static void* pointer_retreat(void* p, ptrdiff_t n) { return (unsigned char*)p - n; } - static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) { return (unsigned char const*)pto - (unsigned char const*)pfrom; } -// Create a buffer. This is creating a temporary file and then -// immediately unlinking it so we do not leave traces in the system. -int create_buffer(int64_t size) { - int fd; - std::string file_template = plasma::plasma_config->directory; -#ifdef _WIN32 - if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, - (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), - (DWORD)(uint64_t)size, NULL)) { - fd = -1; - } -#else - file_template += "/plasmaXXXXXX"; - std::vector file_name(file_template.begin(), file_template.end()); - file_name.push_back('\0'); - fd = mkstemp(&file_name[0]); - if (fd < 0) { - ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0]; - return -1; - } - - FILE* file = fdopen(fd, "a+"); - if (!file) { - close(fd); - ARROW_LOG(FATAL) << "create_buffer: fdopen failed for " << &file_name[0]; - return -1; - } - // Immediately unlink the file so we do not leave traces in the system. - if (unlink(&file_name[0]) != 0) { - ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0]; - return -1; - } - if (!plasma::plasma_config->hugepages_enabled) { - // Increase the size of the file to the desired size. This seems not to be - // needed for files that are backed by the huge page fs, see also - // http://www.mail-archive.com/kvm-devel@lists.sourceforge.net/msg14737.html - if (ftruncate(fd, (off_t)size) != 0) { - ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0]; - return -1; - } - } - int ret = dup(fd); - if (ret < 0) { - ARROW_LOG(FATAL) << "failed to dup the descriptor"; - } else { - fclose(file); - fd = ret; - } -#endif - return fd; -} - -void* fake_mmap(size_t size) { - // Add kMmapRegionsGap so that the returned pointer is deliberately not - // page-aligned. This ensures that the segments of memory returned by - // fake_mmap are never contiguous. - size += kMmapRegionsGap; - - int fd = create_buffer(size); - ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; - // MAP_POPULATE can be used to pre-populate the page tables for this memory region - // which avoids work when accessing the pages later. However it causes long pauses - // when mmapping the files. Only supported on Linux. - void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (pointer == MAP_FAILED) { - ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); - if (errno == ENOMEM && plasma::plasma_config->hugepages_enabled) { - ARROW_LOG(ERROR) - << " (this probably means you have to increase /proc/sys/vm/nr_hugepages)"; - } - return pointer; - } - - // Increase dlmalloc's allocation granularity directly. - mparams.granularity *= GRANULARITY_MULTIPLIER; - - mmap_record& record = mmap_records[pointer]; - record.fd = fd; - record.size = size; - - // We lie to dlmalloc about where mapped memory actually lives. - pointer = pointer_advance(pointer, kMmapRegionsGap); - ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")"; - return pointer; -} - -int fake_munmap(void* addr, int64_t size) { - ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")"; - addr = pointer_retreat(addr, kMmapRegionsGap); - size += kMmapRegionsGap; - - auto entry = mmap_records.find(addr); - - if (entry == mmap_records.end() || entry->second.size != size) { - // Reject requests to munmap that don't directly match previous - // calls to mmap, to prevent dlmalloc from trimming. - return -1; - } - - int r = munmap(addr, size); - if (r == 0) { - close(entry->second.fd); - } - - mmap_records.erase(entry); - return r; -} - void GetMallocMapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) { // TODO(rshin): Implement a more efficient search through mmap_records. for (const auto& entry : mmap_records) { @@ -219,4 +67,4 @@ int64_t GetMmapSize(int fd) { return -1; // This code is never reached. } -void SetMallocGranularity(int value) { change_mparam(M_GRANULARITY, value); } +} // namespace plasma diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h index 86f14f199b8..a081190b3ef 100644 --- a/cpp/src/plasma/malloc.h +++ b/cpp/src/plasma/malloc.h @@ -21,6 +21,10 @@ #include #include +#include + +namespace plasma { + /// Gap between two consecutive mmap regions allocated by fake_mmap. /// This ensures that the segments of memory returned by /// fake_mmap are never contiguous and dlmalloc does not coalesce it @@ -35,6 +39,16 @@ void GetMallocMapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offse /// @return The size of the corresponding memory-mapped file. int64_t GetMmapSize(int fd); -void SetMallocGranularity(int value); +struct MmapRecord { + int fd; + int64_t size; +}; + +/// Hashtable that contains one entry per segment that we got from the OS +/// via mmap. Associates the address of that segment with its file descriptor +/// and size. +extern std::unordered_map mmap_records; + +} // namespace plasma -#endif // MALLOC_H +#endif // PLASMA_MALLOC_H diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc index e1c10369dc6..d0ef4f8d317 100644 --- a/cpp/src/plasma/plasma.cc +++ b/cpp/src/plasma/plasma.cc @@ -23,7 +23,6 @@ #include "plasma/common.h" #include "plasma/common_generated.h" -#include "plasma/plasma_allocator.h" #include "plasma/protocol.h" namespace fb = plasma::flatbuf; @@ -32,10 +31,7 @@ namespace plasma { ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), ref_count(0) {} -ObjectTableEntry::~ObjectTableEntry() { - PlasmaAllocator::Free(pointer, data_size + metadata_size); - pointer = nullptr; -} +ObjectTableEntry::~ObjectTableEntry() { pointer = nullptr; } int WarnIfSigpipe(int status, int client_sock) { if (status >= 0) { diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index a3f08e7aa69..2c77f19bbc4 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -77,6 +77,8 @@ namespace fb = plasma::flatbuf; namespace plasma { +void SetMallocGranularity(int value); + struct GetRequest { GetRequest(Client* client, const std::vector& object_ids); /// The client that called get. @@ -528,6 +530,12 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id, } } +void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) { + auto& object = store_info_.objects[object_id]; + PlasmaAllocator::Free(object->pointer, object->data_size + object->metadata_size); + store_info_.objects.erase(object_id); +} + void PlasmaStore::ReleaseObject(const ObjectID& object_id, Client* client) { auto entry = GetObjectTableEntry(&store_info_, object_id); ARROW_CHECK(entry != nullptr); @@ -581,7 +589,7 @@ int PlasmaStore::AbortObject(const ObjectID& object_id, Client* client) { return 0; } else { // The client requesting the abort is the creator. Free the object. - store_info_.objects.erase(object_id); + EraseFromObjectTable(object_id); return 1; } } @@ -611,8 +619,7 @@ PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) { } eviction_policy_.RemoveObject(object_id); - - store_info_.objects.erase(object_id); + EraseFromObjectTable(object_id); // Inform all subscribers that the object has been deleted. fb::ObjectInfoT notification; notification.object_id = object_id.binary(); @@ -647,7 +654,7 @@ void PlasmaStore::EvictObjects(const std::vector& object_ids) { } else { // If there is no backing external store, just erase the object entry // and send a deletion notification. - store_info_.objects.erase(object_id); + EraseFromObjectTable(object_id); // Inform all subscribers that the object has been deleted. fb::ObjectInfoT notification; notification.object_id = object_id.binary(); @@ -1144,7 +1151,7 @@ int main(int argc, char* argv[]) { system_memory = shm_mem_avail; } } else { - SetMallocGranularity(1024 * 1024 * 1024); // 1 GB + plasma::SetMallocGranularity(1024 * 1024 * 1024); // 1 GB } #endif // Get external store diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 16ae6750dab..53464abde8f 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -212,6 +212,8 @@ class PlasmaStore { int RemoveFromClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry, Client* client); + void EraseFromObjectTable(const ObjectID& object_id); + uint8_t* AllocateMemory(size_t size, int* fd, int64_t* map_size, ptrdiff_t* offset); #ifdef PLASMA_CUDA Status AllocateCudaMemory(int device_num, int64_t size, uint8_t** out_pointer, diff --git a/cpp/src/plasma/thirdparty/ae/ae.c b/cpp/src/plasma/thirdparty/ae/ae.c index e66808a8146..dfb72244409 100644 --- a/cpp/src/plasma/thirdparty/ae/ae.c +++ b/cpp/src/plasma/thirdparty/ae/ae.c @@ -40,22 +40,22 @@ #include #include -#include "ae.h" -#include "zmalloc.h" -#include "config.h" +#include "plasma/thirdparty/ae/ae.h" +#include "plasma/thirdparty/ae/zmalloc.h" +#include "plasma/thirdparty/ae/config.h" /* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ #ifdef HAVE_EVPORT -#include "ae_evport.c" +#include "plasma/thirdparty/ae/ae_evport.c" #else #ifdef HAVE_EPOLL - #include "ae_epoll.c" + #include "plasma/thirdparty/ae/ae_epoll.c" #else #ifdef HAVE_KQUEUE - #include "ae_kqueue.c" + #include "plasma/thirdparty/ae/ae_kqueue.c" #else - #include "ae_select.c" + #include "plasma/thirdparty/ae/ae_select.c" #endif #endif #endif