From 5ee9f8f1f04f769301962f0d3180413efce45d83 Mon Sep 17 00:00:00 2001 From: xianliang Date: Thu, 1 Feb 2024 20:41:04 +0800 Subject: [PATCH] fix diskann async cache generation Signed-off-by: xianliang --- CMakeLists.txt | 2 +- cmake/utils/compile_flags.cmake | 4 +++ include/knowhere/comp/thread_pool.h | 41 +++++++++++++++-------- thirdparty/DiskANN/src/pq_flash_index.cpp | 31 ++++++++++++----- 4 files changed, 55 insertions(+), 23 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ace217aa5..c418239ef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -63,8 +63,8 @@ endif() add_definitions(-DNOT_COMPILE_FOR_SWIG) -include(cmake/utils/compile_flags.cmake) include(cmake/utils/platform_check.cmake) +include(cmake/utils/compile_flags.cmake) include(cmake/libs/libfaiss.cmake) include(cmake/libs/libhnsw.cmake) diff --git a/cmake/utils/compile_flags.cmake b/cmake/utils/compile_flags.cmake index 26c0d40f8..bcfb09723 100644 --- a/cmake/utils/compile_flags.cmake +++ b/cmake/utils/compile_flags.cmake @@ -17,6 +17,10 @@ endif() set(CMAKE_CXX_FLAGS "-Wall -fPIC ${CMAKE_CXX_FLAGS}") +if(__X86_64) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse4.2") +endif() + set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g") set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG") diff --git a/include/knowhere/comp/thread_pool.h b/include/knowhere/comp/thread_pool.h index a00920de6..20231dd7f 100644 --- a/include/knowhere/comp/thread_pool.h +++ b/include/knowhere/comp/thread_pool.h @@ -21,12 +21,15 @@ #include #include "folly/executors/CPUThreadPoolExecutor.h" +#include "folly/executors/task_queue/UnboundedBlockingQueue.h" #include "folly/futures/Future.h" #include "knowhere/log.h" namespace knowhere { class ThreadPool { + public: + enum class QueueType { LIFO, FIFO }; #ifdef __linux__ private: class LowPriorityThreadFactory : public folly::NamedThreadFactory { @@ -47,23 +50,33 @@ class ThreadPool { }; public: - explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix) - : pool_(folly::CPUThreadPoolExecutor( - num_threads, - std::make_unique< - folly::LifoSemMPMCQueue>( - num_threads * kTaskQueueFactor), - std::make_shared(thread_name_prefix))) { + explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix, QueueType queueT = QueueType::LIFO) + : pool_(queueT == QueueType::LIFO + ? folly::CPUThreadPoolExecutor( + num_threads, + std::make_unique>( + num_threads * kTaskQueueFactor), + std::make_shared(thread_name_prefix)) + : folly::CPUThreadPoolExecutor( + num_threads, + std::make_unique>(), + std::make_shared(thread_name_prefix))) { } #else public: - explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix) - : pool_(folly::CPUThreadPoolExecutor( - num_threads, - std::make_unique< - folly::LifoSemMPMCQueue>( - num_threads * kTaskQueueFactor), - std::make_shared(thread_name_prefix))) { + explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix, QueueType queueT = QueueType::LIFO) + : pool_(queueT == QueueType::LIFO + ? folly::CPUThreadPoolExecutor( + num_threads, + std::make_unique>( + num_threads * kTaskQueueFactor), + std::make_shared(thread_name_prefix)) + : folly::CPUThreadPoolExecutor( + num_threads, + std::make_unique>(), + std::make_shared(thread_name_prefix))) { } #endif diff --git a/thirdparty/DiskANN/src/pq_flash_index.cpp b/thirdparty/DiskANN/src/pq_flash_index.cpp index fc264d816..6b67f6574 100644 --- a/thirdparty/DiskANN/src/pq_flash_index.cpp +++ b/thirdparty/DiskANN/src/pq_flash_index.cpp @@ -56,7 +56,7 @@ namespace { static auto async_pool = - knowhere::ThreadPool(1, "DiskANN_Async_Cache_Making"); + knowhere::ThreadPool(1, "DiskANN_Async_Cache_Making", knowhere::ThreadPool::QueueType::FIFO); constexpr _u64 kRefineBeamWidthFactor = 2; constexpr _u64 kBruteForceTopkRefineExpansionFactor = 2; @@ -199,18 +199,20 @@ namespace diskann { _u64 num_cached_nodes = node_list.size(); LOG_KNOWHERE_DEBUG_ << "Loading the cache list(" << num_cached_nodes << " points) into memory..."; - assert(this->nhood_cache_buf == nullptr && "nhoodc_cache_buf is not null"); - assert(this->coord_cache_buf == nullptr && "coord_cache_buf is not null"); auto ctx = this->reader->get_ctx(); - nhood_cache_buf = new unsigned[num_cached_nodes * (max_degree + 1)]; - memset(nhood_cache_buf, 0, num_cached_nodes * (max_degree + 1)); + if (nhood_cache_buf == nullptr) { + nhood_cache_buf = new unsigned[num_cached_nodes * (max_degree + 1)]; + memset(nhood_cache_buf, 0, num_cached_nodes * (max_degree + 1)); + } _u64 coord_cache_buf_len = num_cached_nodes * aligned_dim; - diskann::alloc_aligned((void **) &coord_cache_buf, - coord_cache_buf_len * sizeof(T), 8 * sizeof(T)); - memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T)); + if (coord_cache_buf == nullptr) { + diskann::alloc_aligned((void **) &coord_cache_buf, + coord_cache_buf_len * sizeof(T), 8 * sizeof(T)); + memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T)); + } size_t BLOCK_SIZE = 32; size_t num_blocks = DIV_ROUND_UP(num_cached_nodes, BLOCK_SIZE); @@ -291,6 +293,19 @@ namespace diskann { } this->count_visited_nodes.store(true); + // sync allocate memory + if (nhood_cache_buf == nullptr) { + nhood_cache_buf = new unsigned[num_nodes_to_cache * (max_degree + 1)]; + memset(nhood_cache_buf, 0, num_nodes_to_cache * (max_degree + 1)); + } + + _u64 coord_cache_buf_len = num_nodes_to_cache * aligned_dim; + if (coord_cache_buf == nullptr) { + diskann::alloc_aligned((void **) &coord_cache_buf, + coord_cache_buf_len * sizeof(T), 8 * sizeof(T)); + memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T)); + } + async_pool.push([&, state_controller = this->state_controller, sample_bin, l_search, beamwidth, num_nodes_to_cache]() { {