Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ endif()

add_definitions(-DNOT_COMPILE_FOR_SWIG)

include(cmake/utils/compile_flags.cmake)
include(cmake/utils/platform_check.cmake)
include(cmake/utils/compile_flags.cmake)
include(cmake/libs/libfaiss.cmake)
include(cmake/libs/libhnsw.cmake)

Expand Down
4 changes: 4 additions & 0 deletions cmake/utils/compile_flags.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ endif()

set(CMAKE_CXX_FLAGS "-Wall -fPIC ${CMAKE_CXX_FLAGS}")

# Append -msse4.2 to the C++ flags when __X86_64 is set.
# NOTE(review): __X86_64 is not defined in this file; presumably it comes from
# cmake/utils/platform_check.cmake, which must then be included first — confirm.
if(__X86_64)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse4.2")
endif()

set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG")

Expand Down
41 changes: 27 additions & 14 deletions include/knowhere/comp/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
#include <utility>

#include "folly/executors/CPUThreadPoolExecutor.h"
#include "folly/executors/task_queue/UnboundedBlockingQueue.h"
#include "folly/futures/Future.h"
#include "knowhere/log.h"

namespace knowhere {

class ThreadPool {
public:
// Selects which task queue the ThreadPool constructor builds for its executor.
enum class QueueType { LIFO, FIFO };
#ifdef __linux__
private:
class LowPriorityThreadFactory : public folly::NamedThreadFactory {
Expand All @@ -47,23 +50,33 @@ class ThreadPool {
};

public:
explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix)
: pool_(folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<
folly::LifoSemMPMCQueue<folly::CPUThreadPoolExecutor::CPUTask, folly::QueueBehaviorIfFull::BLOCK>>(
num_threads * kTaskQueueFactor),
std::make_shared<LowPriorityThreadFactory>(thread_name_prefix))) {
// Linux variant (inside #ifdef __linux__): builds the executor with `num_threads`
// workers whose threads come from LowPriorityThreadFactory(thread_name_prefix).
//
// `queueT` picks the task queue; it defaults to LIFO, so two-argument call
// sites keep compiling unchanged:
//   - QueueType::LIFO -> folly::LifoSemMPMCQueue with QueueBehaviorIfFull::BLOCK,
//                        constructed with num_threads * kTaskQueueFactor
//                        (presumably the queue capacity — confirm against folly).
//   - any other value -> default-constructed folly::UnboundedBlockingQueue.
//
// NOTE(review): both arms of the conditional are temporaries of the same
// executor type; this presumably relies on pool_ being declared as
// folly::CPUThreadPoolExecutor — confirm against the member declaration.
explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix, QueueType queueT = QueueType::LIFO)
: pool_(queueT == QueueType::LIFO
? folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<folly::LifoSemMPMCQueue<folly::CPUThreadPoolExecutor::CPUTask,
folly::QueueBehaviorIfFull::BLOCK>>(
num_threads * kTaskQueueFactor),
std::make_shared<LowPriorityThreadFactory>(thread_name_prefix))
: folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<folly::UnboundedBlockingQueue<folly::CPUThreadPoolExecutor::CPUTask>>(),
std::make_shared<LowPriorityThreadFactory>(thread_name_prefix))) {
}
#else
public:
explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix)
: pool_(folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<
folly::LifoSemMPMCQueue<folly::CPUThreadPoolExecutor::CPUTask, folly::QueueBehaviorIfFull::BLOCK>>(
num_threads * kTaskQueueFactor),
std::make_shared<folly::NamedThreadFactory>(thread_name_prefix))) {
// Non-Linux variant (the #else branch): builds the executor with `num_threads`
// workers whose threads come from folly::NamedThreadFactory(thread_name_prefix).
//
// `queueT` picks the task queue; it defaults to LIFO, so two-argument call
// sites keep compiling unchanged:
//   - QueueType::LIFO -> folly::LifoSemMPMCQueue with QueueBehaviorIfFull::BLOCK,
//                        constructed with num_threads * kTaskQueueFactor
//                        (presumably the queue capacity — confirm against folly).
//   - any other value -> default-constructed folly::UnboundedBlockingQueue.
//
// NOTE(review): both arms of the conditional are temporaries of the same
// executor type; this presumably relies on pool_ being declared as
// folly::CPUThreadPoolExecutor — confirm against the member declaration.
explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix, QueueType queueT = QueueType::LIFO)
: pool_(queueT == QueueType::LIFO
? folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<folly::LifoSemMPMCQueue<folly::CPUThreadPoolExecutor::CPUTask,
folly::QueueBehaviorIfFull::BLOCK>>(
num_threads * kTaskQueueFactor),
std::make_shared<folly::NamedThreadFactory>(thread_name_prefix))
: folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<folly::UnboundedBlockingQueue<folly::CPUThreadPoolExecutor::CPUTask>>(),
std::make_shared<folly::NamedThreadFactory>(thread_name_prefix))) {
}
#endif

