implement sampling in clustering.hpp
yaushian committed Dec 7, 2022
1 parent 2f0eee5 commit 095403f
Showing 1 changed file with 129 additions and 54 deletions.
183 changes: 129 additions & 54 deletions pecos/core/utils/clustering.hpp
@@ -31,6 +31,30 @@ enum {
SKMEANS=5,
}; /* partition_algo */

enum {
CONSTANT_SAMPLE_SCHEDULE=0,
LINEAR_SAMPLE_SCHEDULE=1,
}; /* sample strategies */

extern "C" {
struct ClusteringSamplerParam {
int strategy;
float sample_rate;
float warmup_sample_rate;
float warmup_layer_rate;

ClusteringSamplerParam(
int strategy,
float sample_rate,
float warmup_sample_rate,
float warmup_layer_rate
): strategy(strategy),
sample_rate(sample_rate),
warmup_sample_rate(warmup_sample_rate),
warmup_layer_rate(warmup_layer_rate) {}
};
} // end of extern C
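
// Illustrative sketch (not part of this commit): constructing a linear
// schedule that clusters on 10% of the elements for the first half of the
// tree layers, then ramps up to the full set by the last layer.
//
// ClusteringSamplerParam param(
//     LINEAR_SAMPLE_SCHEDULE,
//     1.0f,  // sample_rate reached at the last layer
//     0.1f,  // warmup_sample_rate used during warmup layers
//     0.5f   // warmup_layer_rate: fraction of layers treated as warmup
// );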

struct Node {
size_t start;
size_t end;
@@ -77,6 +101,33 @@ struct Tree {
seed_for_nodes.resize(nodes.size());
}

struct ClusteringSampler {
// scheduler for sampling
ClusteringSamplerParam* param_ptr;
size_t warmup_layers;
size_t depth;

ClusteringSampler(ClusteringSamplerParam* param_ptr, size_t depth): param_ptr(param_ptr), depth(depth) {
warmup_layers = size_t(depth * param_ptr->warmup_layer_rate);
}

float get_sample_rate(size_t layer) const {
if(param_ptr->strategy == LINEAR_SAMPLE_SCHEDULE) {
return _get_linear_sample_rate(layer);
}
return param_ptr->sample_rate; // Constant strategy
}

float _get_linear_sample_rate(size_t layer) const {
// For layers below `warmup_layers`, return `warmup_sample_rate`.
// Otherwise, increase the sample rate linearly from `warmup_sample_rate` to `sample_rate`, reaching `sample_rate` at the last layer.
if(layer < warmup_layers) {
return param_ptr->warmup_sample_rate;
}
return param_ptr->warmup_sample_rate + (param_ptr->sample_rate - param_ptr->warmup_sample_rate) * (layer + 1 - warmup_layers) / (depth - warmup_layers);
}
};
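
// Worked example (illustrative numbers): with depth = 4 and
// warmup_layer_rate = 0.5, warmup_layers = size_t(4 * 0.5) = 2, so the
// linear schedule with warmup_sample_rate = 0.1 and sample_rate = 1.0 gives:
//   layer 0 -> 0.1 (warmup)
//   layer 1 -> 0.1 (warmup)
//   layer 2 -> 0.1 + 0.9 * (2 + 1 - 2) / (4 - 2) = 0.55
//   layer 3 -> 0.1 + 0.9 * (3 + 1 - 2) / (4 - 2) = 1.0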

struct comparator_by_value_t {
const float32_t *pred_val;
bool increasing;
@@ -102,6 +153,13 @@ struct Tree {
right.set(middle, root.end);
}

void sample_elements(Node& root, rng_t& rng, float cur_sample_rate) {
rng.shuffle(elements.begin() + root.start, elements.begin() + root.end);
size_t n_sp_elements = size_t(cur_sample_rate * root.size());
n_sp_elements = std::min(std::max(n_sp_elements, size_t(2)), root.size()); // clamp the number of sampled elements to [2, root.size()]
root.set(root.start, root.start + n_sp_elements);
}
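
// Note: sample_elements narrows the given node in place; the partition
// routines below call it on a local copy of the tree node, so the tree's
// own [start, end) range stays intact for the final full assignment.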

// Sort elements by score on the node and return whether this changes the assignment
bool sort_elements_by_scores_on_node(const Node& root, int threads=1, bool increasing=true) {
auto prev_start_it = previous_elements.begin() + root.start;
@@ -155,10 +213,38 @@ struct Tree {
}

template<typename MAT>
void partition_kmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0) {
Node& root = root_of(nid);
Node& left = left_of(nid);
Node& right = right_of(nid);
bool do_assignment(MAT* feat_mat_ptr, Node& root, f32_sdvec_t* center_ptr, int threads) {
u64_dvec_t *elements_ptr = &elements;
auto *scores_ptr = &scores;
if(threads == 1) {
for(size_t i = root.start; i < root.end; i++) {
size_t eid = elements_ptr->at(i);
const auto& feat = feat_mat_ptr->get_row(eid);
scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
}
} else {
#pragma omp parallel for shared(elements_ptr, scores_ptr, center_ptr, feat_mat_ptr)
for(size_t i = root.start; i < root.end; i++) {
size_t eid = elements_ptr->at(i);
const auto& feat = feat_mat_ptr->get_row(eid);
scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
}
}
bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
return assignment_changed;
}
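
// do_assignment scores every element in [root.start, root.end) against
// *center_ptr (in parallel when threads > 1), then re-sorts the node's
// elements by score; the return value reports whether any assignment changed.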

template<typename MAT>
void partition_kmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0, float cur_sample_rate=1.0) {
// copy the nodes rather than referencing them, so sampling can shrink the local copies without mutating the tree
Node root = root_of(nid);
Node left = left_of(nid);
Node right = right_of(nid);

// shrink the local root's [start, end) range according to cur_sample_rate
if(cur_sample_rate < 1.0) {
sample_elements(root, rng, cur_sample_rate);
}
partition_elements(root, left, right);

f32_sdvec_t& cur_center = center1[thread_id];
@@ -186,36 +272,32 @@ struct Tree {
alpha = -1.0 / left.size();
update_center(feat_mat, left, cur_center, alpha, threads);
}
u64_dvec_t *elements_ptr = &elements;
auto *scores_ptr = &scores;
auto *center_ptr = &cur_center;
const MAT* feat_mat_ptr = &feat_mat;
if(threads == 1) {
for(size_t i = root.start; i < root.end; i++) {
size_t eid = elements_ptr->at(i);
const auto& feat = feat_mat_ptr->get_row(eid);
scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
}
} else {
#pragma omp parallel for shared(elements_ptr, scores_ptr, center_ptr, feat_mat_ptr)
for(size_t i = root.start; i < root.end; i++) {
size_t eid = elements_ptr->at(i);
const auto& feat = feat_mat_ptr->get_row(eid);
scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
}
}
bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
bool assignment_changed = do_assignment(&feat_mat, root, &cur_center, threads);
if(!assignment_changed) {
break;
}
}

// set indices on the tree's actual (reference) nodes using the full root range
partition_elements(root_of(nid), left_of(nid), right_of(nid));

// score all elements (not just the sample) against the final center
if(cur_sample_rate < 1.0) {
do_assignment(&feat_mat, root_of(nid), &cur_center, threads);
}
}
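
// Sampled path in brief (cur_sample_rate < 1.0): shuffle and shrink the
// local root copy, run the k-means iterations on that sample only, then
// restore the full range via root_of(nid) and score every element once
// against the final center.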

template<typename MAT>
void partition_skmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0) {
Node& root = root_of(nid);
Node& left = left_of(nid);
Node& right = right_of(nid);
void partition_skmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0, float cur_sample_rate=1.0) {
// copy the nodes rather than referencing them, so sampling can shrink the local copies without mutating the tree
Node root = root_of(nid);
Node left = left_of(nid);
Node right = right_of(nid);

// shrink the local root's [start, end) range according to cur_sample_rate
if(cur_sample_rate < 1.0) {
sample_elements(root, rng, cur_sample_rate);
}
partition_elements(root, left, right);

f32_sdvec_t& cur_center1 = center1[thread_id];
@@ -253,35 +335,23 @@ struct Tree {

do_axpy(-1.0, cur_center2, cur_center1);
}


u64_dvec_t *elements_ptr = &elements;
auto *scores_ptr = &scores;
auto *center_ptr = &cur_center1;
const MAT* feat_mat_ptr = &feat_mat;
if(threads == 1) {
for(size_t i = root.start; i < root.end; i++) {
size_t eid = elements_ptr->at(i);
const auto& feat = feat_mat_ptr->get_row(eid);
scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
}
} else {
#pragma omp parallel for shared(elements_ptr, scores_ptr, center_ptr, feat_mat_ptr)
for(size_t i = root.start; i < root.end; i++) {
size_t eid = elements_ptr->at(i);
const auto& feat = feat_mat_ptr->get_row(eid);
scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
}
}
bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
bool assignment_changed = do_assignment(&feat_mat, root, &cur_center1, threads);
if(!assignment_changed) {
break;
}
}

// set indices on the tree's actual (reference) nodes using the full root range
partition_elements(root_of(nid), left_of(nid), right_of(nid));

// score all elements (not just the sample) against the final center
if(cur_sample_rate < 1.0) {
do_assignment(&feat_mat, root_of(nid), &cur_center1, threads);
}
}

template<typename MAT, typename IND=unsigned>
void run_clustering(const MAT& feat_mat, int partition_algo, int seed=0, IND *label_codes=NULL, size_t max_iter=10, int threads=1) {
void run_clustering(const MAT& feat_mat, int partition_algo, int seed=0, IND *label_codes=NULL, size_t max_iter=10, int threads=1, ClusteringSamplerParam* sample_param_ptr=NULL) {
size_t nr_elements = feat_mat.rows;
elements.resize(nr_elements);
previous_elements.resize(nr_elements);
@@ -303,21 +373,26 @@ struct Tree {
// Allocate tmp arrays for parallel update center
center_tmp_thread.resize(threads, f32_sdvec_t(feat_mat.cols));

if(sample_param_ptr == NULL) {
sample_param_ptr = new ClusteringSamplerParam(CONSTANT_SAMPLE_SCHEDULE, 1.0, 1.0, 1.0); // default: constant schedule at rate 1.0, i.e. no sampling (note: this object is never freed here)
}
ClusteringSampler sample_scheduler(sample_param_ptr, depth);

// let's do it layer by layer so we can parallelize it
for(size_t d = 0; d < depth; d++) {
size_t layer_start = 1U << d;
size_t layer_end = 1U << (d + 1);
float cur_sample_rate = sample_scheduler.get_sample_rate(d);
if((layer_end - layer_start) >= (size_t) threads) {
#pragma omp parallel for schedule(dynamic)
for(size_t nid = layer_start; nid < layer_end; nid++) {
rng_t rng(seed_for_nodes[nid]);
int local_threads = 1;
int thread_id = omp_get_thread_num();
if(partition_algo == KMEANS) {
partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
} else if(partition_algo == SKMEANS) {
partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
}
}
} else {
@@ -326,9 +401,9 @@ struct Tree {
int local_threads = threads;
int thread_id = 0;
if(partition_algo == KMEANS) {
partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
} else if(partition_algo == SKMEANS) {
partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
}
}
}
@@ -365,4 +440,4 @@ struct Tree {
} // end of namespace clustering
} // end of namespace pecos

#endif // end of __CLUSTERING_H__
#endif // end of __CLUSTERING_H__
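
A minimal caller sketch for the new parameter (hypothetical names: `tree`, `feat_mat`, and `label_codes` are assumed to exist, and any matrix type accepted by the template, e.g. `csr_t`, works):

ClusteringSamplerParam sample_param(
    LINEAR_SAMPLE_SCHEDULE,
    1.0f,  // sample_rate
    0.1f,  // warmup_sample_rate
    0.5f   // warmup_layer_rate
);
// Passing NULL instead falls back to a constant rate of 1.0 (no sampling).
tree.run_clustering(feat_mat, SKMEANS, /*seed=*/0, label_codes,
                    /*max_iter=*/10, /*threads=*/8, &sample_param);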
