Skip to content

Commit

Permalink
sparse-dense vec and parallel sorting for acceleration
Browse files Browse the repository at this point in the history
  • Loading branch information
yaushian committed Nov 12, 2022
1 parent f73537a commit 7e9183d
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 46 deletions.
2 changes: 1 addition & 1 deletion pecos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def class_fullname(cls):
return MetaClass.class_fullname(cls)

@classmethod
def append_meta(cls, d: dict = None):
def append_meta(cls, d = None):
meta = {"__meta__": {"class_fullname": cls.class_fullname()}}
if d is not None:
meta.update(d)
Expand Down
57 changes: 27 additions & 30 deletions pecos/core/utils/clustering.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "matrix.hpp"
#include "random.hpp"
#include "parallel.hpp"

namespace pecos {

Expand Down Expand Up @@ -53,20 +54,21 @@ struct Node {
struct Tree {
typedef random_number_generator<> rng_t;
typedef dense_vec_t<float32_t> dvec_wrapper_t;
typedef sdvec_t<uint32_t,float32_t> f32_sdvec_t;

size_t depth; // # leaf nodes = 2^depth
std::vector<Node> nodes;

// used for balanced 2-means
u64_dvec_t elements;
u64_dvec_t previous_elements;
std::vector<f32_dvec_t> center1; // need to be duplicated to handle parallel clustering
std::vector<f32_dvec_t> center2; // for spherical kmeans
std::vector<f32_sdvec_t> center1; // need to be duplicated to handle parallel clustering
std::vector<f32_sdvec_t> center2; // for spherical kmeans
u32_dvec_t seed_for_nodes; // random seeds used for each node
f32_dvec_t scores;

// Temporary working spaces for function update_center, will be cleared after clustering to release space
std::vector<f32_dvec_t> center_tmp_thread; // thread-private working array for parallel updating center
std::vector<f32_sdvec_t> center_tmp_thread; // thread-private working array for parallel updating center

Tree(size_t depth=0) { this->reset_depth(depth); }

Expand All @@ -90,7 +92,6 @@ struct Tree {
}
};


// Mutable access to the node stored at index `nid` in `nodes`.
Node& root_of(size_t nid) {
    Node& node = nodes[nid];
    return node;
}
// Mutable access to the left child of node `nid`, stored at index 2 * nid.
Node& left_of(size_t nid) {
    const size_t left_id = 2 * nid;
    return nodes[left_id];
}
// Mutable access to the right child of node `nid`, stored at index 2 * nid + 1.
Node& right_of(size_t nid) {
    const size_t right_id = 2 * nid + 1;
    return nodes[right_id];
}
Expand All @@ -102,15 +103,15 @@ struct Tree {
}

// Re-orders the slice [root.start, root.end) of `elements` by score and
// returns whether the first half of that slice changed.
//
// Steps:
//   1. Snapshot the current first half [start, middle) into `previous_elements`.
//   2. Sort the whole slice with comparator_by_value_t(scores.data(), increasing)
//      (presumably orders element ids by scores[id]; the comparator is defined
//      elsewhere -- TODO confirm).
//   3. Sort each half by element id so the halves are in a canonical order.
//   4. Return true iff the new first half differs from the snapshot.
//
// `threads` is forwarded to parallel_sort; `increasing` is forwarded to the
// score comparator.
//
// NOTE(review): this text is a rendered diff without +/- markers. The first
// signature and the std::sort calls appear to be the pre-commit version, and
// the second signature and the parallel_sort calls their replacement --
// confirm against the repository before relying on either.
bool sort_elements_by_scores_on_node(const Node& root, bool increasing=true) {
bool sort_elements_by_scores_on_node(const Node& root, int threads=1, bool increasing=true) {
auto prev_start_it = previous_elements.begin() + root.start;
auto start_it = elements.begin() + root.start;
// Midpoint of the slice: (start + end) / 2, rounded down.
auto middle_it = elements.begin() + ((root.start + root.end) >> 1);
auto end_it = elements.begin() + root.end;
// Keep a copy of the first half so the change can be detected after sorting.
std::copy(start_it, middle_it, prev_start_it);
std::sort(start_it, end_it, comparator_by_value_t(scores.data(), increasing));
std::sort(start_it, middle_it);
std::sort(middle_it, end_it);
// Score-ordered sort of the whole slice, then id-ordered sort of each half.
parallel_sort(start_it, end_it, comparator_by_value_t(scores.data(), increasing), threads);
parallel_sort(start_it, middle_it, std::less<size_t>(), threads);
parallel_sort(middle_it, end_it, std::less<size_t>(), threads);
// Element-wise comparison of the id-sorted first half against the snapshot.
return !std::equal(start_it, middle_it, prev_start_it);
}

Expand All @@ -125,7 +126,7 @@ struct Tree {

// Loop through node's elements and update current center
template<typename MAT>
void update_center(const MAT& feat_mat, Node& cur_node, dvec_wrapper_t& cur_center, float32_t alpha, int threads=1) {
void update_center(const MAT& feat_mat, Node& cur_node, f32_sdvec_t& cur_center, float32_t alpha, int threads=1) {
if(threads == 1) {
for(size_t i = cur_node.start; i < cur_node.end; i++) {
size_t eid = elements[i];
Expand All @@ -136,8 +137,8 @@ struct Tree {
#pragma omp parallel num_threads(threads)
{
int thread_id = omp_get_thread_num();
std::fill(center_tmp_thread[thread_id].begin(), center_tmp_thread[thread_id].end(), 0);
dvec_wrapper_t cur_center_tmp_thread(center_tmp_thread[thread_id]);
center_tmp_thread[thread_id].fill_zero();
f32_sdvec_t& cur_center_tmp_thread = center_tmp_thread[thread_id];
// use static for reproducibility under multi-trials with same seed.
#pragma omp for schedule(static)
for(size_t i = cur_node.start; i < cur_node.end; i++) {
Expand All @@ -146,13 +147,9 @@ struct Tree {
do_axpy(alpha, feat, cur_center_tmp_thread);
}
}

// global parallel reduction
#pragma omp parallel for schedule(static)
for(size_t i=0; i<cur_center.len; ++i) {
for(int thread_id = 0; thread_id < threads; thread_id++) {
cur_center[i] += center_tmp_thread[thread_id][i];
}

for(int thread_id = 0; thread_id < threads; thread_id++) {
do_axpy(1.0,center_tmp_thread[thread_id],cur_center);
}
}
}
Expand All @@ -164,12 +161,12 @@ struct Tree {
Node& right = right_of(nid);
partition_elements(root, left, right);

dvec_wrapper_t cur_center(center1[thread_id]);
f32_sdvec_t& cur_center = center1[thread_id];

// perform the clustering and sorting
for(size_t iter = 0; iter < max_iter; iter++) {
// construct cur_center (for right child)
std::fill(center1[thread_id].begin(), center1[thread_id].end(), 0);
cur_center.fill_zero();
if(iter == 0) {
auto right_idx = rng.randint(0, root.size() - 1);
auto left_idx = (right_idx + rng.randint(1, root.size() - 1)) % root.size();
Expand All @@ -189,6 +186,7 @@ struct Tree {
alpha = -1.0 / left.size();
update_center(feat_mat, left, cur_center, alpha, threads);
}

u64_dvec_t *elements_ptr = &elements;
auto *scores_ptr = &scores;
auto *center_ptr = &cur_center;
Expand All @@ -207,7 +205,7 @@ struct Tree {
scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
}
}
bool assignment_changed = sort_elements_by_scores_on_node(root);
bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
if(!assignment_changed) {
break;
}
Expand All @@ -221,15 +219,15 @@ struct Tree {
Node& right = right_of(nid);
partition_elements(root, left, right);

dvec_wrapper_t cur_center1(center1[thread_id]);
dvec_wrapper_t cur_center2(center2[thread_id]);
f32_sdvec_t& cur_center1 = center1[thread_id];
f32_sdvec_t& cur_center2 = center2[thread_id];

// perform the clustering and sorting
for(size_t iter = 0; iter < max_iter; iter++) {
float32_t one = 1.0;
// construct center1 (for right child)
std::fill(center1[thread_id].begin(), center1[thread_id].end(), 0);
std::fill(center2[thread_id].begin(), center2[thread_id].end(), 0);
cur_center1.fill_zero();
cur_center2.fill_zero();
if(iter == 0) {
auto right_idx = rng.randint(0, root.size() - 1);
auto left_idx = (right_idx + rng.randint(1, root.size() - 1)) % root.size();
Expand Down Expand Up @@ -257,7 +255,6 @@ struct Tree {
do_axpy(-1.0, cur_center2, cur_center1);
}


u64_dvec_t *elements_ptr = &elements;
auto *scores_ptr = &scores;
auto *center_ptr = &cur_center1;
Expand All @@ -276,7 +273,7 @@ struct Tree {
scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
}
}
bool assignment_changed = sort_elements_by_scores_on_node(root);
bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
if(!assignment_changed) {
break;
}
Expand All @@ -297,14 +294,14 @@ struct Tree {
}

threads = set_threads(threads);
center1.resize(threads, f32_dvec_t(feat_mat.cols, 0));
center2.resize(threads, f32_dvec_t(feat_mat.cols, 0));
center1.resize(threads, f32_sdvec_t(feat_mat.cols));
center2.resize(threads, f32_sdvec_t(feat_mat.cols));
scores.resize(feat_mat.rows, 0);
nodes[0].set(0, nr_elements);
nodes[1].set(0, nr_elements);

// Allocate tmp arrays for parallel update center
center_tmp_thread.resize(threads, f32_dvec_t(feat_mat.cols, 0));
center_tmp_thread.resize(threads, f32_sdvec_t(feat_mat.cols));


// let's do it layer by layer so we can parallelize it
Expand Down
Loading

0 comments on commit 7e9183d

Please sign in to comment.