Commit d414bbd

implement sampling in clustering.hpp

1 parent 2f0eee5 commit d414bbd

1 file changed: pecos/core/utils/clustering.hpp (+129 -54 lines)
@@ -31,6 +31,30 @@ enum {
     SKMEANS=5,
 }; /* partition_algo */
 
+enum {
+    CONSTANT_SAMPLE_SCHEDULE=0,
+    LINEAR_SAMPLE_SCHEDULE=1,
+}; /* sample strategies */
+
+extern "C" {
+    struct ClusterSamplerParam {
+        int strategy;
+        float sample_rate;
+        float warmup_sample_rate;
+        float warmup_layer_rate;
+
+        ClusterSamplerParam(
+            int strategy,
+            float sample_rate,
+            float warmup_sample_rate,
+            float warmup_layer_rate
+        ): strategy(strategy),
+           sample_rate(sample_rate),
+           warmup_sample_rate(warmup_sample_rate),
+           warmup_layer_rate(warmup_layer_rate) {}
+    };
+} // end of extern C
+
 struct Node {
     size_t start;
     size_t end;
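The parameter block sits inside extern "C" so its layout can be shared across the C ABI (for example, filled in from language bindings); the constructor just forwards the four fields. Two construction sketches, with illustrative values rather than defaults from this commit:

    // Constant schedule at rate 1.0 disables sampling entirely; these are the
    // exact fallback values run_clustering constructs below when
    // sample_param_ptr is NULL.
    ClusterSamplerParam no_sampling(CONSTANT_SAMPLE_SCHEDULE, 1.0f, 1.0f, 1.0f);

    // Linear schedule (illustrative values): sample 10% of elements during
    // warmup, ramp to 100% by the deepest layer, and treat the first half of
    // the layers as warmup.
    ClusterSamplerParam linear(LINEAR_SAMPLE_SCHEDULE, 1.0f, 0.1f, 0.5f);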
@@ -77,6 +101,33 @@ struct Tree {
         seed_for_nodes.resize(nodes.size());
     }
 
+    struct SampleScheduler {
+        // scheduler for sampling
+        ClusterSamplerParam* param_ptr;
+        size_t warmup_layers;
+        size_t depth;
+
+        SampleScheduler(ClusterSamplerParam* param_ptr, size_t depth): param_ptr(param_ptr), depth(depth) {
+            warmup_layers = size_t(depth * param_ptr->warmup_layer_rate);
+        }
+
+        float get_sample_rate(size_t layer) const {
+            if(param_ptr->strategy == LINEAR_SAMPLE_SCHEDULE) {
+                return _get_linear_sample_rate(layer);
+            }
+            return param_ptr->sample_rate; // Constant strategy
+        }
+
+        float _get_linear_sample_rate(size_t layer) const {
+            // If input `layer` < `warmup_layers`, return `warmup_sample_rate`.
+            // Otherwise, linearly increase the current sample rate from `warmup_sample_rate` to `sample_rate` until the last layer.
+            if(layer < warmup_layers) {
+                return param_ptr->warmup_sample_rate;
+            }
+            return param_ptr->warmup_sample_rate + (param_ptr->sample_rate - param_ptr->warmup_sample_rate) * (layer + 1 - warmup_layers) / (depth - warmup_layers);
+        }
+    };
+
     struct comparator_by_value_t {
         const float32_t *pred_val;
         bool increasing;
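For the linear strategy, the first warmup_layers = depth * warmup_layer_rate layers stay at warmup_sample_rate, after which the rate ramps linearly and reaches sample_rate exactly at the last layer (layer + 1 - warmup_layers equals depth - warmup_layers when layer = depth - 1). A standalone sketch of the same arithmetic, independent of the pecos types:

    #include <cstdio>
    #include <cstddef>

    // Mirrors SampleScheduler::_get_linear_sample_rate above (standalone sketch).
    float linear_sample_rate(size_t layer, size_t depth, float warmup_rate,
                             float full_rate, float warmup_layer_rate) {
        size_t warmup_layers = size_t(depth * warmup_layer_rate);
        if(layer < warmup_layers) {
            return warmup_rate;
        }
        // The float difference multiplies first, so the division stays in
        // floating point, matching the original expression.
        return warmup_rate + (full_rate - warmup_rate)
            * (layer + 1 - warmup_layers) / (depth - warmup_layers);
    }

    int main() {
        // depth=8 with the first 2 layers as warmup prints:
        // 0.100, 0.100, 0.250, 0.400, 0.550, 0.700, 0.850, 1.000
        for(size_t d = 0; d < 8; d++) {
            std::printf("layer %zu: %.3f\n", d,
                        linear_sample_rate(d, 8, 0.1f, 1.0f, 0.25f));
        }
        return 0;
    }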
@@ -102,6 +153,13 @@ struct Tree {
         right.set(middle, root.end);
     }
 
+    void sample_elements(Node& root, rng_t& rng, float cur_sample_rate) {
+        rng.shuffle(elements.begin() + root.start, elements.begin() + root.end);
+        size_t n_sp_elements = size_t(cur_sample_rate * root.size());
+        n_sp_elements = std::min(std::max(n_sp_elements, size_t(2)), root.size()); // clamp the number of sampled elements
+        root.set(root.start, root.start + n_sp_elements);
+    }
+
     // Sort elements by scores on node and return if this function changes the assignment
     bool sort_elements_by_scores_on_node(const Node& root, int threads=1, bool increasing=true) {
         auto prev_start_it = previous_elements.begin() + root.start;
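sample_elements draws a uniform subsample by shuffling the node's element range in place and then shrinking the node to a prefix; the floor of cur_sample_rate * root.size() is clamped to [2, root.size()] so that partition_elements can still hand each child at least one element. The same idea in self-contained form, with std::shuffle standing in for pecos's rng_t (an assumption, not the pecos API):

    #include <algorithm>
    #include <random>
    #include <vector>

    // Shuffle-and-truncate subsampling over [start, end); returns the new end.
    size_t sample_prefix(std::vector<size_t>& elements, size_t start, size_t end,
                         float rate, std::mt19937& gen) {
        std::shuffle(elements.begin() + start, elements.begin() + end, gen);
        size_t n = size_t(rate * (end - start));
        n = std::min(std::max(n, size_t(2)), end - start); // keep at least 2
        return start + n;
    }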
@@ -155,10 +213,38 @@ struct Tree {
     }
 
     template<typename MAT>
-    void partition_kmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0) {
-        Node& root = root_of(nid);
-        Node& left = left_of(nid);
-        Node& right = right_of(nid);
+    bool do_assignment(MAT* feat_mat_ptr, Node& root, f32_sdvec_t* center_ptr, int threads) {
+        u64_dvec_t *elements_ptr = &elements;
+        auto *scores_ptr = &scores;
+        if(threads == 1) {
+            for(size_t i = root.start; i < root.end; i++) {
+                size_t eid = elements_ptr->at(i);
+                const auto& feat = feat_mat_ptr->get_row(eid);
+                scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
+            }
+        } else {
+            #pragma omp parallel for shared(elements_ptr, scores_ptr, center_ptr, feat_mat_ptr)
+            for(size_t i = root.start; i < root.end; i++) {
+                size_t eid = elements_ptr->at(i);
+                const auto& feat = feat_mat_ptr->get_row(eid);
+                scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
+            }
+        }
+        bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
+        return assignment_changed;
+    }
+
+    template<typename MAT>
+    void partition_kmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0, float cur_sample_rate=1.0) {
+        // copy nodes rather than reference for sampling
+        Node root = root_of(nid);
+        Node left = left_of(nid);
+        Node right = right_of(nid);
+
+        // modify nodes' start and end based on cur_sample_rate
+        if(cur_sample_rate < 1.0) {
+            sample_elements(root, rng, cur_sample_rate);
+        }
         partition_elements(root, left, right);
 
         f32_sdvec_t& cur_center = center1[thread_id];
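Note that partition_kmeans now copies the three Node records instead of binding references: sample_elements shrinks only the local copy, so the node stored in the tree keeps its full [start, end) range for the full-data partition_elements and do_assignment calls at the end of the function. A toy illustration of that by-value versus by-reference difference (Node reduced to the fields used here):

    #include <cstdio>
    #include <cstddef>

    // Reduced version of the Node in this header, for illustration only.
    struct Node {
        size_t start, end;
        size_t size() const { return end - start; }
        void set(size_t s, size_t e) { start = s; end = e; }
    };

    int main() {
        Node stored = {0, 1000};  // what root_of(nid) would return
        Node working = stored;    // by-value copy, as in partition_kmeans
        working.set(0, 100);      // sampling shrinks only the copy
        std::printf("working: %zu elements, stored: %zu elements\n",
                    working.size(), stored.size()); // 100 vs. 1000
        return 0;
    }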
@@ -186,36 +272,32 @@ struct Tree {
                 alpha = -1.0 / left.size();
                 update_center(feat_mat, left, cur_center, alpha, threads);
             }
-            u64_dvec_t *elements_ptr = &elements;
-            auto *scores_ptr = &scores;
-            auto *center_ptr = &cur_center;
-            const MAT* feat_mat_ptr = &feat_mat;
-            if(threads == 1) {
-                for(size_t i = root.start; i < root.end; i++) {
-                    size_t eid = elements_ptr->at(i);
-                    const auto& feat = feat_mat_ptr->get_row(eid);
-                    scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
-                }
-            } else {
-                #pragma omp parallel for shared(elements_ptr, scores_ptr, center_ptr, feat_mat_ptr)
-                for(size_t i = root.start; i < root.end; i++) {
-                    size_t eid = elements_ptr->at(i);
-                    const auto& feat = feat_mat_ptr->get_row(eid);
-                    scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
-                }
-            }
-            bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
+            bool assignment_changed = do_assignment(&feat_mat, root, &cur_center, threads);
             if(!assignment_changed) {
                 break;
             }
         }
+
+        // set indices for reference nodes
+        partition_elements(root_of(nid), left_of(nid), right_of(nid));
+
+        // perform inference on all elements
+        if(cur_sample_rate < 1.0) {
+            do_assignment(&feat_mat, root_of(nid), &cur_center, threads);
+        }
     }
 
     template<typename MAT>
-    void partition_skmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0) {
-        Node& root = root_of(nid);
-        Node& left = left_of(nid);
-        Node& right = right_of(nid);
+    void partition_skmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0, float cur_sample_rate=1.0) {
+        // copy nodes rather than reference for sampling
+        Node root = root_of(nid);
+        Node left = left_of(nid);
+        Node right = right_of(nid);
+
+        // modify nodes' start and end based on cur_sample_rate
+        if(cur_sample_rate < 1.0) {
+            sample_elements(root, rng, cur_sample_rate);
+        }
         partition_elements(root, left, right);
 
         f32_sdvec_t& cur_center1 = center1[thread_id];
@@ -253,35 +335,23 @@ struct Tree {
 
                 do_axpy(-1.0, cur_center2, cur_center1);
             }
-
-
-            u64_dvec_t *elements_ptr = &elements;
-            auto *scores_ptr = &scores;
-            auto *center_ptr = &cur_center1;
-            const MAT* feat_mat_ptr = &feat_mat;
-            if(threads == 1) {
-                for(size_t i = root.start; i < root.end; i++) {
-                    size_t eid = elements_ptr->at(i);
-                    const auto& feat = feat_mat_ptr->get_row(eid);
-                    scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
-                }
-            } else {
-                #pragma omp parallel for shared(elements_ptr, scores_ptr, center_ptr, feat_mat_ptr)
-                for(size_t i = root.start; i < root.end; i++) {
-                    size_t eid = elements_ptr->at(i);
-                    const auto& feat = feat_mat_ptr->get_row(eid);
-                    scores_ptr->at(eid) = do_dot_product(*center_ptr, feat);
-                }
-            }
-            bool assignment_changed = sort_elements_by_scores_on_node(root, threads);
+            bool assignment_changed = do_assignment(&feat_mat, root, &cur_center1, threads);
             if(!assignment_changed) {
                 break;
             }
         }
+
+        // set indices for reference nodes
+        partition_elements(root_of(nid), left_of(nid), right_of(nid));
+
+        // perform inference on all elements
+        if(cur_sample_rate < 1.0) {
+            do_assignment(&feat_mat, root_of(nid), &cur_center1, threads);
+        }
     }
 
     template<typename MAT, typename IND=unsigned>
-    void run_clustering(const MAT& feat_mat, int partition_algo, int seed=0, IND *label_codes=NULL, size_t max_iter=10, int threads=1) {
+    void run_clustering(const MAT& feat_mat, int partition_algo, int seed=0, IND *label_codes=NULL, size_t max_iter=10, int threads=1, ClusterSamplerParam* sample_param_ptr=NULL) {
         size_t nr_elements = feat_mat.rows;
         elements.resize(nr_elements);
         previous_elements.resize(nr_elements);
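Because sample_param_ptr defaults to NULL, existing callers are unaffected and sampling is opt-in. A hedged usage sketch of the new signature, where tree and feat_mat stand for the surrounding pecos objects that this diff does not define:

    // Hypothetical call site: ramp from 10% to 100% sampling over the layers.
    ClusterSamplerParam sample_param(LINEAR_SAMPLE_SCHEDULE,
                                     /*sample_rate=*/1.0f,
                                     /*warmup_sample_rate=*/0.1f,
                                     /*warmup_layer_rate=*/0.5f);
    tree.run_clustering(feat_mat, KMEANS, /*seed=*/0, /*label_codes=*/NULL,
                        /*max_iter=*/10, /*threads=*/8, &sample_param);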
@@ -303,21 +373,26 @@ struct Tree {
         // Allocate tmp arrays for parallel update center
         center_tmp_thread.resize(threads, f32_sdvec_t(feat_mat.cols));
 
+        if(sample_param_ptr == NULL) {
+            sample_param_ptr = new ClusterSamplerParam(CONSTANT_SAMPLE_SCHEDULE, 1.0, 1.0, 1.0); // default: no sampling
+        }
+        SampleScheduler sample_scheduler(sample_param_ptr, depth);
 
         // let's do it layer by layer so we can parallelize it
         for(size_t d = 0; d < depth; d++) {
             size_t layer_start = 1U << d;
             size_t layer_end = 1U << (d + 1);
+            float cur_sample_rate = sample_scheduler.get_sample_rate(d);
             if((layer_end - layer_start) >= (size_t) threads) {
                 #pragma omp parallel for schedule(dynamic)
                 for(size_t nid = layer_start; nid < layer_end; nid++) {
                     rng_t rng(seed_for_nodes[nid]);
                     int local_threads = 1;
                     int thread_id = omp_get_thread_num();
                     if(partition_algo == KMEANS) {
-                        partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
+                        partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
                     } else if(partition_algo == SKMEANS) {
-                        partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
+                        partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
                     }
                 }
             } else {
@@ -326,9 +401,9 @@ struct Tree {
                 int local_threads = threads;
                 int thread_id = 0;
                 if(partition_algo == KMEANS) {
-                    partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
+                    partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
                 } else if(partition_algo == SKMEANS) {
-                    partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id);
+                    partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
                 }
             }
         }
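In the two hunks above, the node array follows the usual 1-indexed binary-heap layout, so layer d spans node ids [1 << d, 1 << (d + 1)); get_sample_rate(d) is evaluated once per layer, outside the parallel region, and shared by every node on that layer. A quick check of the id arithmetic:

    #include <cstdio>
    #include <cstddef>

    int main() {
        // Implicit-heap layout: layer d covers node ids [1 << d, 1 << (d + 1)).
        for(size_t d = 0; d < 4; d++) {
            size_t layer_start = size_t(1) << d;
            size_t layer_end = size_t(1) << (d + 1);
            std::printf("layer %zu: nodes [%zu, %zu)\n", d, layer_start, layer_end);
        }
        return 0;
    }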
@@ -365,4 +440,4 @@ struct Tree {
 } // end of namespace clustering
 } // end of namespace pecos
 
-#endif // end of __CLUSTERING_H__
\ No newline at end of file
+#endif // end of __CLUSTERING_H__

0 commit comments