Expand Down
31 changes: 23 additions & 8 deletions thirdparty/DiskANN/src/pq_flash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

namespace {
static auto async_pool =
knowhere::ThreadPool(1, "DiskANN_Async_Cache_Making");
knowhere::ThreadPool(1, "DiskANN_Async_Cache_Making", knowhere::ThreadPool::QueueType::FIFO);

constexpr _u64 kRefineBeamWidthFactor = 2;
constexpr _u64 kBruteForceTopkRefineExpansionFactor = 2;
Expand Down Expand Up @@ -199,18 +199,20 @@ namespace diskann {
_u64 num_cached_nodes = node_list.size();
LOG_KNOWHERE_DEBUG_ << "Loading the cache list(" << num_cached_nodes
<< " points) into memory...";
assert(this->nhood_cache_buf == nullptr && "nhoodc_cache_buf is not null");
assert(this->coord_cache_buf == nullptr && "coord_cache_buf is not null");

auto ctx = this->reader->get_ctx();

nhood_cache_buf = new unsigned[num_cached_nodes * (max_degree + 1)];
memset(nhood_cache_buf, 0, num_cached_nodes * (max_degree + 1));
// Allocate and zero the nhood cache buffer only when it has not been
// allocated yet.
if (nhood_cache_buf == nullptr) {
  const auto nhood_cache_len = num_cached_nodes * (max_degree + 1);
  nhood_cache_buf = new unsigned[nhood_cache_len];
  // memset takes a byte count, not an element count: scale by the element
  // size so the whole array is zeroed rather than only its leading bytes.
  memset(nhood_cache_buf, 0, nhood_cache_len * sizeof(unsigned));
}

_u64 coord_cache_buf_len = num_cached_nodes * aligned_dim;
diskann::alloc_aligned((void **) &coord_cache_buf,
coord_cache_buf_len * sizeof(T), 8 * sizeof(T));
memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T));
if (coord_cache_buf == nullptr) {
diskann::alloc_aligned((void **) &coord_cache_buf,
coord_cache_buf_len * sizeof(T), 8 * sizeof(T));
memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T));
}

size_t BLOCK_SIZE = 32;
size_t num_blocks = DIV_ROUND_UP(num_cached_nodes, BLOCK_SIZE);
Expand Down Expand Up @@ -291,6 +293,19 @@ namespace diskann {
}
this->count_visited_nodes.store(true);

// Allocate both cache buffers synchronously, before the task below is handed
// to async_pool; each buffer is only allocated if it is still null.
if (nhood_cache_buf == nullptr) {
  const auto nhood_cache_len = num_nodes_to_cache * (max_degree + 1);
  nhood_cache_buf = new unsigned[nhood_cache_len];
  // memset takes a byte count, not an element count: scale by the element
  // size so the whole array is zeroed rather than only its leading bytes.
  memset(nhood_cache_buf, 0, nhood_cache_len * sizeof(unsigned));
}

_u64 coord_cache_buf_len = num_nodes_to_cache * aligned_dim;
if (coord_cache_buf == nullptr) {
  diskann::alloc_aligned((void **) &coord_cache_buf,
                         coord_cache_buf_len * sizeof(T), 8 * sizeof(T));
  memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T));
}

async_pool.push([&, state_controller = this->state_controller, sample_bin,
l_search, beamwidth, num_nodes_to_cache]() {
{
Expand Down