diff --git a/pecos/__init__.py b/pecos/__init__.py
index 69801d3f..51ecdd98 100644
--- a/pecos/__init__.py
+++ b/pecos/__init__.py
@@ -38,7 +38,7 @@ def class_fullname(cls):
             return MetaClass.class_fullname(cls)

         @classmethod
-        def append_meta(cls, d: dict = None):
+        def append_meta(cls, d = None):
             meta = {"__meta__": {"class_fullname": cls.class_fullname()}}
             if d is not None:
                 meta.update(d)
diff --git a/pecos/core/utils/clustering.hpp b/pecos/core/utils/clustering.hpp
index 7101d517..73d76c89 100644
--- a/pecos/core/utils/clustering.hpp
+++ b/pecos/core/utils/clustering.hpp
@@ -19,6 +19,7 @@

 #include "matrix.hpp"
 #include "random.hpp"
+#include "parallel.hpp"

 namespace pecos {
@@ -53,6 +54,7 @@ struct Node {
 struct Tree {
     typedef random_number_generator<> rng_t;
     typedef dense_vec_t<float32_t> dvec_wrapper_t;
+    typedef sdvec_t<uint32_t, float32_t> f32_sdvec_t;

     size_t depth; // # leaf nodes = 2^depth
     std::vector<Node> nodes;
@@ -60,13 +62,13 @@

     // used for balanced 2-means
     u64_dvec_t elements;
     u64_dvec_t previous_elements;
-    std::vector<f32_dvec_t> center1; // need to be duplicated to handle parallel clustering
-    std::vector<f32_dvec_t> center2; // for spherical kmeans
+    std::vector<f32_sdvec_t> center1; // need to be duplicated to handle parallel clustering
+    std::vector<f32_sdvec_t> center2; // for spherical kmeans
     u32_dvec_t seed_for_nodes; // random seeds used for each node
     f32_dvec_t scores;

     // Temporary working spaces for function update_center, will be cleared after clustering to release space
-    std::vector<f32_dvec_t> center_tmp_thread; // thread-private working array for parallel updating center
+    std::vector<f32_sdvec_t> center_tmp_thread; // thread-private working array for parallel updating center

     Tree(size_t depth=0) { this->reset_depth(depth); }
@@ -90,7 +92,6 @@
         }
     };

-
     Node& root_of(size_t nid) { return nodes[nid]; }
     Node& left_of(size_t nid) { return nodes[nid << 1]; }
     Node& right_of(size_t nid) { return nodes[(nid << 1) + 1]; }
@@ -102,15 +103,15 @@
     }

     // Sort elements by scores on node and return if this function changes the assignment
-    bool sort_elements_by_scores_on_node(const Node& root, bool increasing=true) {
+    bool sort_elements_by_scores_on_node(const Node& root, int threads=1, bool increasing=true) {
         auto prev_start_it = previous_elements.begin() + root.start;
         auto start_it = elements.begin() + root.start;
         auto middle_it = elements.begin() + ((root.start + root.end) >> 1);
         auto end_it = elements.begin() + root.end;
         std::copy(start_it, middle_it, prev_start_it);
-        std::sort(start_it, end_it, comparator_by_value_t<float32_t>(scores.data(), increasing));
-        std::sort(start_it, middle_it);
-        std::sort(middle_it, end_it);
+        parallel_sort(start_it, end_it, comparator_by_value_t<float32_t>(scores.data(), increasing), threads);
+        parallel_sort(start_it, middle_it, std::less<uint64_t>(), threads);
+        parallel_sort(middle_it, end_it, std::less<uint64_t>(), threads);
         return !std::equal(start_it, middle_it, prev_start_it);
     }

@@ -125,7 +126,7 @@

     // Loop through node's elements and update current center
     template<typename MAT>
-    void update_center(const MAT& feat_mat, Node& cur_node, dvec_wrapper_t& cur_center, float32_t alpha, int threads=1) {
+    void update_center(const MAT& feat_mat, Node& cur_node, f32_sdvec_t& cur_center, float32_t alpha, int threads=1) {
         if(threads == 1) {
             for(size_t i = cur_node.start; i < cur_node.end; i++) {
                 size_t eid = elements[i];
@@ -136,8 +137,8 @@
             #pragma omp parallel num_threads(threads)
             {
                 int thread_id = omp_get_thread_num();
-                std::fill(center_tmp_thread[thread_id].begin(), center_tmp_thread[thread_id].end(), 0);
-                dvec_wrapper_t cur_center_tmp_thread(center_tmp_thread[thread_id]);
+                center_tmp_thread[thread_id].fill_zero();
+                f32_sdvec_t& cur_center_tmp_thread = center_tmp_thread[thread_id];
                 // use static for reproducibility under multi-trials with same seed.
                 #pragma omp for schedule(static)
                 for(size_t i = cur_node.start; i < cur_node.end; i++) {
@@ -146,13 +147,9 @@ struct Tree {
                     do_axpy(alpha, feat, cur_center_tmp_thread);
                 }
             }
-
-            // global parallel reduction
-            #pragma omp parallel for schedule(static)
-            for(size_t i=0; i<feat_mat.cols; i++) {
-                for(int thread_id = 0; thread_id < threads; thread_id++) {
-                    cur_center[i] += center_tmp_thread[thread_id][i];
-                }
-            }
+            // global reduction
+            for(int thread_id = 0; thread_id < threads; thread_id++) {
+                do_axpy(1.0, center_tmp_thread[thread_id], cur_center);
+            }
         }
     }
@@ ... @@
                     scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
                 }
             }
-            bool assignment_changed = sort_elements_by_scores_on_node(root);
+            bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
             if(!assignment_changed) {
                 break;
             }
@@ -221,15 +219,15 @@
         Node& right = right_of(nid);
         partition_elements(root, left, right);

-        dvec_wrapper_t cur_center1(center1[thread_id]);
-        dvec_wrapper_t cur_center2(center2[thread_id]);
+        f32_sdvec_t& cur_center1 = center1[thread_id];
+        f32_sdvec_t& cur_center2 = center2[thread_id];

         // perform the clustering and sorting
         for(size_t iter = 0; iter < max_iter; iter++) {
             float32_t one = 1.0;
             // construct center1 (for right child)
-            std::fill(center1[thread_id].begin(), center1[thread_id].end(), 0);
-            std::fill(center2[thread_id].begin(), center2[thread_id].end(), 0);
+            cur_center1.fill_zero();
+            cur_center2.fill_zero();
             if(iter == 0) {
                 auto right_idx = rng.randint(0, root.size() - 1);
                 auto left_idx = (right_idx + rng.randint(1, root.size() - 1)) % root.size();
@@ -257,7 +255,6 @@
                 do_axpy(-1.0, cur_center2, cur_center1);
             }

-
             u64_dvec_t *elements_ptr = &elements;
             auto *scores_ptr = &scores;
             auto *center_ptr = &cur_center1;
@@ -276,7 +273,7 @@
                     scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
                 }
             }
-            bool assignment_changed = sort_elements_by_scores_on_node(root);
+            bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
             if(!assignment_changed) {
                 break;
             }
@@ -297,14 +294,14 @@
         }
         threads = set_threads(threads);
-        center1.resize(threads, f32_dvec_t(feat_mat.cols, 0));
-        center2.resize(threads, f32_dvec_t(feat_mat.cols, 0));
+        center1.resize(threads, f32_sdvec_t(feat_mat.cols));
+        center2.resize(threads, f32_sdvec_t(feat_mat.cols));
         scores.resize(feat_mat.rows, 0);
         nodes[0].set(0, nr_elements);
         nodes[1].set(0, nr_elements);

         // Allocate tmp arrays for parallel update center
-        center_tmp_thread.resize(threads, f32_dvec_t(feat_mat.cols, 0));
+        center_tmp_thread.resize(threads, f32_sdvec_t(feat_mat.cols));

         // let's do it layer by layer so we can parallelize it
diff --git a/pecos/core/utils/matrix.hpp b/pecos/core/utils/matrix.hpp
index 8814cd90..9f8d8644 100644
--- a/pecos/core/utils/matrix.hpp
+++ b/pecos/core/utils/matrix.hpp
@@ -72,6 +72,68 @@ extern "C" {
 } // end of extern C

 namespace pecos {

+    // ===== Container for sparse-dense vectors =====
+    // For sparse vectors computational acceleration
+    template<class IDX_T=uint32_t, class VAL_T=float32_t>
+    struct sdvec_t {
+        typedef IDX_T index_type;
+        typedef VAL_T value_type;
+
+        index_type len;
+        index_type nr_touch;
+
+        struct entry_t {
+            value_type val;
+            bool touched;
+            entry_t(value_type val=0, bool touched=0): val(val), touched(touched) {}
+        };
+
+        std::vector<entry_t> entries;
+        std::vector<index_type> touched_indices;
+
+        sdvec_t(index_type len=0) : len(len), nr_touch(0) {
+            entries.resize(len);
+            touched_indices.resize(len);
+        }
+
+
+        void resize(index_type len_) {
+            len = len_;
+            nr_touch = 0;
+            entries.resize(len);
+            touched_indices.resize(len);
+        }
+
+        value_type& add_nonzero_at(index_type idx, value_type v) {
+            entries[idx].val += static_cast<value_type>(v);
+            if(!entries[idx].touched) {
+                entries[idx].touched = 1;
+                touched_indices[nr_touch++] = static_cast<index_type>(idx);
+            }
+            return entries[idx].val;
+        }
+
+        void sort_nz_indices() {
+            std::sort(touched_indices.data(), touched_indices.data() + nr_touch);
+        }
+
+        void fill_zero() {
+            if(nr_touch < (len >> 1)) {
+                for(size_t t = 0; t < nr_touch; t++) {
+                    entries[touched_indices[t]].val = 0;
+                    entries[touched_indices[t]].touched = 0;
+                }
+            } else {
+                memset(static_cast<void*>(entries.data()), 0, sizeof(entry_t) * len);
+            }
+            nr_touch = 0;
+        }
+
+        bool is_dense() const {
+            return float(nr_touch)/float(len) > 0.3;
+        }
+    };
+
     // ===== Wrapper for sparse/dense vectors =====
     template<class IDX_T, class VAL_T>
@@ -83,7 +145,7 @@
         index_type* idx;
         value_type* val;
         sparse_vec_t(index_type nnz=0, index_type* idx=NULL, value_type* val=NULL): nnz(nnz), idx(idx), val(val) {}
-
+
         index_type get_nnz() const { return nnz; }
     };

@@ -106,6 +168,9 @@
         uint64_t get_nnz() const { return len; }
     };

+
+
+
     // ===== Wrapper for sparse/dense matrices =====
     struct csr_t;
     struct csc_t;
@@ -607,6 +672,23 @@
     template<> inline float copy(ptrdiff_t *len, float *x, ptrdiff_t *xinc, float *y, ptrdiff_t *yinc) { return scopy_(len,x,xinc,y,yinc); }

     // ===== do_dot_product =====
+    template<class IDX_T, class VAL_T>
+    float32_t do_dot_product(const sparse_vec_t<IDX_T, VAL_T>& x, const sdvec_t<IDX_T, VAL_T>& y) {
+        float32_t ret = 0;
+        for(size_t s=0; s < x.nnz; s++) {
+            auto idx = x.idx[s];
+            if(y.entries[idx].touched)
+                ret += y.entries[idx].val * x.val[s];
+        }
+        return ret;
+    }
+
+    template<class IDX_T, class VAL_T>
+    float32_t do_dot_product(const sdvec_t<IDX_T, VAL_T>& x, const sparse_vec_t<IDX_T, VAL_T>& y) {
+        return do_dot_product(y, x);
+    }
+
+
     template<typename val_type>
     val_type do_dot_product(const val_type *x, const val_type *y, size_t size) {
         // This uses a BLAS implementation
@@ -665,6 +747,43 @@
         return do_dot_product(y, x);
     }

+
+    template<class IDX_T, class VAL_T>
+    float32_t do_dot_product(const sdvec_t<IDX_T, VAL_T>& x, const sdvec_t<IDX_T, VAL_T>& y) {
+        if(x.nr_touch > y.nr_touch) {
+            return do_dot_product(y, x);
+        }
+        float32_t ret = 0;
+        for(size_t s=0; s < x.nr_touch; s++) {
+            auto &idx = x.touched_indices[s];
+            if(y.entries[idx].touched)
+                ret += y.entries[idx].val * x.entries[idx].val;
+        }
+        return static_cast<float32_t>(ret);
+    }
+
+    template<class IDX_T, class VAL_T>
+    float32_t do_dot_product(const sdvec_t<IDX_T, VAL_T>& x, const dense_vec_t<VAL_T>& y) {
+        float32_t ret = 0;
+        if(x.is_dense()) {
+            for(size_t i = 0; i < y.len; i++) {
+                ret += x.entries[i].val * y[i];
+            }
+        }
+        else {
+            for(size_t s=0; s < x.nr_touch; s++) {
+                auto &idx = x.touched_indices[s];
+                ret += x.entries[idx].val * y[idx];
+            }
+        }
+        return static_cast<float32_t>(ret);
+    }
+
+    template<class IDX_T, class VAL_T>
+    float32_t do_dot_product(const dense_vec_t<VAL_T>& x, const sdvec_t<IDX_T, VAL_T>& y) {
+        return do_dot_product(y, x);
+    }
+
     // ===== do_ax2py =====
     template<typename T>
     void do_ax2py(T alpha, const dense_vec_t<T> &x, dense_vec_t<T> &y) {
@@ -727,6 +846,36 @@
         return do_axpy(alpha, x, dense_vec_t<val_type>(y));
     }

+    template<class T, class IDX_T, class VAL_T>
+    void do_axpy(T alpha, const sparse_vec_t<IDX_T, VAL_T>& x, sdvec_t<IDX_T, VAL_T>& y) {
+        for(size_t s=0; s < x.nnz; s++) {
+            y.add_nonzero_at(x.idx[s], x.val[s] * alpha);
+        }
+    }
+
+
+    template<class T, class IDX_T, class VAL_T>
+    void do_axpy(T alpha, const sdvec_t<IDX_T, VAL_T>& x, sdvec_t<IDX_T, VAL_T>& y) {
+        for(size_t s=0; s < x.nr_touch; s++) {
+            auto &idx = x.touched_indices[s];
+            y.add_nonzero_at(idx, x.entries[idx].val * alpha);
+        }
+    }
+
+
+    template<class T, class IDX_T, class VAL_T>
+    void do_axpy(T alpha, const dense_vec_t<VAL_T>& x, sdvec_t<IDX_T, VAL_T>& y) {
+        if(y.nr_touch == x.len)
+            for(size_t i = 0; i < x.len; i++) {
+                y.entries[i].val += alpha * x[i];
+            }
+        else {
+            for(size_t i = 0; i < x.len; i++) {
+                y.add_nonzero_at(i, alpha * x[i]);
+            }
+        }
+    }
+
     // ===== do_scale =====
     template<typename T, typename val_type>
     void do_scale(T alpha, val_type *x, size_t size) {
@@ -755,6 +904,14 @@
         }
     }

+    template<class T, class IDX_T, class VAL_T>
+    void do_scale(T alpha, sdvec_t<IDX_T, VAL_T>& x) {
+        for(size_t s=0; s < x.nr_touch; s++) {
+            auto &idx = x.touched_indices[s];
+            x.entries[idx].val = x.entries[idx].val * alpha;
+        }
+    }
+
     template<class X_MAT, class M_MAT, class V>
     void compute_sparse_entries_from_rowmajored_X_and_colmajored_M(const X_MAT& X, const M_MAT& M, uint64_t len, uint32_t *X_row_idx, uint32_t *M_col_idx, V *val, int threads) {
         // This function assume that nz entries in both x and y are stored in an
diff --git a/pecos/core/utils/parallel.hpp b/pecos/core/utils/parallel.hpp
index 12e114ff..8c6a03eb 100644
--- a/pecos/core/utils/parallel.hpp
+++ b/pecos/core/utils/parallel.hpp
@@ -18,6 +18,8 @@
 #include <omp.h>
 #include <algorithm>
 #include <vector>
+// for __gnu_parallel
+#include <parallel/algorithm>

 namespace pecos {
@@ -70,6 +72,19 @@
             }
         }
     }
+
+    template<class InputIt, class Compare>
+    void parallel_sort(InputIt first, InputIt last, Compare comp, int threads=-1) {
+        threads = set_threads(threads);
+        typedef typename std::iterator_traits<InputIt>::difference_type difference_type;
+        difference_type len = last - first;
+        if(threads == 1 || len < threads) {
+            std::sort(first, last, comp);
+        } else {
+            __gnu_parallel::multiway_mergesort_tag parallelism(threads);
+            __gnu_parallel::sort(first, last, comp, parallelism);
+        }
+    }
 } // end namespace pecos

 #endif // end of __PARALLEL_H__
diff --git a/pecos/core/utils/tfidf.hpp b/pecos/core/utils/tfidf.hpp
index 2a2907aa..1fa64d7f 100644
--- a/pecos/core/utils/tfidf.hpp
+++ b/pecos/core/utils/tfidf.hpp
@@ -25,8 +25,6 @@
 #include <unordered_map>
 #include <vector>
 #include <omp.h>
-// for __gnu_parallel
-#include <parallel/algorithm>

 // string_view available since c++17
 // only in experimental/string_view in c++14
@@ -295,18 +293,6 @@ void append_lines_to_string_view(char* buffer, size_t buffer_size, sv_vec_t& lines) {
     }
 }

-template<class InputIt, class Compare>
-void parallel_sort(InputIt first, InputIt last, Compare comp, int threads=-1) {
-    threads = set_threads(threads);
-    typedef typename std::iterator_traits<InputIt>::difference_type difference_type;
-    difference_type len = last - first;
-    if(threads == 1 || len < threads) {
-        std::sort(first, last, comp);
-    } else {
-        __gnu_parallel::multiway_mergesort_tag parallelism(threads);
-        __gnu_parallel::sort(first, last, comp, parallelism);
-    }
-}

 class Tokenizer {
 public:
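A minimal usage sketch follows (illustration only, not a hunk of the patch). It mirrors how clustering.hpp drives the new sdvec_t container from matrix.hpp: accumulate sparse feature rows into a center with do_axpy, score a row against the center with do_dot_product, and reset the center with fill_zero between 2-means iterations. The include path, the <uint32_t, float> instantiation, and the toy data are assumptions made for the example; the routines themselves are the ones added in this diff.

// Illustrative sketch, not part of the patch. Assumed include path and template
// arguments; only the sdvec_t routines introduced above are exercised.
#include <cstdio>
#include "pecos/core/utils/matrix.hpp"  // assumed path relative to the repo root

int main() {
    // Assumed instantiation; clustering.hpp typedefs an equivalent f32_sdvec_t.
    typedef pecos::sdvec_t<uint32_t, float> f32_sdvec_t;

    // Two toy sparse feature rows over a 6-dimensional space.
    uint32_t idx_a[] = {0, 3};
    float    val_a[] = {1.0f, 2.0f};
    uint32_t idx_b[] = {3, 5};
    float    val_b[] = {4.0f, 0.5f};
    pecos::sparse_vec_t<uint32_t, float> row_a(2, idx_a, val_a);
    pecos::sparse_vec_t<uint32_t, float> row_b(2, idx_b, val_b);

    // Accumulate a cluster center: center += alpha * row, touching only nonzero coordinates.
    f32_sdvec_t center(6);
    pecos::do_axpy(0.5f, row_a, center);
    pecos::do_axpy(0.5f, row_b, center);  // center = {0.5, 0, 0, 3.0, 0, 0.25}, nr_touch = 3

    // Score a row against the center; only touched entries contribute.
    float score = pecos::do_dot_product(row_a, center);  // 1.0*0.5 + 2.0*3.0 = 6.5

    std::printf("nr_touch=%u score=%.2f\n", center.nr_touch, score);

    // Reset between iterations: O(nr_touch) work while the center stays sparse.
    center.fill_zero();
    return 0;
}

Because only touched coordinates are visited, fill_zero and the dot products cost O(nr_touch) while centers remain sparse, which is the motivation for replacing the dense per-thread f32_dvec_t buffers in clustering.hpp. Building the sketch may require the project's usual flags (for example -fopenmp with GCC), since the headers it pulls in also cover the OpenMP and __gnu_parallel code paths.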