From d414bbde3fcb435e45c423719db7dbd6f780aa1b Mon Sep 17 00:00:00 2001
From: yaushian
Date: Fri, 2 Dec 2022 01:09:14 +0000
Subject: [PATCH] implement sampling in clustering.hpp

---
 pecos/core/utils/clustering.hpp | 184 ++++++++++++++++++++++----------
 1 file changed, 130 insertions(+), 54 deletions(-)

diff --git a/pecos/core/utils/clustering.hpp b/pecos/core/utils/clustering.hpp
index 21579909..a81c9200 100644
--- a/pecos/core/utils/clustering.hpp
+++ b/pecos/core/utils/clustering.hpp
@@ -31,6 +31,30 @@ enum {
     SKMEANS=5,
 }; /* partition_algo */
 
+enum {
+    CONSTANT_SAMPLE_SCHEDULE=0,
+    LINEAR_SAMPLE_SCHEDULE=1,
+}; /* sample strategies */
+
+extern "C" {
+    struct ClusterSamplerParam {
+        int strategy;
+        float sample_rate;
+        float warmup_sample_rate;
+        float warmup_layer_rate;
+
+        ClusterSamplerParam(
+            int strategy,
+            float sample_rate,
+            float warmup_sample_rate,
+            float warmup_layer_rate
+        ): strategy(strategy),
+           sample_rate(sample_rate),
+           warmup_sample_rate(warmup_sample_rate),
+           warmup_layer_rate(warmup_layer_rate) {}
+    };
+} // end of extern C
+
 struct Node {
     size_t start;
     size_t end;
@@ -77,6 +101,33 @@ struct Tree {
         seed_for_nodes.resize(nodes.size());
     }
 
+    struct SampleScheduler {
+        // schedules the per-layer sample rate
+        ClusterSamplerParam* param_ptr;
+        size_t warmup_layers;
+        size_t depth;
+
+        SampleScheduler(ClusterSamplerParam* param_ptr, size_t depth): param_ptr(param_ptr), depth(depth) {
+            warmup_layers = size_t(depth * param_ptr->warmup_layer_rate);
+        }
+
+        float get_sample_rate(size_t layer) const {
+            if(param_ptr->strategy == LINEAR_SAMPLE_SCHEDULE) {
+                return _get_linear_sample_rate(layer);
+            }
+            return param_ptr->sample_rate; // constant strategy
+        }
+
+        float _get_linear_sample_rate(size_t layer) const {
+            // For `layer` < `warmup_layers`, return `warmup_sample_rate`; otherwise increase the rate
+            // linearly from `warmup_sample_rate`, reaching `sample_rate` exactly at the last layer.
+            if(layer < warmup_layers) {
+                return param_ptr->warmup_sample_rate;
+            }
+            return param_ptr->warmup_sample_rate + (param_ptr->sample_rate - param_ptr->warmup_sample_rate) * (layer + 1 - warmup_layers) / (depth - warmup_layers);
+        }
+    };
+
     struct comparator_by_value_t {
         const float32_t *pred_val;
         bool increasing;
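A quick sanity check of the linear schedule: warmup_layers is floor(depth * warmup_layer_rate), and because the numerator is (layer + 1 - warmup_layers), the rate ramps in equal steps and hits sample_rate exactly at the last layer. The layer < warmup_layers guard also keeps warmup_layer_rate == 1.0 safe, since every layer then takes the warm-up branch and the (depth - warmup_layers) division is never evaluated. The standalone sketch below is not part of the patch; it mirrors _get_linear_sample_rate() with assumed example values so the schedule can be checked by eye:

    // schedule_check.cpp -- standalone sketch mirroring the linear schedule above.
    #include <cstddef>
    #include <cstdio>

    int main() {
        const size_t depth = 8;                 // example tree depth
        const float sample_rate = 1.0f;         // assumed final rate
        const float warmup_sample_rate = 0.1f;  // assumed warm-up rate
        const float warmup_layer_rate = 0.5f;   // assumed warm-up fraction
        const size_t warmup_layers = size_t(depth * warmup_layer_rate); // = 4

        for(size_t layer = 0; layer < depth; layer++) {
            float rate = (layer < warmup_layers)
                ? warmup_sample_rate
                : warmup_sample_rate + (sample_rate - warmup_sample_rate)
                      * (layer + 1 - warmup_layers) / (depth - warmup_layers);
            std::printf("layer %zu: %.3f\n", layer, rate);
        }
        return 0; // prints 0.100 for layers 0-3, then 0.325, 0.550, 0.775, 1.000
    }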
@@ -102,6 +153,13 @@ struct Tree {
         right.set(middle, root.end);
     }
 
+    void sample_elements(Node& root, rng_t& rng, float cur_sample_rate) {
+        rng.shuffle(elements.begin() + root.start, elements.begin() + root.end);
+        size_t n_sp_elements = size_t(cur_sample_rate * root.size());
+        n_sp_elements = std::min(std::max(n_sp_elements, size_t(2)), root.size()); // clamp the number of sampled elements to [2, root.size()]
+        root.set(root.start, root.start + n_sp_elements);
+    }
+
     // Sort elements by scores on node and return if this function changes the assignment
     bool sort_elements_by_scores_on_node(const Node& root, int threads=1, bool increasing=true) {
         auto prev_start_it = previous_elements.begin() + root.start;
@@ -155,10 +213,38 @@ struct Tree {
     }
 
     template<typename MAT>
-    void partition_kmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0) {
-        Node& root = root_of(nid);
-        Node& left = left_of(nid);
-        Node& right = right_of(nid);
+    bool do_assignment(MAT* feat_mat_ptr, Node& root, f32_sdvec_t* center_ptr, int threads) {
+        u64_dvec_t *elements_ptr = &elements;
+        auto *scores_ptr = &scores;
+        if(threads == 1) {
+            for(size_t i = root.start; i < root.end; i++) {
+                size_t eid = elements_ptr->at(i);
+                const auto& feat = feat_mat_ptr->get_row(eid);
+                scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
+            }
+        } else {
+#pragma omp parallel for shared(elements_ptr, scores_ptr, center_ptr, feat_mat_ptr)
+            for(size_t i = root.start; i < root.end; i++) {
+                size_t eid = elements_ptr->at(i);
+                const auto& feat = feat_mat_ptr->get_row(eid);
+                scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
+            }
+        }
+        bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
+        return assignment_changed;
+    }
+
+    template<typename MAT>
+    void partition_kmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0, float cur_sample_rate=1.0) {
+        // copy the nodes by value so sampling can shrink them without touching the stored tree nodes
+        Node root = root_of(nid);
+        Node left = left_of(nid);
+        Node right = right_of(nid);
+
+        // shrink `root` to a random subset of its elements according to `cur_sample_rate`
+        if(cur_sample_rate < 1.0) {
+            sample_elements(root, rng, cur_sample_rate);
+        }
 
         partition_elements(root, left, right);
 
         f32_sdvec_t& cur_center = center1[thread_id];
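sample_elements() draws its sample by shuffling the node's index range in place and then shrinking the node to a prefix of it; the clamp keeps at least 2 elements so partition_elements() can still hand each child a non-empty range. A minimal standard-library analogue of that shuffle-then-truncate step (illustration only; PECOS's rng_t shuffle is assumed to behave like std::shuffle):

    // sample_sketch.cpp -- shuffle-then-truncate sampling, as in sample_elements().
    #include <algorithm>
    #include <cstddef>
    #include <cstdio>
    #include <random>
    #include <vector>

    int main() {
        std::vector<size_t> elements = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        size_t start = 0, end = elements.size(); // the node's [start, end) range
        float cur_sample_rate = 0.3f;

        std::mt19937 rng(42); // stand-in for PECOS's rng_t
        std::shuffle(elements.begin() + start, elements.begin() + end, rng);

        size_t n_sp_elements = size_t(cur_sample_rate * (end - start));            // 3 of 10
        n_sp_elements = std::min(std::max(n_sp_elements, size_t(2)), end - start); // clamp to [2, size]
        end = start + n_sp_elements; // the k-means iterations now see only this prefix

        for(size_t i = start; i < end; i++) std::printf("%zu ", elements[i]);
        std::printf("\n");
        return 0;
    }

Shuffling only within [root.start, root.end) matters: sibling nodes own disjoint slices of the global elements array, so the sample must stay inside the node's own slice.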
@@ -186,36 +272,32 @@ struct Tree {
                 alpha = -1.0 / left.size();
                 update_center(feat_mat, left, cur_center, alpha, threads);
             }
-            u64_dvec_t *elements_ptr = &elements;
-            auto *scores_ptr = &scores;
-            auto *center_ptr = &cur_center;
-            const MAT* feat_mat_ptr = &feat_mat;
-            if(threads == 1) {
-                for(size_t i = root.start; i < root.end; i++) {
-                    size_t eid = elements_ptr->at(i);
-                    const auto& feat = feat_mat_ptr->get_row(eid);
-                    scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
-                }
-            } else {
-#pragma omp parallel for shared(elements_ptr, scores_ptr, center_ptr, feat_mat_ptr)
-                for(size_t i = root.start; i < root.end; i++) {
-                    size_t eid = elements_ptr->at(i);
-                    const auto& feat = feat_mat_ptr->get_row(eid);
-                    scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
-                }
-            }
-            bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
+            bool assignment_changed = do_assignment(&feat_mat, root, &cur_center, threads);
             if(!assignment_changed) {
                 break;
             }
         }
+
+        // restore the stored nodes to the full index range
+        partition_elements(root_of(nid), left_of(nid), right_of(nid));
+
+        // final assignment pass over all elements, not just the sample
+        if(cur_sample_rate < 1.0) {
+            do_assignment(&feat_mat, root_of(nid), &cur_center, threads);
+        }
     }
 
     template<typename MAT>
-    void partition_skmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0) {
-        Node& root = root_of(nid);
-        Node& left = left_of(nid);
-        Node& right = right_of(nid);
+    void partition_skmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0, float cur_sample_rate=1.0) {
+        // copy the nodes by value so sampling can shrink them without touching the stored tree nodes
+        Node root = root_of(nid);
+        Node left = left_of(nid);
+        Node right = right_of(nid);
+
+        // shrink `root` to a random subset of its elements according to `cur_sample_rate`
+        if(cur_sample_rate < 1.0) {
+            sample_elements(root, rng, cur_sample_rate);
+        }
 
         partition_elements(root, left, right);
 
         f32_sdvec_t& cur_center1 = center1[thread_id];
@@ -253,35 +335,23 @@ struct Tree {
                 do_axpy(-1.0, cur_center2, cur_center1);
             }
-
-
-            u64_dvec_t *elements_ptr = &elements;
-            auto *scores_ptr = &scores;
-            auto *center_ptr = &cur_center1;
-            const MAT* feat_mat_ptr = &feat_mat;
-            if(threads == 1) {
-                for(size_t i = root.start; i < root.end; i++) {
-                    size_t eid = elements_ptr->at(i);
-                    const auto& feat = feat_mat_ptr->get_row(eid);
-                    scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
-                }
-            } else {
-#pragma omp parallel for shared(elements_ptr, scores_ptr, center_ptr, feat_mat_ptr)
-                for(size_t i = root.start; i < root.end; i++) {
-                    size_t eid = elements_ptr->at(i);
-                    const auto& feat = feat_mat_ptr->get_row(eid);
-                    scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
-                }
-            }
-            bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
+            bool assignment_changed = do_assignment(&feat_mat, root, &cur_center1, threads);
             if(!assignment_changed) {
                 break;
             }
         }
+
+        // restore the stored nodes to the full index range
+        partition_elements(root_of(nid), left_of(nid), right_of(nid));
+
+        // final assignment pass over all elements, not just the sample
+        if(cur_sample_rate < 1.0) {
+            do_assignment(&feat_mat, root_of(nid), &cur_center1, threads);
+        }
     }
 
     template<typename MAT, typename IND>
-    void run_clustering(const MAT& feat_mat, int partition_algo, int seed=0, IND *label_codes=NULL, size_t max_iter=10, int threads=1) {
+    void run_clustering(const MAT& feat_mat, int partition_algo, int seed=0, IND *label_codes=NULL, size_t max_iter=10, int threads=1, ClusterSamplerParam* sample_param_ptr=NULL) {
         size_t nr_elements = feat_mat.rows;
         elements.resize(nr_elements);
         previous_elements.resize(nr_elements);
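Since the new parameter is a trailing default (sample_param_ptr=NULL disables sampling), existing callers compile unchanged. A hypothetical call site could look like the fragment below; the tree, feature matrix, and label buffer come from the surrounding PECOS code and are elided:

    // fragment (hypothetical caller; `tree`, `feat_mat`, `label_codes` provided elsewhere)
    ClusterSamplerParam sample_param(
        LINEAR_SAMPLE_SCHEDULE, // strategy
        1.0f,                   // sample_rate: all elements by the deepest layers
        0.1f,                   // warmup_sample_rate: 10% during warm-up layers
        0.5f                    // warmup_layer_rate: first half of layers warm up
    );
    tree.run_clustering(feat_mat, SKMEANS, /*seed=*/0, label_codes,
                        /*max_iter=*/10, /*threads=*/8, &sample_param);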
@@ -303,11 +373,17 @@ struct Tree {
 
         // Allocate tmp arrays for parallel update center
         center_tmp_thread.resize(threads, f32_sdvec_t(feat_mat.cols));
 
+        ClusterSamplerParam default_sample_param(CONSTANT_SAMPLE_SCHEDULE, 1.0, 1.0, 1.0); // default: no sampling
+        if(sample_param_ptr == NULL) {
+            sample_param_ptr = &default_sample_param; // point at the stack default instead of leaking a heap allocation
+        }
+        SampleScheduler sample_scheduler(sample_param_ptr, depth);
         // let's do it layer by layer so we can parallelize it
         for(size_t d = 0; d < depth; d++) {
             size_t layer_start = 1U << d;
             size_t layer_end = 1U << (d + 1);
+            float cur_sample_rate = sample_scheduler.get_sample_rate(d);
             if((layer_end - layer_start) >= (size_t) threads) {
 #pragma omp parallel for schedule(dynamic)
                 for(size_t nid = layer_start; nid < layer_end; nid++) {
@@ -315,9 +391,9 @@ struct Tree {
                     int local_threads = 1;
                     int thread_id = omp_get_thread_num();
                     if(partition_algo == KMEANS) {
-                        partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
+                        partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
                     } else if(partition_algo == SKMEANS) {
-                        partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
+                        partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
                     }
                 }
             } else {
@@ -326,9 +402,9 @@ struct Tree {
                     int local_threads = threads;
                     int thread_id = 0;
                     if(partition_algo == KMEANS) {
-                        partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
+                        partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
                     } else if(partition_algo == SKMEANS) {
-                        partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
+                        partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
                     }
                 }
             }
@@ -365,4 +441,4 @@ struct Tree {
 
 } // end of namespace clustering
 } // end of namespace pecos
-#endif // end of __CLUSTERING_H__
+#endif // end of __CLUSTERING_H__
\ No newline at end of file
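The control flow the patch sets up is worth spelling out: partition_kmeans/partition_skmeans copy the node descriptors by value and shrink only the local copy, so every center update and convergence check runs on the sampled prefix; after the loop, partition_elements(root_of(nid), ...) restores the stored nodes to the full range, and the extra do_assignment scores every element against the final center, so the split written back to the tree covers the whole node. A toy one-dimensional rendering of that two-phase idea (illustration only, not PECOS code):

    // two_phase.cpp -- fit a split on a sample, then assign every element with it.
    #include <algorithm>
    #include <cstddef>
    #include <cstdio>
    #include <numeric>
    #include <vector>

    int main() {
        std::vector<float> x = {0.1f, 4.9f, 0.3f, 5.2f, 0.2f, 5.0f, 0.4f, 4.8f};

        // Phase 1: "train" on a sampled prefix (pretend x is already shuffled).
        size_t n_sample = 4;
        float split = std::accumulate(x.begin(), x.begin() + n_sample, 0.0f) / n_sample;

        // Phase 2: the final assignment sweeps all elements, not just the sample.
        size_t n_left = std::count_if(x.begin(), x.end(),
                                      [split](float v) { return v < split; });
        std::printf("split=%.2f left=%zu right=%zu\n", split, n_left, x.size() - n_left);
        return 0;
    }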