diff --git a/c_api/Index_c.cpp b/c_api/Index_c.cpp index f50282d263..0a1fb93c80 100644 --- a/c_api/Index_c.cpp +++ b/c_api/Index_c.cpp @@ -11,6 +11,7 @@ #include "Index_c.h" #include #include +#include #include "macros_impl.h" extern "C" { @@ -222,4 +223,9 @@ int faiss_Index_sa_decode( } CATCH_AND_HANDLE } + +void faiss_set_omp_threads(unsigned int n) { + faiss::set_num_omp_threads(n); +} + } diff --git a/c_api/Index_c.h b/c_api/Index_c.h index f34d294e50..21711c36e0 100644 --- a/c_api/Index_c.h +++ b/c_api/Index_c.h @@ -275,6 +275,8 @@ int faiss_Index_sa_decode( const uint8_t* bytes, float* x); +void faiss_set_omp_threads(unsigned int n); + #ifdef __cplusplus } #endif diff --git a/faiss/AutoTune.cpp b/faiss/AutoTune.cpp index f9aa4273c6..189d7ed83b 100644 --- a/faiss/AutoTune.cpp +++ b/faiss/AutoTune.cpp @@ -40,6 +40,8 @@ #include #include +#include + namespace faiss { AutoTuneCriterion::AutoTuneCriterion(idx_t nq, idx_t nnn) @@ -89,7 +91,7 @@ double IntersectionCriterion::evaluate(const float* /*D*/, const idx_t* I) (gt_I.size() == gt_nnn * nq && gt_nnn >= R && nnn >= R), "ground truth not initialized"); int64_t n_ok = 0; -#pragma omp parallel for reduction(+ : n_ok) +#pragma omp parallel for reduction(+ : n_ok) num_threads(num_omp_threads) for (idx_t q = 0; q < nq; q++) { n_ok += ranklist_intersection_size( R, >_I[q * gt_nnn], R, I + q * nnn); @@ -698,7 +700,7 @@ void ParameterSpace::explore( do { if (thread_over_batches) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t q0 = 0; q0 < nq; q0 += batchsize) { size_t q1 = q0 + batchsize; if (q1 > nq) diff --git a/faiss/CMakeLists.txt b/faiss/CMakeLists.txt index 629b2f07e8..c6448a30e2 100644 --- a/faiss/CMakeLists.txt +++ b/faiss/CMakeLists.txt @@ -47,6 +47,7 @@ set(FAISS_SRC IndexShardsIVF.cpp MatrixStats.cpp MetaIndexes.cpp + OMPConfig.cpp VectorTransform.cpp clone_index.cpp index_factory.cpp @@ -138,6 +139,7 @@ set(FAISS_HEADERS MatrixStats.h MetaIndexes.h MetricType.h 
+ OMPConfig.h VectorTransform.h clone_index.h index_factory.h diff --git a/faiss/Clustering.cpp b/faiss/Clustering.cpp index 7cf2ba0948..a67312db56 100644 --- a/faiss/Clustering.cpp +++ b/faiss/Clustering.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -152,7 +153,7 @@ void compute_centroids( size_t line_size = codec ? codec->sa_code_size() : d * sizeof(float); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { int nt = omp_get_num_threads(); int rank = omp_get_thread_num(); @@ -192,7 +193,7 @@ void compute_centroids( } } -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t ci = 0; ci < k; ci++) { if (hassign[ci] == 0) { continue; diff --git a/faiss/IVFlib.cpp b/faiss/IVFlib.cpp index 88ac7c7a2f..7c7dcc8f6b 100644 --- a/faiss/IVFlib.cpp +++ b/faiss/IVFlib.cpp @@ -469,7 +469,7 @@ void ivf_residual_add_from_flat_codes( const ResidualQuantizer& rq = index->rq; // populate inverted lists -#pragma omp parallel if (nb > 10000) +#pragma omp parallel if (nb > 10000) num_threads(num_omp_threads) { std::vector tmp_code(index->code_size); std::vector tmp(rq.d); diff --git a/faiss/Index.cpp b/faiss/Index.cpp index 204fc6dead..37936aa22b 100644 --- a/faiss/Index.cpp +++ b/faiss/Index.cpp @@ -8,6 +8,7 @@ // -*- c++ -*- #include +#include #include #include @@ -57,7 +58,7 @@ void Index::reconstruct(idx_t, float*) const { void Index::reconstruct_batch(idx_t n, const idx_t* keys, float* recons) const { std::mutex exception_mutex; std::string exception_string; -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { try { reconstruct(keys[i], &recons[i * d]); @@ -72,7 +73,7 @@ void Index::reconstruct_batch(idx_t n, const idx_t* keys, float* recons) const { } void Index::reconstruct_n(idx_t i0, idx_t ni, float* recons) const { -#pragma omp parallel for if (ni > 1000) +#pragma omp parallel for if (ni > 1000) 
num_threads(num_omp_threads) for (idx_t i = 0; i < ni; i++) { reconstruct(i0 + i, recons + i * d); } @@ -116,7 +117,7 @@ void Index::compute_residual_n( const float* xs, float* residuals, const idx_t* keys) const { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t i = 0; i < n; ++i) { compute_residual(&xs[i * d], &residuals[i * d], keys[i]); } diff --git a/faiss/Index2Layer.cpp b/faiss/Index2Layer.cpp index 39291debbf..af9909006a 100644 --- a/faiss/Index2Layer.cpp +++ b/faiss/Index2Layer.cpp @@ -322,7 +322,7 @@ void Index2Layer::sa_encode(idx_t n, const float* x, uint8_t* bytes) const { } void Index2Layer::sa_decode(idx_t n, const uint8_t* bytes, float* x) const { -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::vector residual(d); diff --git a/faiss/IndexAdditiveQuantizer.cpp b/faiss/IndexAdditiveQuantizer.cpp index 153b766e4a..66f98e3c7e 100644 --- a/faiss/IndexAdditiveQuantizer.cpp +++ b/faiss/IndexAdditiveQuantizer.cpp @@ -137,7 +137,7 @@ void search_with_decompress( using SingleResultHandler = typename ResultHandler::SingleResultHandler; -#pragma omp parallel for if(res.nq > 100) +#pragma omp parallel for if(res.nq > 100) num_threads(num_omp_threads) for (int64_t q = 0; q < res.nq; q++) { SingleResultHandler resi(res); resi.begin(q); @@ -170,7 +170,7 @@ void search_with_LUT( aq.compute_LUT(nq, xq, LUT.get()); -#pragma omp parallel for if(nq > 100) +#pragma omp parallel for if(nq > 100) num_threads(num_omp_threads) for (int64_t q = 0; q < nq; q++) { SingleResultHandler resi(res); resi.begin(q); @@ -571,7 +571,7 @@ void ResidualCoarseQuantizer::search( rq.refine_beam( n, 1, x, beam_size, codes.data(), nullptr, beam_distances.data()); -#pragma omp parallel for if (n > 4000) +#pragma omp parallel for if (n > 4000) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { memcpy(distances + i * k, beam_distances.data() + beam_size * i, diff --git a/faiss/IndexAdditiveQuantizerFastScan.cpp 
b/faiss/IndexAdditiveQuantizerFastScan.cpp index d549e094ce..98e99559fa 100644 --- a/faiss/IndexAdditiveQuantizerFastScan.cpp +++ b/faiss/IndexAdditiveQuantizerFastScan.cpp @@ -13,6 +13,7 @@ #include + #include #include #include @@ -131,7 +132,7 @@ void IndexAdditiveQuantizerFastScan::estimate_norm_scale( // TODO: try max of scales double scale = 0; -#pragma omp parallel for reduction(+ : scale) +#pragma omp parallel for reduction(+ : scale) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { const float* lut = dis_tables.data() + i * M * ksub; scale += quantize_lut::aq_estimate_norm_scale(M, ksub, 2, lut); diff --git a/faiss/IndexBinaryHNSW.cpp b/faiss/IndexBinaryHNSW.cpp index 1f034009f8..a86455fc7b 100644 --- a/faiss/IndexBinaryHNSW.cpp +++ b/faiss/IndexBinaryHNSW.cpp @@ -10,6 +10,7 @@ #include #include + #include #include #include @@ -116,7 +117,7 @@ void hnsw_add_vertices( std::swap(order[j], order[j + rng2.rand_int(i1 - j)]); } -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { VisitedTable vt(ntotal); @@ -201,7 +202,7 @@ void IndexBinaryHNSW::search( !params, "search params not supported for this index"); FAISS_THROW_IF_NOT(k > 0); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { VisitedTable vt(ntotal); std::unique_ptr dis(get_distance_computer()); @@ -219,7 +220,7 @@ void IndexBinaryHNSW::search( } } -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < n * k; ++i) { distances[i] = std::round(((float*)distances)[i]); } diff --git a/faiss/IndexBinaryHash.cpp b/faiss/IndexBinaryHash.cpp index 0e449bab77..f22438e1f8 100644 --- a/faiss/IndexBinaryHash.cpp +++ b/faiss/IndexBinaryHash.cpp @@ -221,7 +221,7 @@ void IndexBinaryHash::range_search( !params, "search params not supported for this index"); size_t nlist = 0, ndis = 0, n0 = 0; -#pragma omp parallel if (n > 100) reduction(+ : ndis, n0, nlist) +#pragma omp parallel if (n > 100) reduction(+ : ndis, n0, 
nlist) num_threads(num_omp_threads) { RangeSearchPartialResult pres(result); @@ -255,7 +255,7 @@ void IndexBinaryHash::search( using HeapForL2 = CMax; size_t nlist = 0, ndis = 0, n0 = 0; -#pragma omp parallel for if (n > 100) reduction(+ : nlist, ndis, n0) +#pragma omp parallel for if (n > 100) reduction(+ : nlist, ndis, n0) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { int32_t* simi = distances + k * i; idx_t* idxi = labels + k * i; @@ -442,7 +442,7 @@ void IndexBinaryMultiHash::range_search( !params, "search params not supported for this index"); size_t nlist = 0, ndis = 0, n0 = 0; -#pragma omp parallel if (n > 100) reduction(+ : ndis, n0, nlist) +#pragma omp parallel if (n > 100) reduction(+ : ndis, n0, nlist) num_threads(num_omp_threads) { RangeSearchPartialResult pres(result); @@ -476,7 +476,7 @@ void IndexBinaryMultiHash::search( using HeapForL2 = CMax; size_t nlist = 0, ndis = 0, n0 = 0; -#pragma omp parallel for if (n > 100) reduction(+ : nlist, ndis, n0) +#pragma omp parallel for if (n > 100) reduction(+ : nlist, ndis, n0) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { int32_t* simi = distances + k * i; idx_t* idxi = labels + k * i; diff --git a/faiss/IndexBinaryIVF.cpp b/faiss/IndexBinaryIVF.cpp index 65b98280dc..53b66dedf3 100644 --- a/faiss/IndexBinaryIVF.cpp +++ b/faiss/IndexBinaryIVF.cpp @@ -10,6 +10,7 @@ #include #include + #include #include @@ -391,7 +392,7 @@ void search_knn_hamming_heap( using HeapForIP = CMin; using HeapForL2 = CMax; -#pragma omp parallel if (n > 1) reduction(+ : nlistv, ndis, nheap) +#pragma omp parallel if (n > 1) reduction(+ : nlistv, ndis, nheap) num_threads(num_omp_threads) { std::unique_ptr scanner( ivf.get_InvertedListScanner(store_pairs)); @@ -494,7 +495,7 @@ void search_knn_hamming_count( size_t nlistv = 0, ndis = 0; -#pragma omp parallel for reduction(+ : nlistv, ndis) +#pragma omp parallel for reduction(+ : nlistv, ndis) num_threads(num_omp_threads) for (int64_t i = 0; i < nx; i++) { const 
idx_t* keysi = keys + i * nprobe; HCounterState& csi = cs[i]; @@ -920,7 +921,7 @@ void IndexBinaryIVF::range_search_preassigned( std::vector all_pres(omp_get_max_threads()); -#pragma omp parallel reduction(+ : nlistv, ndis) +#pragma omp parallel reduction(+ : nlistv, ndis) num_threads(num_omp_threads) { RangeSearchPartialResult pres(res); std::unique_ptr scanner( diff --git a/faiss/IndexFastScan.cpp b/faiss/IndexFastScan.cpp index 4ed71aaee1..c59d2cc160 100644 --- a/faiss/IndexFastScan.cpp +++ b/faiss/IndexFastScan.cpp @@ -13,6 +13,7 @@ #include + #include #include #include @@ -348,7 +349,7 @@ void IndexFastScan::search_implem_234( } } -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { int64_t* heap_ids = labels + i * k; float* heap_dis = distances + i * k; diff --git a/faiss/IndexFlat.cpp b/faiss/IndexFlat.cpp index 0fa3b82062..ec46cc1552 100644 --- a/faiss/IndexFlat.cpp +++ b/faiss/IndexFlat.cpp @@ -227,7 +227,7 @@ void IndexFlat1D::search( perm.size() == ntotal, "Call update_permutation before search"); const float* xb = get_xb(); -#pragma omp parallel for if (n > 10000) +#pragma omp parallel for if (n > 10000) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { float q = x[i]; // query float* D = distances + i * k; diff --git a/faiss/IndexHNSW.cpp b/faiss/IndexHNSW.cpp index f133baf646..33ab4404a7 100644 --- a/faiss/IndexHNSW.cpp +++ b/faiss/IndexHNSW.cpp @@ -10,6 +10,7 @@ #include #include + #include #include #include @@ -188,7 +189,7 @@ void hnsw_add_vertices( bool interrupt = false; -#pragma omp parallel if (i1 > i0 + 100) +#pragma omp parallel if (i1 > i0 + 100) num_threads(num_omp_threads) { VisitedTable vt(ntotal); @@ -305,7 +306,7 @@ void IndexHNSW::search( for (idx_t i0 = 0; i0 < n; i0 += check_period) { idx_t i1 = std::min(i0 + check_period, n); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { VisitedTable vt(ntotal); @@ -379,7 
+380,7 @@ void IndexHNSW::reconstruct(idx_t key, float* recons) const { } void IndexHNSW::shrink_level_0_neighbors(int new_size) { -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { DistanceComputer* dis = storage_distance_computer(storage); ScopeDeleter1 del(dis); @@ -429,7 +430,7 @@ void IndexHNSW::search_level_0( storage_idx_t ntotal = hnsw.levels.size(); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::unique_ptr qdis( storage_distance_computer(storage)); @@ -470,7 +471,7 @@ void IndexHNSW::init_level_0_from_knngraph( const idx_t* I) { int dest_size = hnsw.nb_neighbors(0); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t i = 0; i < ntotal; i++) { DistanceComputer* qdis = storage_distance_computer(storage); std::vector vec(d); @@ -511,7 +512,7 @@ void IndexHNSW::init_level_0_from_entry_points( for (int i = 0; i < ntotal; i++) omp_init_lock(&locks[i]); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { VisitedTable vt(ntotal); @@ -546,7 +547,7 @@ void IndexHNSW::init_level_0_from_entry_points( void IndexHNSW::reorder_links() { int M = hnsw.nb_neighbors(0); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::vector distances(M); std::vector order(M); @@ -745,7 +746,7 @@ void ReconstructFromNeighbors::reconstruct_n( storage_idx_t n0, storage_idx_t ni, float* x) const { -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::vector tmp(index.d); #pragma omp for @@ -844,7 +845,7 @@ void ReconstructFromNeighbors::add_codes(size_t n, const float* x) { return; } codes.resize(codes.size() + code_size * n); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < n; i++) { estimate_code( x + i * index.d, @@ -1027,7 +1028,7 @@ void IndexHNSW2Level::search( labels, false); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { VisitedTable 
vt(ntotal); DistanceComputer* dis = storage_distance_computer(storage); diff --git a/faiss/IndexIDMap.cpp b/faiss/IndexIDMap.cpp index 6ce069da5e..148bfbb172 100644 --- a/faiss/IndexIDMap.cpp +++ b/faiss/IndexIDMap.cpp @@ -137,7 +137,7 @@ void IndexIDMapTemplate::range_search( FAISS_THROW_IF_NOT_MSG( !params, "search params not supported for this index"); index->range_search(n, x, radius, result); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t i = 0; i < result->lims[result->nq]; i++) { result->labels[i] = result->labels[i] < 0 ? result->labels[i] : id_map[result->labels[i]]; diff --git a/faiss/IndexIVF.cpp b/faiss/IndexIVF.cpp index 19e18e4666..f9f45d8e31 100644 --- a/faiss/IndexIVF.cpp +++ b/faiss/IndexIVF.cpp @@ -10,6 +10,7 @@ #include #include + #include #include @@ -237,7 +238,7 @@ void IndexIVF::add_core( DirectMapAdd dm_adder(direct_map, n, xids); -#pragma omp parallel reduction(+ : nadd) +#pragma omp parallel reduction(+ : nadd) num_threads(num_omp_threads) { int nt = omp_get_num_threads(); int rank = omp_get_thread_num(); @@ -346,7 +347,7 @@ void IndexIVF::search( std::mutex exception_mutex; std::string exception_string; -#pragma omp parallel for if (nt > 1) +#pragma omp parallel for if (nt > 1) num_threads(num_omp_threads) for (idx_t slice = 0; slice < nt; slice++) { IndexIVFStats local_stats; idx_t i0 = n * slice / nt; @@ -444,7 +445,7 @@ void IndexIVF::search_preassigned( : pmode == 1 ? nprobe > 1 : nprobe * n > 1); -#pragma omp parallel if (do_parallel) reduction(+ : nlistv, ndis, nheap) +#pragma omp parallel if (do_parallel) reduction(+ : nlistv, ndis, nheap) num_threads(num_omp_threads) { InvertedListScanner* scanner = get_InvertedListScanner(store_pairs, sel); @@ -781,7 +782,7 @@ void IndexIVF::range_search_preassigned( : pmode == 1 ? 
nprobe > 1 : nprobe * nx > 1); -#pragma omp parallel if (do_parallel) reduction(+ : nlistv, ndis) +#pragma omp parallel if (do_parallel) reduction(+ : nlistv, ndis) num_threads(num_omp_threads) { RangeSearchPartialResult pres(result); std::unique_ptr scanner( diff --git a/faiss/IndexIVFAdditiveQuantizer.cpp b/faiss/IndexIVFAdditiveQuantizer.cpp index 0fa836aa08..51b5a0b72b 100644 --- a/faiss/IndexIVFAdditiveQuantizer.cpp +++ b/faiss/IndexIVFAdditiveQuantizer.cpp @@ -77,7 +77,7 @@ void IndexIVFAdditiveQuantizer::encode_vectors( // subtract centroids std::vector residuals(n * d); -#pragma omp parallel for if (n > 10000) +#pragma omp parallel for if (n > 10000) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { quantizer->compute_residual( x + i * d, @@ -106,7 +106,7 @@ void IndexIVFAdditiveQuantizer::sa_decode( float* x) const { const size_t coarse_size = coarse_code_size(); -#pragma omp parallel if (n > 1000) +#pragma omp parallel if (n > 1000) num_threads(num_omp_threads) { std::vector residual(d); diff --git a/faiss/IndexIVFAdditiveQuantizerFastScan.cpp b/faiss/IndexIVFAdditiveQuantizerFastScan.cpp index e44e70c77c..3b97b8ce5d 100644 --- a/faiss/IndexIVFAdditiveQuantizerFastScan.cpp +++ b/faiss/IndexIVFAdditiveQuantizerFastScan.cpp @@ -13,6 +13,7 @@ #include + #include #include @@ -241,7 +242,7 @@ void IndexIVFAdditiveQuantizerFastScan::estimate_norm_scale( float scale = 0; -#pragma omp parallel for reduction(+ : scale) +#pragma omp parallel for reduction(+ : scale) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { const float* lut = dis_tables.get() + i * M * ksub; scale += quantize_lut::aq_estimate_norm_scale(M, ksub, 2, lut); @@ -283,7 +284,7 @@ void IndexIVFAdditiveQuantizerFastScan::encode_vectors( std::vector residuals(n * d); std::vector centroids(n * d); -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { if (list_nos[i] < 0) { 
memset(residuals.data() + i * d, 0, sizeof(residuals[0]) * d); @@ -293,7 +294,7 @@ void IndexIVFAdditiveQuantizerFastScan::encode_vectors( } } -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { auto c = centroids.data() + i * d; quantizer->reconstruct(list_nos[i], c); @@ -426,7 +427,7 @@ void IndexIVFAdditiveQuantizerFastScan::compute_LUT( // bias = coef * // NOTE: q^2 is not added to `biases` biases.resize(n * nprobe); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::vector centroid(d); float* c = centroid.data(); @@ -457,7 +458,7 @@ void IndexIVFAdditiveQuantizerFastScan::compute_LUT( FAISS_THROW_IF_NOT(norm_tabs.size() == norm_dim12); // combine them -#pragma omp parallel for if (n > 100) +#pragma omp parallel for if (n > 100) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { float* tab = dis_tables.data() + i * dim12 + ip_dim12; memcpy(tab, norm_lut, norm_dim12 * sizeof(*tab)); diff --git a/faiss/IndexIVFFastScan.cpp b/faiss/IndexIVFFastScan.cpp index 701edc1fd7..0668b483e8 100644 --- a/faiss/IndexIVFFastScan.cpp +++ b/faiss/IndexIVFFastScan.cpp @@ -14,6 +14,7 @@ #include + #include #include @@ -267,7 +268,7 @@ void IndexIVFFastScan::compute_LUT_uint8( } uint64_t t1 = get_cy(); -#pragma omp parallel for if (n > 100) +#pragma omp parallel for if (n > 100) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { const float* t_in = dis_tables_float.get() + i * dim123; const float* b_in = nullptr; @@ -422,7 +423,7 @@ void IndexIVFFastScan::search_dispatch_implem( // many queries (for now we keep this simple) search_implem_14(n, x, k, distances, labels, impl, scaler); } else { -#pragma omp parallel for reduction(+ : ndis, nlist_visited) +#pragma omp parallel for reduction(+ : ndis, nlist_visited) num_threads(num_omp_threads) for (int slice = 0; slice < nslice; slice++) { idx_t i0 = n * slice / nslice; idx_t i1 = n * (slice + 1) / 
nslice; @@ -487,7 +488,7 @@ void IndexIVFFastScan::search_implem_1( size_t ndis = 0, nlist_visited = 0; -#pragma omp parallel for reduction(+ : ndis, nlist_visited) +#pragma omp parallel for reduction(+ : ndis, nlist_visited) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { int64_t* heap_ids = labels + i * k; float* heap_dis = distances + i * k; @@ -566,7 +567,7 @@ void IndexIVFFastScan::search_implem_2( size_t ndis = 0, nlist_visited = 0; -#pragma omp parallel for reduction(+ : ndis, nlist_visited) +#pragma omp parallel for reduction(+ : ndis, nlist_visited) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { std::vector tmp_dis(k); int64_t* heap_ids = labels + i * k; @@ -1074,7 +1075,7 @@ void IndexIVFFastScan::search_implem_14( size_t ndis = 0; size_t nlist_visited = 0; -#pragma omp parallel reduction(+ : ndis, nlist_visited) +#pragma omp parallel reduction(+ : ndis, nlist_visited) num_threads(num_omp_threads) { // storage for each thread std::vector local_idx(k * n); diff --git a/faiss/IndexIVFFlat.cpp b/faiss/IndexIVFFlat.cpp index 03f8cb0dc4..c4decd360e 100644 --- a/faiss/IndexIVFFlat.cpp +++ b/faiss/IndexIVFFlat.cpp @@ -11,6 +11,7 @@ #include + #include #include @@ -52,7 +53,7 @@ void IndexIVFFlat::add_core( DirectMapAdd dm_adder(direct_map, n, xids); -#pragma omp parallel reduction(+ : n_add) +#pragma omp parallel reduction(+ : n_add) num_threads(num_omp_threads) { int nt = omp_get_num_threads(); int rank = omp_get_thread_num(); @@ -278,7 +279,7 @@ void IndexIVFFlatDedup::add_with_ids( int64_t n_add = 0, n_dup = 0; -#pragma omp parallel reduction(+ : n_add, n_dup) +#pragma omp parallel reduction(+ : n_add, n_dup) num_threads(num_omp_threads) { int nt = omp_get_num_threads(); int rank = omp_get_thread_num(); @@ -423,7 +424,7 @@ size_t IndexIVFFlatDedup::remove_ids(const IDSelector& sel) { std::vector toremove(nlist); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < nlist; i++) { 
int64_t l0 = invlists->list_size(i), l = l0, j = 0; InvertedLists::ScopedIds idsi(invlists, i); diff --git a/faiss/IndexIVFPQ.cpp b/faiss/IndexIVFPQ.cpp index defbf9d5ec..3fc5a1c347 100644 --- a/faiss/IndexIVFPQ.cpp +++ b/faiss/IndexIVFPQ.cpp @@ -238,7 +238,7 @@ void IndexIVFPQ::encode_vectors( void IndexIVFPQ::sa_decode(idx_t n, const uint8_t* codes, float* x) const { size_t coarse_size = coarse_code_size(); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::vector residual(d); diff --git a/faiss/IndexIVFPQFastScan.cpp b/faiss/IndexIVFPQFastScan.cpp index 07d88bf50e..72115f3d6e 100644 --- a/faiss/IndexIVFPQFastScan.cpp +++ b/faiss/IndexIVFPQFastScan.cpp @@ -13,6 +13,7 @@ #include + #include #include @@ -252,7 +253,7 @@ void IndexIVFPQFastScan::compute_LUT( AlignedTable ip_table(n * dim12); pq.compute_inner_prod_tables(n, x, ip_table.get()); -#pragma omp parallel for if (n * nprobe > 8000) +#pragma omp parallel for if (n * nprobe > 8000) num_threads(num_omp_threads) for (idx_t ij = 0; ij < n * nprobe; ij++) { idx_t i = ij / nprobe; float* tab = dis_tables.get() + ij * dim12; @@ -277,7 +278,7 @@ void IndexIVFPQFastScan::compute_LUT( biases.resize(n * nprobe); memset(biases.get(), 0, sizeof(float) * n * nprobe); -#pragma omp parallel for if (n * nprobe > 8000) +#pragma omp parallel for if (n * nprobe > 8000) num_threads(num_omp_threads) for (idx_t ij = 0; ij < n * nprobe; ij++) { idx_t i = ij / nprobe; float* xij = &xrel[ij * d]; diff --git a/faiss/IndexIVFPQR.cpp b/faiss/IndexIVFPQR.cpp index f60302396d..b90cec81fd 100644 --- a/faiss/IndexIVFPQR.cpp +++ b/faiss/IndexIVFPQR.cpp @@ -128,7 +128,7 @@ void IndexIVFPQR::search_preassigned( // 3rd level refinement size_t n_refine = 0; -#pragma omp parallel reduction(+ : n_refine) +#pragma omp parallel reduction(+ : n_refine) num_threads(num_omp_threads) { // tmp buffers float* residual_1 = new float[2 * d]; diff --git a/faiss/IndexLattice.cpp b/faiss/IndexLattice.cpp index 
4e0448e299..a6778bb06a 100644 --- a/faiss/IndexLattice.cpp +++ b/faiss/IndexLattice.cpp @@ -72,7 +72,7 @@ void IndexLattice::sa_encode(idx_t n, const float* x, uint8_t* codes) const { const float* maxs = mins + nsq; int64_t sc = int64_t(1) << scale_nbit; -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { BitstringWriter wr(codes + i * code_size, code_size); const float* xi = x + i * d; @@ -96,7 +96,7 @@ void IndexLattice::sa_decode(idx_t n, const uint8_t* codes, float* x) const { float sc = int64_t(1) << scale_nbit; float r = sqrtf(zn_sphere_codec.r2); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { BitstringReader rd(codes + i * code_size, code_size); float* xi = x + i * d; diff --git a/faiss/IndexNNDescent.cpp b/faiss/IndexNNDescent.cpp index 8cdc0c4ab1..1ead60b03f 100644 --- a/faiss/IndexNNDescent.cpp +++ b/faiss/IndexNNDescent.cpp @@ -11,6 +11,7 @@ #include + #include #include #include @@ -154,7 +155,7 @@ void IndexNNDescent::search( for (idx_t i0 = 0; i0 < n; i0 += check_period) { idx_t i1 = std::min(i0 + check_period, n); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { VisitedTable vt(ntotal); diff --git a/faiss/IndexNSG.cpp b/faiss/IndexNSG.cpp index a7cfd490a4..3aee5e82b2 100644 --- a/faiss/IndexNSG.cpp +++ b/faiss/IndexNSG.cpp @@ -11,6 +11,7 @@ #include + #include #include @@ -91,7 +92,7 @@ void IndexNSG::search( for (idx_t i0 = 0; i0 < n; i0 += check_period) { idx_t i1 = std::min(i0 + check_period, n); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { VisitedTable vt(ntotal); @@ -222,7 +223,7 @@ void IndexNSG::add(idx_t n, const float* x) { // cast from idx_t to int const int* knn_graph = index.nndescent.final_graph.data(); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t i = 0; i < ntotal * GK; i++) { knng[i] = knn_graph[i]; } @@ -260,7 
+261,7 @@ void IndexNSG::reconstruct(idx_t key, float* recons) const { void IndexNSG::check_knn_graph(const idx_t* knn_graph, idx_t n, int K) const { idx_t total_count = 0; -#pragma omp parallel for reduction(+ : total_count) +#pragma omp parallel for reduction(+ : total_count) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { int count = 0; for (int j = 0; j < K; j++) { diff --git a/faiss/IndexPQ.cpp b/faiss/IndexPQ.cpp index 7df08899a4..a6366714a4 100644 --- a/faiss/IndexPQ.cpp +++ b/faiss/IndexPQ.cpp @@ -332,7 +332,7 @@ void IndexPQ::search_core_polysemous( if (false) { pq.compute_codes(x, q_codes, n); } else { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t qi = 0; qi < n; qi++) { pq.compute_code_from_distance_table( dis_tables + qi * pq.M * pq.ksub, @@ -344,7 +344,7 @@ void IndexPQ::search_core_polysemous( int bad_code_size = 0; -#pragma omp parallel for reduction(+ : n_pass, bad_code_size) +#pragma omp parallel for reduction(+ : n_pass, bad_code_size) num_threads(num_omp_threads) for (idx_t qi = 0; qi < n; qi++) { const uint8_t* q_code = q_codes + qi * pq.code_size; @@ -488,7 +488,7 @@ void IndexPQ::hamming_distance_histogram( memset(hist, 0, sizeof(*hist) * (nbits + 1)); size_t bs = 256; -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::vector histi(nbits + 1); hamdis_t* distances = new hamdis_t[nb * bs]; @@ -931,7 +931,7 @@ void MultiIndexQuantizer::search( if (k == 1) { // simple version that just finds the min in each table -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < n; i++) { const float* dis_table = dis_tables + i * pq.ksub * pq.M; float dis = 0; @@ -957,7 +957,7 @@ void MultiIndexQuantizer::search( } } else { -#pragma omp parallel if (n > 1) +#pragma omp parallel if (n > 1) num_threads(num_omp_threads) { MinSumK, false> msk( k, pq.M, pq.nbits, pq.ksub); @@ -1094,7 +1094,7 @@ void MultiIndexQuantizer2::search( } } 
else { -#pragma omp parallel if (n > 1) +#pragma omp parallel if (n > 1) num_threads(num_omp_threads) { MinSumK, false> msk( K, pq.M, pq.nbits, k2); diff --git a/faiss/IndexPQFastScan.cpp b/faiss/IndexPQFastScan.cpp index ba15e87429..03a1451e74 100644 --- a/faiss/IndexPQFastScan.cpp +++ b/faiss/IndexPQFastScan.cpp @@ -13,6 +13,7 @@ #include + #include #include #include diff --git a/faiss/IndexRefine.cpp b/faiss/IndexRefine.cpp index ccb6b61ca6..bb223dc97c 100644 --- a/faiss/IndexRefine.cpp +++ b/faiss/IndexRefine.cpp @@ -73,7 +73,7 @@ static void reorder_2_heaps( idx_t k_base, const idx_t* base_labels, const float* base_distances) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { idx_t* idxo = labels + i * k; float* diso = distances + i * k; @@ -120,7 +120,7 @@ void IndexRefine::search( assert(base_labels[i] >= -1 && base_labels[i] < ntotal); // parallelize over queries -#pragma omp parallel if (n > 1) +#pragma omp parallel if (n > 1) num_threads(num_omp_threads) { std::unique_ptr dc( refine_index->get_distance_computer()); diff --git a/faiss/IndexScalarQuantizer.cpp b/faiss/IndexScalarQuantizer.cpp index 94268af688..e22639703a 100644 --- a/faiss/IndexScalarQuantizer.cpp +++ b/faiss/IndexScalarQuantizer.cpp @@ -14,6 +14,7 @@ #include + #include #include #include @@ -62,7 +63,7 @@ void IndexScalarQuantizer::search( // of queries in the batch. If n = 1, then the search is done in a single thread. // This is done to avoid the overhead of spawning threads for executing sequential code. // This is for bleve, more in: MB-61930 -#pragma omp parallel if (n > 1) +#pragma omp parallel if (n > 1) num_threads(num_omp_threads) { InvertedListScanner* scanner = sq.select_InvertedListScanner(metric_type, nullptr, true, sel); @@ -153,7 +154,7 @@ void IndexIVFScalarQuantizer::encode_vectors( size_t coarse_size = include_listnos ? 
coarse_code_size() : 0; memset(codes, 0, (code_size + coarse_size) * n); -#pragma omp parallel if (n > 1000) +#pragma omp parallel if (n > 1000) num_threads(num_omp_threads) { std::vector residual(d); @@ -181,7 +182,7 @@ void IndexIVFScalarQuantizer::sa_decode(idx_t n, const uint8_t* codes, float* x) std::unique_ptr squant(sq.select_quantizer()); size_t coarse_size = coarse_code_size(); -#pragma omp parallel if (n > 1000) +#pragma omp parallel if (n > 1000) num_threads(num_omp_threads) { std::vector residual(d); @@ -213,7 +214,7 @@ void IndexIVFScalarQuantizer::add_core( DirectMapAdd dm_add(direct_map, n, xids); -#pragma omp parallel reduction(+ : nadd) +#pragma omp parallel reduction(+ : nadd) num_threads(num_omp_threads) { std::vector residual(d); std::vector one_code(code_size); diff --git a/faiss/MetaIndexes.cpp b/faiss/MetaIndexes.cpp index 50c7c88cb8..6a4bdae2d0 100644 --- a/faiss/MetaIndexes.cpp +++ b/faiss/MetaIndexes.cpp @@ -8,6 +8,7 @@ // -*- c++ -*- #include +#include #include #include @@ -184,7 +185,7 @@ void IndexRandom::search( FAISS_THROW_IF_NOT_MSG( !params, "search params not supported for this index"); FAISS_THROW_IF_NOT(k <= ntotal); -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { RandomGenerator rng( seed + ivec_checksum(d, (const int32_t*)(x + i * d))); diff --git a/faiss/OMPConfig.cpp b/faiss/OMPConfig.cpp new file mode 100644 index 0000000000..e33f41634b --- /dev/null +++ b/faiss/OMPConfig.cpp @@ -0,0 +1,20 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ + +// -*- c++ -*- + + +#include <thread> + +namespace faiss { + + unsigned int num_omp_threads = std::thread::hardware_concurrency(); + + void set_num_omp_threads(unsigned int value) { + num_omp_threads = value; + } +} diff --git a/faiss/OMPConfig.h b/faiss/OMPConfig.h new file mode 100644 index 0000000000..d943480ce9 --- /dev/null +++ b/faiss/OMPConfig.h @@ -0,0 +1,51 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +// -*- c++ -*- + +#ifndef OMPCONFIG_H +#define OMPCONFIG_H + +#include <thread> + +namespace faiss { + // Degree of OpenMP concurrency, determining the number of threads + // used in OpenMP parallel regions. + // + // This variable specifies the number of threads to be employed in + // parallel regions defined by OpenMP directives. If not set explicitly, + // the default value is the number of hardware threads available on + // the system. This allows for efficient utilization of available + // processing resources. + // + // The value of num_omp_threads can be modified to control the level + // of parallelism and optimize performance based on the specific + // requirements and characteristics of the workload. + // + // Usage: + // - Setting num_omp_threads to a specific value before entering an + // OpenMP parallel region will limit or expand the number of threads + // accordingly. + // - If num_omp_threads is not set, OpenMP will automatically use the + // number of hardware threads as the default value. + // + // Example: + // unsigned int num_omp_threads = 4; // Use 4 threads in OpenMP parallel regions. + extern unsigned int num_omp_threads; + + // Setter function for num_omp_threads. + // + // Parameters: + // - value: The desired number of threads to be used in OpenMP parallel regions. + // + // Usage: + // set_num_omp_threads(8); // would set to use 8 threads in OpenMP parallel regions. + // + // p.s.
to be invoked on process init only. + void set_num_omp_threads(unsigned int value); +} +#endif // OMPCONFIG_H diff --git a/faiss/gpu/impl/InterleavedCodes.cpp b/faiss/gpu/impl/InterleavedCodes.cpp index 801ff72180..dbfb0d8902 100644 --- a/faiss/gpu/impl/InterleavedCodes.cpp +++ b/faiss/gpu/impl/InterleavedCodes.cpp @@ -107,7 +107,7 @@ std::vector unpackNonInterleaved( std::vector out(numVecs * dims * utils::divUp(bitsPerCode, 8)); if (bitsPerCode == 4) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { for (int j = 0; j < dims; ++j) { int srcIdx = i * srcVecSize + (j / 2); @@ -120,7 +120,7 @@ std::vector unpackNonInterleaved( } } } else if (bitsPerCode == 5) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { for (int j = 0; j < dims; ++j) { int lo = i * srcVecSize + (j * 5) / 8; @@ -136,7 +136,7 @@ std::vector unpackNonInterleaved( } } } else if (bitsPerCode == 6) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { for (int j = 0; j < dims; ++j) { int lo = i * srcVecSize + (j * 6) / 8; @@ -170,7 +170,7 @@ void unpackInterleavedWord( int wordsPerBlock = wordsPerDimBlock * dims; int numBlocks = utils::divUp(numVecs, 32); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { int block = i / 32; FAISS_ASSERT(block < numBlocks); @@ -215,7 +215,7 @@ std::vector unpackInterleaved( dims, bitsPerCode); } else if (bitsPerCode == 4) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { int block = i / 32; int lane = i % 32; @@ -233,7 +233,7 @@ std::vector unpackInterleaved( } } } else if (bitsPerCode == 5) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { int block = i / 32; int 
blockVector = i % 32; @@ -255,7 +255,7 @@ std::vector unpackInterleaved( } } } else if (bitsPerCode == 6) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { int block = i / 32; int blockVector = i % 32; @@ -380,7 +380,7 @@ std::vector packNonInterleaved( std::vector out(numVecs * bytesPerVec); if (bitsPerCode == 4) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { for (int j = 0; j < bytesPerVec; ++j) { int dimLo = j * 2; @@ -395,7 +395,7 @@ std::vector packNonInterleaved( } } } else if (bitsPerCode == 5) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { for (int j = 0; j < bytesPerVec; ++j) { int dimLo = (j * 8) / 5; @@ -413,7 +413,7 @@ std::vector packNonInterleaved( } } } else if (bitsPerCode == 6) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { for (int j = 0; j < bytesPerVec; ++j) { int dimLo = (j * 8) / 6; @@ -448,7 +448,7 @@ void packInterleavedWord( // We're guaranteed that all other slots not filled by the vectors present // are initialized to zero (from the vector constructor in packInterleaved) -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numVecs; ++i) { int block = i / 32; FAISS_ASSERT(block < numBlocks); @@ -495,7 +495,7 @@ std::vector packInterleaved( dims, bitsPerCode); } else if (bitsPerCode == 4) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numBlocks; ++i) { for (int j = 0; j < dims; ++j) { for (int k = 0; k < bytesPerDimBlock; ++k) { @@ -511,7 +511,7 @@ std::vector packInterleaved( } } } else if (bitsPerCode == 5) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numBlocks; ++i) { for (int j = 0; j < dims; ++j) 
{ for (int k = 0; k < bytesPerDimBlock; ++k) { @@ -531,7 +531,7 @@ std::vector packInterleaved( } } } else if (bitsPerCode == 6) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < numBlocks; ++i) { for (int j = 0; j < dims; ++j) { for (int k = 0; k < bytesPerDimBlock; ++k) { diff --git a/faiss/gpu/impl/RemapIndices.cpp b/faiss/gpu/impl/RemapIndices.cpp index 49d969a21f..88ef90ed11 100644 --- a/faiss/gpu/impl/RemapIndices.cpp +++ b/faiss/gpu/impl/RemapIndices.cpp @@ -21,7 +21,7 @@ void ivfOffsetToUserIndex( const std::vector>& listOffsetToUserIndex) { FAISS_ASSERT(numLists == listOffsetToUserIndex.size()); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t q = 0; q < queries; ++q) { for (idx_t r = 0; r < k; ++r) { auto offsetIndex = indices[q * k + r]; diff --git a/faiss/impl/AdditiveQuantizer.cpp b/faiss/impl/AdditiveQuantizer.cpp index ff6eb4a98a..747fba3e31 100644 --- a/faiss/impl/AdditiveQuantizer.cpp +++ b/faiss/impl/AdditiveQuantizer.cpp @@ -243,7 +243,7 @@ void AdditiveQuantizer::pack_codes( norms = norm_buf.data(); } } -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { const int32_t* codes1 = codes + i * ld_codes; BitstringWriter bsw(packed_codes + i * code_size, code_size); @@ -261,7 +261,7 @@ void AdditiveQuantizer::decode(const uint8_t* code, float* x, size_t n) const { is_trained, "The additive quantizer is not trained yet."); // standard additive quantizer decoding -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { BitstringReader bsr(code + i * code_size, code_size); float* xi = x + i * d; @@ -290,7 +290,7 @@ void AdditiveQuantizer::decode_unpacked( } // standard additive quantizer decoding -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) 
num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { const int32_t* codesi = code + i * ld_codes; float* xi = x + i * d; @@ -315,7 +315,7 @@ AdditiveQuantizer::~AdditiveQuantizer() {} void AdditiveQuantizer::compute_centroid_norms(float* norms) const { size_t ntotal = (size_t)1 << tot_bits; // TODO: make tree of partial sums -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::vector tmp(d); #pragma omp for @@ -404,7 +404,7 @@ void AdditiveQuantizer::knn_centroids_inner_product( compute_LUT(n, xq, LUT.get()); size_t ntotal = (size_t)1 << tot_bits; -#pragma omp parallel if (n > 100) +#pragma omp parallel if (n > 100) num_threads(num_omp_threads) { std::vector dis(ntotal); #pragma omp for @@ -433,7 +433,7 @@ void AdditiveQuantizer::knn_centroids_L2( fvec_norms_L2sqr(q_norms.get(), xq, d, n); size_t ntotal = (size_t)1 << tot_bits; -#pragma omp parallel if (n > 100) +#pragma omp parallel if (n > 100) num_threads(num_omp_threads) { std::vector dis(ntotal); #pragma omp for diff --git a/faiss/impl/HNSW.cpp b/faiss/impl/HNSW.cpp index b356da9673..13c92c5936 100644 --- a/faiss/impl/HNSW.cpp +++ b/faiss/impl/HNSW.cpp @@ -102,7 +102,7 @@ void HNSW::print_neighbor_stats(int level) const { nb_neighbors(level)); size_t tot_neigh = 0, tot_common = 0, tot_reciprocal = 0, n_node = 0; #pragma omp parallel for reduction(+: tot_neigh) reduction(+: tot_common) \ - reduction(+: tot_reciprocal) reduction(+: n_node) + reduction(+: tot_reciprocal) reduction(+: n_node) num_threads(num_omp_threads) for (int i = 0; i < levels.size(); i++) { if (levels[i] > level) { n_node++; diff --git a/faiss/impl/HNSW.h b/faiss/impl/HNSW.h index 6e2524ec5c..53cda75664 100644 --- a/faiss/impl/HNSW.h +++ b/faiss/impl/HNSW.h @@ -15,6 +15,7 @@ #include + #include #include #include diff --git a/faiss/impl/LocalSearchQuantizer.cpp b/faiss/impl/LocalSearchQuantizer.cpp index abbfe74901..275bf49159 100644 --- a/faiss/impl/LocalSearchQuantizer.cpp +++ 
b/faiss/impl/LocalSearchQuantizer.cpp @@ -193,7 +193,7 @@ void LocalSearchQuantizer::train(size_t n, const float* x) { // compute standard derivations of each dimension std::vector stddev(d, 0); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < d; i++) { float mean = 0; for (size_t j = 0; j < n; j++) { @@ -563,7 +563,7 @@ void LocalSearchQuantizer::icm_encode_impl( float mean_obj = 0.0f; // select the best code for every vector xi -#pragma omp parallel for reduction(+ : n_betters, mean_obj) +#pragma omp parallel for reduction(+ : n_betters, mean_obj) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { if (icm_objs[i] < best_objs[i]) { best_objs[i] = icm_objs[i]; @@ -597,7 +597,7 @@ void LocalSearchQuantizer::icm_encode_step( FAISS_THROW_IF_NOT(M != 0 && K != 0); FAISS_THROW_IF_NOT(binaries != nullptr); -#pragma omp parallel for schedule(dynamic) +#pragma omp parallel for schedule(dynamic) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { std::vector objs(K); @@ -683,7 +683,7 @@ void LocalSearchQuantizer::perturb_codes( void LocalSearchQuantizer::compute_binary_terms(float* binaries) const { LSQTimerScope scope(&lsq_timer, "compute_binary_terms"); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t m12 = 0; m12 < M * M; m12++) { size_t m1 = m12 / M; size_t m2 = m12 % M; @@ -739,7 +739,7 @@ void LocalSearchQuantizer::compute_unary_terms( std::vector norms(M * K); fvec_norms_L2sqr(norms.data(), codebooks.data(), d, M * K); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { for (size_t m = 0; m < M; m++) { float* u = unaries + m * n * K + i * K; @@ -759,7 +759,7 @@ float LocalSearchQuantizer::evaluate( std::vector decoded_x(n * d, 0.0f); float obj = 0.0f; -#pragma omp parallel for reduction(+ : obj) +#pragma omp parallel for reduction(+ : obj) num_threads(num_omp_threads) for (int64_t i 
= 0; i < n; i++) { const auto code = codes + i * M; const auto decoded_i = decoded_x.data() + i * d; diff --git a/faiss/impl/NNDescent.cpp b/faiss/impl/NNDescent.cpp index cc84b5d609..13828486d8 100644 --- a/faiss/impl/NNDescent.cpp +++ b/faiss/impl/NNDescent.cpp @@ -154,7 +154,7 @@ NNDescent::NNDescent(const int d, const int K) : K(K), d(d) { NNDescent::~NNDescent() {} void NNDescent::join(DistanceComputer& qdis) { -#pragma omp parallel for default(shared) schedule(dynamic, 100) +#pragma omp parallel for default(shared) schedule(dynamic, 100) num_threads(num_omp_threads) for (int n = 0; n < ntotal; n++) { graph[n].join([&](int i, int j) { if (i != j) { @@ -171,7 +171,7 @@ void NNDescent::join(DistanceComputer& qdis) { void NNDescent::update() { // Step 1. // Clear all nn_new and nn_old -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < ntotal; i++) { std::vector().swap(graph[i].nn_new); std::vector().swap(graph[i].nn_old); @@ -181,7 +181,7 @@ void NNDescent::update() { // Compute the number of neighbors which is new i.e. flag is true // in the candidate pool. This must not exceed the sample number S. // That means We only select S new neighbors. -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int n = 0; n < ntotal; ++n) { auto& nn = graph[n]; std::sort(nn.pool.begin(), nn.pool.end()); @@ -205,7 +205,7 @@ void NNDescent::update() { // Step 3. // Find reverse links for each node // Randomly choose R reverse links. -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::mt19937 rng(random_seed * 5081 + omp_get_thread_num()); #pragma omp for @@ -258,7 +258,7 @@ void NNDescent::update() { // Step 4. // Combine the forward and the reverse links // R = 0 means no reverse links are used. 
-#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < ntotal; ++i) { auto& nn_new = graph[i].nn_new; auto& nn_old = graph[i].nn_old; @@ -301,7 +301,7 @@ void NNDescent::generate_eval_set( std::vector& c, std::vector>& v, int N) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < c.size(); i++) { std::vector tmp; for (int j = 0; j < N; j++) { @@ -349,7 +349,7 @@ void NNDescent::init_graph(DistanceComputer& qdis) { graph.push_back(Nhood(L, S, rng, (int)ntotal)); } } -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::mt19937 rng(random_seed * 7741 + omp_get_thread_num()); #pragma omp for diff --git a/faiss/impl/NNDescent.h b/faiss/impl/NNDescent.h index 2426b0d7bb..04dd5fea02 100644 --- a/faiss/impl/NNDescent.h +++ b/faiss/impl/NNDescent.h @@ -18,6 +18,7 @@ #include + #include #include #include diff --git a/faiss/impl/NSG.cpp b/faiss/impl/NSG.cpp index 1f30b576b9..15e75ad7a4 100644 --- a/faiss/impl/NSG.cpp +++ b/faiss/impl/NSG.cpp @@ -185,7 +185,7 @@ void NSG::build( final_graph = std::make_shared>(n, R); std::fill_n(final_graph->data, n * R, EMPTY_ID); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < n; i++) { int cnt = 0; for (int j = 0; j < R; j++) { @@ -357,7 +357,7 @@ void NSG::link( const nsg::Graph& knn_graph, nsg::Graph& graph, bool /* verbose */) { -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::unique_ptr vec(new float[storage->d]); @@ -386,7 +386,7 @@ void NSG::link( } // omp parallel std::vector locks(ntotal); -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { std::unique_ptr dis( storage_distance_computer(storage)); @@ -662,7 +662,7 @@ int NSG::attach_unlinked( } void NSG::check_graph() const { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < ntotal; i++) { for (int j = 0; 
j < R; j++) { int id = final_graph->at(i, j); diff --git a/faiss/impl/NSG.h b/faiss/impl/NSG.h index e115b317fb..5a97d12fdc 100644 --- a/faiss/impl/NSG.h +++ b/faiss/impl/NSG.h @@ -15,6 +15,7 @@ #include + #include #include #include diff --git a/faiss/impl/PolysemousTraining.cpp b/faiss/impl/PolysemousTraining.cpp index e524724113..3bcb54a786 100644 --- a/faiss/impl/PolysemousTraining.cpp +++ b/faiss/impl/PolysemousTraining.cpp @@ -10,6 +10,7 @@ #include #include + #include #include @@ -842,7 +843,7 @@ void PolysemousTraining::optimize_ranking( pq.compute_sdc_table(); } -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int m = 0; m < pq.M; m++) { size_t nq, nb; std::vector codes; // query codes, then db codes diff --git a/faiss/impl/ProductAdditiveQuantizer.cpp b/faiss/impl/ProductAdditiveQuantizer.cpp index 1104b778a4..7ce1475b83 100644 --- a/faiss/impl/ProductAdditiveQuantizer.cpp +++ b/faiss/impl/ProductAdditiveQuantizer.cpp @@ -99,7 +99,7 @@ void ProductAdditiveQuantizer::train(size_t n, const float* x) { auto q = quantizers[s]; xt.resize(q->d * n); -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { memcpy(xt.data() + i * q->d, x + i * d + offset_d, @@ -166,7 +166,7 @@ void ProductAdditiveQuantizer::compute_unpacked_codes( xsub.resize(n * q->d); codes.resize(n * q->code_size); -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { memcpy(xsub.data() + i * q->d, x + i * d + offset_d, @@ -176,7 +176,7 @@ void ProductAdditiveQuantizer::compute_unpacked_codes( q->compute_codes(xsub.data(), codes.data(), n); // unpack -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (idx_t i = 0; i < n; i++) { uint8_t* code = codes.data() + i * q->code_size; BitstringReader bsr(code, q->code_size); @@ -205,7 
+205,7 @@ void ProductAdditiveQuantizer::decode_unpacked( } // product additive quantizer decoding -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { const int32_t* codesi = codes + i * ld_codes; @@ -236,7 +236,7 @@ void ProductAdditiveQuantizer::decode(const uint8_t* codes, float* x, size_t n) FAISS_THROW_IF_NOT_MSG( is_trained, "The product additive quantizer is not trained yet."); -#pragma omp parallel for if (n > 1000) +#pragma omp parallel for if (n > 1000) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { BitstringReader bsr(codes + i * code_size, code_size); diff --git a/faiss/impl/ProductQuantizer.cpp b/faiss/impl/ProductQuantizer.cpp index 822cd90d72..9760825272 100644 --- a/faiss/impl/ProductQuantizer.cpp +++ b/faiss/impl/ProductQuantizer.cpp @@ -401,7 +401,7 @@ void ProductQuantizer::compute_codes(const float* x, uint8_t* codes, size_t n) if (dsub < 16) { // simple direct computation -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) compute_code(x + i * d, codes + i * code_size); @@ -410,7 +410,7 @@ void ProductQuantizer::compute_codes(const float* x, uint8_t* codes, size_t n) ScopeDeleter del(dis_tables); compute_distance_tables(n, x, dis_tables); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { uint8_t* code = codes + i * code_size; const float* tab = dis_tables + i * ksub * M; @@ -473,7 +473,7 @@ void ProductQuantizer::compute_distance_tables( #endif if (dsub < 16) { -#pragma omp parallel for if (nx > 1) +#pragma omp parallel for if (nx > 1) num_threads(num_omp_threads) for (int64_t i = 0; i < nx; i++) { compute_distance_table(x + i * d, dis_tables + i * ksub * M); } @@ -507,7 +507,7 @@ void ProductQuantizer::compute_inner_prod_tables( #endif if (dsub < 16) { -#pragma omp parallel for if (nx > 1) +#pragma omp parallel for 
if (nx > 1) num_threads(num_omp_threads) for (int64_t i = 0; i < nx; i++) { compute_inner_prod_table(x + i * d, dis_tables + i * ksub * M); } @@ -681,7 +681,7 @@ void pq_knn_search_with_tables( size_t k = res->k, nx = res->nh; size_t ksub = pq.ksub, M = pq.M; -#pragma omp parallel for if (nx > 1) +#pragma omp parallel for if (nx > 1) num_threads(num_omp_threads) for (int64_t i = 0; i < nx; i++) { /* query preparation for asymmetric search: compute look-up tables */ const float* dis_table = dis_tables + i * ksub * M; @@ -782,7 +782,7 @@ void ProductQuantizer::compute_sdc_table() { sdc_table.resize(M * ksub * ksub); if (dsub < 4) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int mk = 0; mk < M * ksub; mk++) { // allow omp to schedule in a more fine-grained way // `collapse` is not supported in OpenMP 2.x @@ -796,7 +796,7 @@ void ProductQuantizer::compute_sdc_table() { } else { // NOTE: it would disable the omp loop in pairwise_L2sqr // but still accelerate especially when M >= 4 -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int m = 0; m < M; m++) { const float* cents = centroids.data() + m * ksub * dsub; float* dis_tab = sdc_table.data() + m * ksub * ksub; @@ -817,7 +817,7 @@ void ProductQuantizer::search_sdc( FAISS_THROW_IF_NOT(nbits == 8); size_t k = res->k; -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < nq; i++) { /* Compute distances and keep smallest values */ idx_t* heap_ids = res->ids + i * k; diff --git a/faiss/impl/ResidualQuantizer.cpp b/faiss/impl/ResidualQuantizer.cpp index fa8248a60a..06c301e554 100644 --- a/faiss/impl/ResidualQuantizer.cpp +++ b/faiss/impl/ResidualQuantizer.cpp @@ -174,7 +174,7 @@ void beam_search_encode_step( } InterruptCallback::check(); -#pragma omp parallel for if (n > 100) +#pragma omp parallel for if (n > 100) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { const int32_t* 
codes_i = codes + i * m * beam_size; int32_t* new_codes_i = new_codes + i * (m + 1) * new_beam_size; @@ -1187,7 +1187,7 @@ void beam_search_encode_step_tab( { FAISS_THROW_IF_NOT(ldc >= K); -#pragma omp parallel for if (n > 100) schedule(dynamic) +#pragma omp parallel for if (n > 100) schedule(dynamic) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { std::vector cent_distances(beam_size * K); std::vector cd_common(K); diff --git a/faiss/impl/ResultHandler.h b/faiss/impl/ResultHandler.h index d096fbcfa3..a15230d524 100644 --- a/faiss/impl/ResultHandler.h +++ b/faiss/impl/ResultHandler.h @@ -92,7 +92,7 @@ struct HeapResultHandler { /// add results for query i0..i1 and j0..j1 void add_results(size_t j0, size_t j1, const T* dis_tab) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = i0; i < i1; i++) { T* heap_dis = heap_dis_tab + i * k; TI* heap_ids = heap_ids_tab + i * k; @@ -282,7 +282,7 @@ struct ReservoirResultHandler { /// add results for query i0..i1 and j0..j1 void add_results(size_t j0, size_t j1, const T* dis_tab) { // maybe parallel for -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = i0; i < i1; i++) { ReservoirTopN& reservoir = reservoirs[i - i0]; const T* dis_tab_i = dis_tab + (j1 - j0) * (i - i0) - j0; diff --git a/faiss/impl/ScalarQuantizer.cpp b/faiss/impl/ScalarQuantizer.cpp index a3bf7f0c8a..8beee9f56c 100644 --- a/faiss/impl/ScalarQuantizer.cpp +++ b/faiss/impl/ScalarQuantizer.cpp @@ -15,6 +15,7 @@ #include #include + #ifdef __SSE__ #include #endif @@ -605,7 +606,7 @@ void train_NonUniform( // Add an openMP guard here to prevent spawning threads // when d = 1 (which would indicate sequential execution). // This is for bleve, more in MB-61930. 
-#pragma omp parallel for if (d > 1) +#pragma omp parallel for if (d > 1) num_threads(num_omp_threads) for (int j = 0; j < d; j++) { train_Uniform(rs, rs_arg, n, k, xt.data() + j * n, trained_d); vmin[j] = trained_d[0]; @@ -1165,7 +1166,7 @@ void ScalarQuantizer::compute_codes(const float* x, uint8_t* codes, size_t n) // Add an openMP guard here to prevent spawning threads // when n = 1 (which would indicate sequential execution). // This is for bleve, more in MB-61930. -#pragma omp parallel for if (n > 1) +#pragma omp parallel for if (n > 1) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) squant->encode_vector(x + i * d, codes + i * code_size); } @@ -1176,7 +1177,7 @@ void ScalarQuantizer::decode(const uint8_t* codes, float* x, size_t n) const { // Add an openMP guard here to prevent spawning threads // when n = 1 (which would indicate sequential execution). // This is for bleve, more in MB-61930. -#pragma omp parallel for if (n > 1) +#pragma omp parallel for if (n > 1) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) squant->decode_vector(codes + i * code_size, x + i * d); } diff --git a/faiss/impl/lattice_Zn.cpp b/faiss/impl/lattice_Zn.cpp index 9c8ec4fca2..12d890a633 100644 --- a/faiss/impl/lattice_Zn.cpp +++ b/faiss/impl/lattice_Zn.cpp @@ -265,7 +265,7 @@ void Repeats::decode(uint64_t code, float* c) const { void EnumeratedVectors::encode_multi(size_t n, const float* c, uint64_t* codes) const { -#pragma omp parallel if (n > 1000) +#pragma omp parallel if (n > 1000) num_threads(num_omp_threads) { #pragma omp for for (int i = 0; i < n; i++) { @@ -276,7 +276,7 @@ void EnumeratedVectors::encode_multi(size_t n, const float* c, uint64_t* codes) void EnumeratedVectors::decode_multi(size_t n, const uint64_t* codes, float* c) const { -#pragma omp parallel if (n > 1000) +#pragma omp parallel if (n > 1000) num_threads(num_omp_threads) { #pragma omp for for (int i = 0; i < n; i++) { @@ -374,7 +374,7 @@ void ZnSphereSearch::search_multi( const 
float* x, float* c_out, float* dp_out) { -#pragma omp parallel if (n > 1000) +#pragma omp parallel if (n > 1000) num_threads(num_omp_threads) { #pragma omp for for (int i = 0; i < n; i++) { diff --git a/faiss/impl/platform_macros.h b/faiss/impl/platform_macros.h index 205b47f2cd..40d1e94a72 100644 --- a/faiss/impl/platform_macros.h +++ b/faiss/impl/platform_macros.h @@ -10,6 +10,7 @@ // basic int types and size_t #include #include +#include #ifdef _MSC_VER diff --git a/faiss/invlists/DirectMap.cpp b/faiss/invlists/DirectMap.cpp index b276b764ec..ff2ebee76f 100644 --- a/faiss/invlists/DirectMap.cpp +++ b/faiss/invlists/DirectMap.cpp @@ -151,7 +151,7 @@ size_t DirectMap::remove_ids(const IDSelector& sel, InvertedLists* invlists) { if (type == NoMap) { // exhaustive scan of IVF -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t i = 0; i < nlist; i++) { idx_t l0 = invlists->list_size(i), l = l0, j = 0; ScopedIds idsi(invlists, i); diff --git a/faiss/invlists/InvertedLists.cpp b/faiss/invlists/InvertedLists.cpp index 26adbf4ed7..2172bd7449 100644 --- a/faiss/invlists/InvertedLists.cpp +++ b/faiss/invlists/InvertedLists.cpp @@ -81,7 +81,7 @@ InvertedListsIterator* InvertedLists::get_iterator(size_t /*list_no*/) const { } void InvertedLists::merge_from(InvertedLists* oivf, size_t add_id) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (idx_t i = 0; i < nlist; i++) { size_t list_size = oivf->list_size(i); ScopedIds ids(oivf, i); diff --git a/faiss/invlists/OnDiskInvertedLists.cpp b/faiss/invlists/OnDiskInvertedLists.cpp index 2896190447..17665999a0 100644 --- a/faiss/invlists/OnDiskInvertedLists.cpp +++ b/faiss/invlists/OnDiskInvertedLists.cpp @@ -611,7 +611,7 @@ size_t OnDiskInvertedLists::merge_from( size_t nmerged = 0; double t0 = getmillisecs(), last_t = t0; -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (size_t j = 0; j < nlist; j++) { List& l = 
lists[j]; for (int i = 0; i < n_il; i++) { diff --git a/faiss/utils/Heap.cpp b/faiss/utils/Heap.cpp index 63981ba6dc..f8d859f6dc 100644 --- a/faiss/utils/Heap.cpp +++ b/faiss/utils/Heap.cpp @@ -16,14 +16,14 @@ namespace faiss { template void HeapArray::heapify() { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t j = 0; j < nh; j++) heap_heapify(k, val + j * k, ids + j * k); } template void HeapArray::reorder() { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t j = 0; j < nh; j++) heap_reorder(k, val + j * k, ids + j * k); } @@ -33,7 +33,7 @@ void HeapArray::addn(size_t nj, const T* vin, TI j0, size_t i0, int64_t ni) { if (ni == -1) ni = nh; assert(i0 >= 0 && i0 + ni <= nh); -#pragma omp parallel for if (ni * nj > 100000) +#pragma omp parallel for if (ni * nj > 100000) num_threads(num_omp_threads) for (int64_t i = i0; i < i0 + ni; i++) { T* __restrict simi = get_val(i); TI* __restrict idxi = get_ids(i); @@ -63,7 +63,7 @@ void HeapArray::addn_with_ids( if (ni == -1) ni = nh; assert(i0 >= 0 && i0 + ni <= nh); -#pragma omp parallel for if (ni * nj > 100000) +#pragma omp parallel for if (ni * nj > 100000) num_threads(num_omp_threads) for (int64_t i = i0; i < i0 + ni; i++) { T* __restrict simi = get_val(i); TI* __restrict idxi = get_ids(i); @@ -91,7 +91,7 @@ void HeapArray::addn_query_subset_with_ids( if (id_stride < 0) { id_stride = nj; } -#pragma omp parallel for if (nsubset * nj > 100000) +#pragma omp parallel for if (nsubset * nj > 100000) num_threads(num_omp_threads) for (int64_t si = 0; si < nsubset; si++) { T i = subset[si]; T* __restrict simi = get_val(i); @@ -110,7 +110,7 @@ void HeapArray::addn_query_subset_with_ids( template void HeapArray::per_line_extrema(T* out_val, TI* out_ids) const { -#pragma omp parallel for if (nh * k > 100000) +#pragma omp parallel for if (nh * k > 100000) num_threads(num_omp_threads) for (int64_t j = 0; j < nh; j++) { int64_t imin = -1; typename 
C::T xval = C::Crev::neutral(); @@ -167,7 +167,7 @@ void merge_knn_results( return; } long stride = n * k; -#pragma omp parallel if (n * nshard * k > 100000) +#pragma omp parallel if (n * nshard * k > 100000) num_threads(num_omp_threads) { std::vector buf(2 * nshard); // index in each shard's result list diff --git a/faiss/utils/distances.cpp b/faiss/utils/distances.cpp index 5e025f93d6..08cbd9aeef 100644 --- a/faiss/utils/distances.cpp +++ b/faiss/utils/distances.cpp @@ -17,6 +17,7 @@ #include + #ifdef __AVX2__ #include #endif @@ -64,7 +65,7 @@ void fvec_norms_L2( const float* __restrict x, size_t d, size_t nx) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < nx; i++) { nr[i] = sqrtf(fvec_norm_L2sqr(x + i * d, d)); } @@ -75,13 +76,13 @@ void fvec_norms_L2sqr( const float* __restrict x, size_t d, size_t nx) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < nx; i++) nr[i] = fvec_norm_L2sqr(x + i * d, d); } void fvec_renorm_L2(size_t d, size_t nx, float* __restrict x) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < nx; i++) { float* __restrict xi = x + i * d; @@ -289,7 +290,7 @@ void exhaustive_L2sqr_blas_default_impl( ip_block.get(), &nyi); } -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = i0; i < i1; i++) { float* ip_line = ip_block.get() + (i - i0) * (j1 - j0); @@ -384,7 +385,7 @@ void exhaustive_L2sqr_blas_cmax_avx2( ip_block.get(), &nyi); } -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = i0; i < i1; i++) { float* ip_line = ip_block.get() + (i - i0) * (j1 - j0); @@ -780,7 +781,7 @@ void fvec_inner_products_by_idx( size_t d, size_t nx, size_t ny) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t j = 0; j < nx; j++) { const int64_t* __restrict idsj = 
ids + j * ny; const float* xj = x + j * d; @@ -803,7 +804,7 @@ void fvec_L2sqr_by_idx( size_t d, size_t nx, size_t ny) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t j = 0; j < nx; j++) { const int64_t* __restrict idsj = ids + j * ny; const float* xj = x + j * d; @@ -824,7 +825,7 @@ void pairwise_indexed_L2sqr( const float* y, const int64_t* iy, float* dis) { -#pragma omp parallel for if (n > 1) +#pragma omp parallel for if (n > 1) num_threads(num_omp_threads) for (int64_t j = 0; j < n; j++) { if (ix[j] >= 0 && iy[j] >= 0) { dis[j] = fvec_L2sqr(x + d * ix[j], y + d * iy[j], d); @@ -840,7 +841,7 @@ void pairwise_indexed_inner_product( const float* y, const int64_t* iy, float* dis) { -#pragma omp parallel for if (n > 1) +#pragma omp parallel for if (n > 1) num_threads(num_omp_threads) for (int64_t j = 0; j < n; j++) { if (ix[j] >= 0 && iy[j] >= 0) { dis[j] = fvec_inner_product(x + d * ix[j], y + d * iy[j], d); @@ -865,7 +866,7 @@ void knn_inner_products_by_idx( ld_ids = ny; } -#pragma omp parallel for if (nx > 100) +#pragma omp parallel for if (nx > 100) num_threads(num_omp_threads) for (int64_t i = 0; i < nx; i++) { const float* x_ = x + i * d; const int64_t* idsi = ids + i * ld_ids; @@ -901,7 +902,7 @@ void knn_L2sqr_by_idx( if (ld_ids < 0) { ld_ids = ny; } -#pragma omp parallel for if (nx > 100) +#pragma omp parallel for if (nx > 100) num_threads(num_omp_threads) for (int64_t i = 0; i < nx; i++) { const float* x_ = x + i * d; const int64_t* __restrict idsi = ids + i * ld_ids; @@ -941,11 +942,11 @@ void pairwise_L2sqr( // store in beginning of distance matrix to avoid malloc float* b_norms = dis; -#pragma omp parallel for if (nb > 1) +#pragma omp parallel for if (nb > 1) num_threads(num_omp_threads) for (int64_t i = 0; i < nb; i++) b_norms[i] = fvec_norm_L2sqr(xb + i * ldb, d); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 1; i < nq; i++) { float q_norm = 
fvec_norm_L2sqr(xq + i * ldq, d); for (int64_t j = 0; j < nb; j++) @@ -984,7 +985,7 @@ void inner_product_to_L2sqr( const float* nr2, size_t n1, size_t n2) { -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t j = 0; j < n1; j++) { float* disj = dis + j * n2; for (size_t i = 0; i < n2; i++) diff --git a/faiss/utils/distances_fused/avx512.cpp b/faiss/utils/distances_fused/avx512.cpp index 6ae8cb0469..6dc8b57c79 100644 --- a/faiss/utils/distances_fused/avx512.cpp +++ b/faiss/utils/distances_fused/avx512.cpp @@ -264,7 +264,7 @@ void exhaustive_L2sqr_fused_cmax( const size_t nx_p = (nx / NX_POINTS_PER_LOOP) * NX_POINTS_PER_LOOP; // the main loop. -#pragma omp parallel for schedule(dynamic) +#pragma omp parallel for schedule(dynamic) num_threads(num_omp_threads) for (size_t i = 0; i < nx_p; i += NX_POINTS_PER_LOOP) { kernel( x, y, y_transposed.data(), ny, res, y_norms, i); diff --git a/faiss/utils/distances_fused/simdlib_based.cpp b/faiss/utils/distances_fused/simdlib_based.cpp index 97ededd2f0..2b95466e12 100644 --- a/faiss/utils/distances_fused/simdlib_based.cpp +++ b/faiss/utils/distances_fused/simdlib_based.cpp @@ -259,7 +259,7 @@ void exhaustive_L2sqr_fused_cmax( const size_t nx_p = (nx / NX_POINTS_PER_LOOP) * NX_POINTS_PER_LOOP; // the main loop. 
-#pragma omp parallel for schedule(dynamic) +#pragma omp parallel for schedule(dynamic) num_threads(num_omp_threads) for (size_t i = 0; i < nx_p; i += NX_POINTS_PER_LOOP) { kernel( x, y, y_transposed.data(), ny, res, y_norms, i); diff --git a/faiss/utils/extra_distances.cpp b/faiss/utils/extra_distances.cpp index 8c0699880d..b9a5689849 100644 --- a/faiss/utils/extra_distances.cpp +++ b/faiss/utils/extra_distances.cpp @@ -10,6 +10,7 @@ #include #include + #include #include @@ -37,7 +38,7 @@ void pairwise_extra_distances_template( int64_t ldq, int64_t ldb, int64_t ldd) { -#pragma omp parallel for if (nq > 10) +#pragma omp parallel for if (nq > 10) num_threads(num_omp_threads) for (int64_t i = 0; i < nq; i++) { const float* xqi = xq + i * ldq; const float* xbj = xb; @@ -66,7 +67,7 @@ void knn_extra_metrics_template( for (size_t i0 = 0; i0 < nx; i0 += check_period) { size_t i1 = std::min(i0 + check_period, nx); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = i0; i < i1; i++) { const float* x_i = x + i * d; const float* y_j = y; diff --git a/faiss/utils/hamming.cpp b/faiss/utils/hamming.cpp index 7019183bd0..1ad1cbe715 100644 --- a/faiss/utils/hamming.cpp +++ b/faiss/utils/hamming.cpp @@ -183,7 +183,7 @@ static void hammings_knn_hc( const size_t block_size = hamming_batch_size; for (size_t j0 = 0; j0 < n2; j0 += block_size) { const size_t j1 = std::min(j0 + block_size, n2); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < ha->nh; i++) { HammingComputer hc(bs1 + i * bytes_per_code, bytes_per_code); @@ -260,7 +260,7 @@ static void hammings_knn_mc( const size_t block_size = hamming_batch_size; for (size_t j0 = 0; j0 < nb; j0 += block_size) { const size_t j1 = std::min(j0 + block_size, nb); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t i = 0; i < na; ++i) { for (size_t j = j0; j < j1; ++j) { cs[i].update_counter(b + j * 
bytes_per_code, j); @@ -317,7 +317,7 @@ void fvecs2bitvecs( size_t d, size_t n) { const int64_t ncodes = ((d + 7) / 8); -#pragma omp parallel for if (n > 100000) +#pragma omp parallel for if (n > 100000) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) fvec2bitvec(x + i * d, b + i * ncodes, d); } @@ -328,7 +328,7 @@ void bitvecs2fvecs( size_t d, size_t n) { const int64_t ncodes = ((d + 7) / 8); -#pragma omp parallel for if (n > 100000) +#pragma omp parallel for if (n > 100000) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { binary_to_real(d, b + i * ncodes, x + i * d); } @@ -373,7 +373,7 @@ void bitvec_shuffle( size_t lda = (da + 7) / 8; size_t ldb = (db + 7) / 8; -#pragma omp parallel for if (n > 10000) +#pragma omp parallel for if (n > 10000) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { const uint8_t* ai = a + i * lda; uint8_t* bi = b + i * ldb; @@ -502,7 +502,7 @@ static void hamming_range_search_template( int radius, size_t code_size, RangeSearchResult* res) { -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) { RangeSearchPartialResult pres(res); @@ -679,7 +679,7 @@ void generalized_hammings_knn_hc( if (ordered) ha->heapify(); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int i = 0; i < na; i++) { const uint8_t* __restrict ca = a + i * code_size; const uint8_t* __restrict cb = b; diff --git a/faiss/utils/random.cpp b/faiss/utils/random.cpp index 9ab8d0adbe..48572b4084 100644 --- a/faiss/utils/random.cpp +++ b/faiss/utils/random.cpp @@ -68,7 +68,7 @@ void float_rand(float* x, size_t n, int64_t seed) { RandomGenerator rng0(seed); int a0 = rng0.rand_int(), b0 = rng0.rand_int(); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t j = 0; j < nblock; j++) { RandomGenerator rng(a0 + j * b0); @@ -87,7 +87,7 @@ void float_randn(float* x, size_t n, int64_t seed) { RandomGenerator rng0(seed); int a0 = rng0.rand_int(), b0 = 
rng0.rand_int(); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t j = 0; j < nblock; j++) { RandomGenerator rng(a0 + j * b0); @@ -121,7 +121,7 @@ void int64_rand(int64_t* x, size_t n, int64_t seed) { RandomGenerator rng0(seed); int a0 = rng0.rand_int(), b0 = rng0.rand_int(); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t j = 0; j < nblock; j++) { RandomGenerator rng(a0 + j * b0); @@ -139,7 +139,7 @@ void int64_rand_max(int64_t* x, size_t n, uint64_t max, int64_t seed) { RandomGenerator rng0(seed); int a0 = rng0.rand_int(), b0 = rng0.rand_int(); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t j = 0; j < nblock; j++) { RandomGenerator rng(a0 + j * b0); @@ -169,7 +169,7 @@ void byte_rand(uint8_t* x, size_t n, int64_t seed) { RandomGenerator rng0(seed); int a0 = rng0.rand_int(), b0 = rng0.rand_int(); -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int64_t j = 0; j < nblock; j++) { RandomGenerator rng(a0 + j * b0); @@ -210,7 +210,7 @@ void rand_smooth_vectors(size_t n, size_t d, float* x, int64_t seed) { std::vector scales(d); float_rand(scales.data(), d, seed + 2); -#pragma omp parallel for if (n * d > 10000) +#pragma omp parallel for if (n * d > 10000) num_threads(num_omp_threads) for (int64_t i = 0; i < n; i++) { for (size_t j = 0; j < d; j++) { x[i * d + j] = sinf(x[i * d + j] * (scales[j] * 4 + 0.1)); diff --git a/faiss/utils/random.h b/faiss/utils/random.h index 8b4286894a..7ef935a895 100644 --- a/faiss/utils/random.h +++ b/faiss/utils/random.h @@ -14,6 +14,7 @@ #pragma once #include +#include #include namespace faiss { diff --git a/faiss/utils/sorting.cpp b/faiss/utils/sorting.cpp index 18c3f963a5..7bd0fcbf14 100644 --- a/faiss/utils/sorting.cpp +++ b/faiss/utils/sorting.cpp @@ -10,6 +10,7 @@ #include #include + #include #include @@ -148,7 +149,7 @@ void fvec_argsort_parallel(size_t n, 
const float* vals, size_t* perm) { } } -#pragma omp parallel +#pragma omp parallel num_threads(num_omp_threads) for (size_t i = 0; i < n; i++) { permA[i] = i; } @@ -158,7 +159,7 @@ void fvec_argsort_parallel(size_t n, const float* vals, size_t* perm) { std::vector segs(nt); // independent sorts -#pragma omp parallel for +#pragma omp parallel for num_threads(num_omp_threads) for (int t = 0; t < nt; t++) { size_t i0 = t * n / nt; size_t i1 = (t + 1) * n / nt; diff --git a/faiss/utils/utils.cpp b/faiss/utils/utils.cpp index 894653b1cd..7e6d6afdae 100644 --- a/faiss/utils/utils.cpp +++ b/faiss/utils/utils.cpp @@ -27,6 +27,7 @@ #include + #include #include @@ -274,7 +275,7 @@ size_t merge_result_table_with( int64_t translation) { size_t n1 = 0; -#pragma omp parallel reduction(+ : n1) +#pragma omp parallel reduction(+ : n1) num_threads(num_omp_threads) { std::vector tmpI(k); std::vector tmpD(k); @@ -499,7 +500,7 @@ bool check_openmp() { std::vector nt_per_thread(10); size_t sum = 0; bool in_parallel = true; -#pragma omp parallel reduction(+ : sum) +#pragma omp parallel reduction(+ : sum) num_threads(num_omp_threads) { if (!omp_in_parallel()) { in_parallel = false;