refactor interface for hierarchical clustering and sampling
yaushian committed Dec 14, 2022
1 parent aad94c6 commit c2d8b0c
Showing 6 changed files with 136 additions and 93 deletions.
11 changes: 5 additions & 6 deletions pecos/core/ann/quantizer_impl/common.hpp
@@ -221,14 +221,13 @@ namespace ann {

// fit HLT or flat-Kmeans for each sub-space
std::vector<index_type> assignments(sub_sample_points);
pecos::clustering::Tree hlt(std::log2(num_of_local_centroids));
int hlt_depth = std::log2(num_of_local_centroids);
pecos::clustering::Tree hlt(hlt_depth);
pecos::clustering::ClusteringParam clustering_param(0, hlt_depth, seed, max_iter, threads, 1.0, 1.0, 1.0);
hlt.run_clustering<pecos::drm_t, index_type>(
Xsub,
0,
seed,
assignments.data(),
max_iter,
threads);
&clustering_param,
assignments.data());

compute_centroids(Xsub, local_dimension, num_of_local_centroids, assignments.data(),
&original_local_codebooks[m * num_of_local_centroids * local_dimension], threads);
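A hedged Python mirror of the C++ construction above (not part of this commit; `partition_algo=0` is assumed to be the KMEANS entry of the enum shown further down, and the concrete values are illustrative):

```python
# Sketch only: the argument bundle the quantizer builds above, written
# out field by field for a codebook of 16 local centroids
# (hlt_depth = log2(16) = 4). All rates at 1.0 disable sampling, which
# preserves the quantizer's pre-commit behavior.
from types import SimpleNamespace

quantizer_args = SimpleNamespace(
    partition_algo=0,      # assumed KMEANS; the enum value is elided here
    depth=4,               # hlt_depth = log2(num_of_local_centroids)
    seed=0,
    kmeans_max_iter=10,
    threads=8,
    max_sample_rate=1.0,   # rates of 1.0 -> no sampling
    min_sample_rate=1.0,
    warmup_ratio=1.0,
)
```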
41 changes: 26 additions & 15 deletions pecos/core/base.py
@@ -148,6 +148,28 @@ def __init__(self, base_vect_param_list, norm_p):
self.norm_p = c_int32(norm_p)


class ClusteringParam(ctypes.Structure):
"""
Python class for handling struct ClusteringParam in clustering.hpp
"""

_fields_ = [
("partition_algo", c_int),
("depth", c_int),
("seed", c_int),
("kmeans_max_iter", c_int),
("threads", c_int),
("max_sample_rate", c_float),
("min_sample_rate", c_float),
("warmup_ratio", c_float),
]

def __init__(self, params):
name2type = dict(ClusteringParam._fields_)
for name in name2type:
setattr(self, name, name2type[name](getattr(params, name)))
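As a usage sketch (not part of the diff), any object exposing these eight attribute names can be marshalled into the C struct; `SimpleNamespace` below stands in for `HierarchicalKMeans.TrainParams`, which carries the same fields:

```python
# Minimal sketch, assuming ClusteringParam as defined above in
# pecos/core/base.py is in scope.
from types import SimpleNamespace

py_params = SimpleNamespace(
    partition_algo=5,      # SKMEANS in clustering.hpp's partition_algo enum
    depth=4,
    seed=0,
    kmeans_max_iter=20,
    threads=-1,
    max_sample_rate=1.0,
    min_sample_rate=0.1,
    warmup_ratio=0.4,
)
c_params = ClusteringParam(py_params)  # each field cast via name2type
assert c_params.depth == 4 and abs(c_params.warmup_ratio - 0.4) < 1e-6
```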


class ScipyCscF32(ctypes.Structure):
"""
PyMatrix for scipy.sparse.csc_matrix
@@ -1250,11 +1272,7 @@ def link_clustering(self):
"""
arg_list = [
POINTER(ScipyCsrF32),
c_uint32,
c_uint32,
c_int,
c_uint32,
c_int,
POINTER(ClusteringParam),
POINTER(c_uint32),
]
corelib.fillprototype(
@@ -1267,12 +1285,8 @@ def run_clustering(
def run_clustering(
self,
py_feat_mat,
depth,
algo,
seed,
train_params,
codes=None,
kmeans_max_iter=20,
threads=-1,
):
"""
Run clustering with the given label embedding matrix and parameters in C++.
@@ -1302,13 +1316,10 @@

if codes is None or len(codes) != py_feat_mat.shape[0] or codes.dtype != np.uint32:
codes = np.zeros(py_feat_mat.rows, dtype=np.uint32)
train_params = ClusteringParam(train_params)
run_clustering(
byref(py_feat_mat),
depth,
algo,
seed,
kmeans_max_iter,
threads,
train_params,
codes.ctypes.data_as(POINTER(c_uint32)),
)
return codes
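For callers, the refactor collapses five positional scalars into a single object. A hedged sketch of the call-site change (written as a function taking its dependencies as arguments, since the full setup lives elsewhere in this diff):

```python
# Sketch only: `clib` is the loaded wrapper module, `py_feat_mat` a
# ScipyCsrF32/ScipyDrmF32, and `train_params` any object carrying the
# eight ClusteringParam fields (e.g. HierarchicalKMeans.TrainParams
# after gen() attaches depth and partition_algo).
def cluster_codes(clib, py_feat_mat, train_params):
    # pre-commit interface, for comparison:
    #   clib.run_clustering(py_feat_mat, depth, algo, seed,
    #                       kmeans_max_iter=20, threads=-1)
    return clib.run_clustering(py_feat_mat, train_params)
```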
10 changes: 3 additions & 7 deletions pecos/core/libpecos.cpp
@@ -257,15 +257,11 @@ extern "C" {
#define C_RUN_CLUSTERING(SUFFIX, PY_MAT, C_MAT) \
void c_run_clustering ## SUFFIX( \
const PY_MAT* py_mat_ptr, \
uint32_t depth, \
uint32_t partition_algo, \
int seed, \
uint32_t max_iter, \
int threads, \
pecos::clustering::ClusteringParam* param_ptr, \
uint32_t* label_codes) { \
C_MAT feat_mat(py_mat_ptr); \
pecos::clustering::Tree tree(depth); \
tree.run_clustering(feat_mat, partition_algo, seed, label_codes, max_iter, threads); \
pecos::clustering::Tree tree(param_ptr->depth); \
tree.run_clustering(feat_mat, param_ptr, label_codes); \
}
C_RUN_CLUSTERING(_csr_f32, ScipyCsrF32, pecos::csr_t)
C_RUN_CLUSTERING(_drm_f32, ScipyDrmF32, pecos::drm_t)
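On the Python side the matching entry point is chosen by matrix type. A hedged sketch of that dispatch (the two symbol names are exactly what the macro instantiations above generate; the library handle is assumed):

```python
# Sketch only: `lib` stands for the loaded libpecos shared library.
import scipy.sparse as smat

def pick_entry_point(lib, feat_mat):
    if smat.issparse(feat_mat):
        return lib.c_run_clustering_csr_f32   # sparse CSR feature rows
    return lib.c_run_clustering_drm_f32       # dense row-major feature rows
```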
112 changes: 54 additions & 58 deletions pecos/core/utils/clustering.hpp
@@ -31,35 +31,42 @@ enum {
SKMEANS=5,
}; /* partition_algo */

enum {
CONSTANT_SAMPLE_SCHEDULE=0,
LINEAR_SAMPLE_SCHEDULE=1,
}; /* sample strategies */

extern "C" {
struct ClusteringSamplerParam {
int strategy;
float sample_rate;
float warmup_sample_rate;
float warmup_layer_rate;

ClusteringSamplerParam(
int strategy,
float sample_rate,
float warmup_sample_rate,
float warmup_layer_rate
): strategy(strategy),
sample_rate(sample_rate),
warmup_sample_rate(warmup_sample_rate),
warmup_layer_rate(warmup_layer_rate) {
if(sample_rate <= 0 || sample_rate > 1) {
throw std::invalid_argument("expect 0 < sample_rate <= 1.0");
struct ClusteringParam {
int partition_algo;
int depth;
int seed;
int kmeans_max_iter;
int threads;
float max_sample_rate;
float min_sample_rate;
float warmup_ratio;

ClusteringParam(
int partition_algo,
int depth,
int seed,
int kmeans_max_iter,
int threads,
float max_sample_rate,
float min_sample_rate,
float warmup_ratio
): partition_algo(partition_algo),
depth(depth),
seed(seed),
kmeans_max_iter(kmeans_max_iter),
threads(threads),
max_sample_rate(max_sample_rate),
min_sample_rate(min_sample_rate),
warmup_ratio(warmup_ratio) {
if(min_sample_rate <= 0 || min_sample_rate > 1) {
throw std::invalid_argument("expect 0 < min_sample_rate <= 1.0");
}
if(warmup_sample_rate <= 0 || warmup_sample_rate > 1) {
throw std::invalid_argument("expect 0 < warmup_sample_rate <= 1.0");
if(max_sample_rate <= 0 || max_sample_rate > 1) {
throw std::invalid_argument("expect 0 < max_sample_rate <= 1.0");
}
if(warmup_layer_rate < 0 || warmup_layer_rate > 1) {
throw std::invalid_argument("expect 0 <= warmup_layer_rate <= 1.0");
if(warmup_ratio < 0 || warmup_ratio > 1) {
throw std::invalid_argument("expect 0 <= warmup_ratio <= 1.0");
}
}
};
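The constructor's guards are the only validation between the Python layer and the clustering loop. A Python paraphrase of the accepted ranges (sketch, not part of the commit):

```python
# Mirrors the std::invalid_argument checks above.
def check_sampling_rates(min_sample_rate, max_sample_rate, warmup_ratio):
    if not 0 < min_sample_rate <= 1.0:
        raise ValueError("expect 0 < min_sample_rate <= 1.0")
    if not 0 < max_sample_rate <= 1.0:
        raise ValueError("expect 0 < max_sample_rate <= 1.0")
    if not 0 <= warmup_ratio <= 1.0:
        raise ValueError("expect 0 <= warmup_ratio <= 1.0")

check_sampling_rates(0.1, 1.0, 0.4)  # passes: the defaults in pecos/xmc/base.py
# check_sampling_rates(0.0, 1.0, 0.4) would raise: min_sample_rate must be > 0
```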
@@ -113,28 +120,20 @@ struct Tree {

struct ClusteringSampler {
// scheduler for sampling
ClusteringSamplerParam* param_ptr;
ClusteringParam* param_ptr;
size_t warmup_layers;
size_t depth;

ClusteringSampler(ClusteringSamplerParam* param_ptr, size_t depth): param_ptr(param_ptr), depth(depth) {
warmup_layers = size_t(depth * param_ptr->warmup_layer_rate);
ClusteringSampler(ClusteringParam* param_ptr): param_ptr(param_ptr) {
warmup_layers = size_t(param_ptr->depth * param_ptr->warmup_ratio);
}

float get_sample_rate(size_t layer) const {
if(param_ptr->strategy == LINEAR_SAMPLE_SCHEDULE) {
return _get_linear_sample_rate(layer);
}
return param_ptr->sample_rate; // Constant strategy
}

float _get_linear_sample_rate(size_t layer) const {
// If input `layer` < `warmup_layers`, return `warmup_sample_rate`.
// Otherwise, linearly increase the current sample rate from `warmup_sample_rate` to `sample_rate` until the last layer.
if(layer < warmup_layers) {
return param_ptr->warmup_sample_rate;
return param_ptr->min_sample_rate;
}
return param_ptr->warmup_sample_rate + (param_ptr->sample_rate - param_ptr->warmup_sample_rate) * (layer + 1 - warmup_layers) / (depth - warmup_layers);
return param_ptr->min_sample_rate + (param_ptr->max_sample_rate - param_ptr->min_sample_rate) * (layer + 1 - warmup_layers) / (param_ptr->depth - warmup_layers);
}
};
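A worked example of the schedule (Python paraphrase of the C++ above, not part of the commit): with `depth=5` and `warmup_ratio=0.4`, the two warmup layers sample at `min_sample_rate` and the rest climb linearly to `max_sample_rate`:

```python
def get_sample_rate(layer, depth, min_rate, max_rate, warmup_ratio):
    warmup_layers = int(depth * warmup_ratio)  # as in the constructor above
    if layer < warmup_layers:
        return min_rate
    return min_rate + (max_rate - min_rate) * (layer + 1 - warmup_layers) / (depth - warmup_layers)

print([round(get_sample_rate(l, 5, 0.1, 1.0, 0.4), 2) for l in range(5)])
# -> [0.1, 0.1, 0.4, 0.7, 1.0]
```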

@@ -244,7 +243,7 @@ struct Tree {
}

template<typename MAT>
void partition_kmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0, float cur_sample_rate=1.0) {
void partition_kmeans(size_t nid, int depth, const MAT& feat_mat, rng_t& rng, int max_iter=10, int threads=1, int thread_id=0, float cur_sample_rate=1.0) {
// copy nodes rather than reference for sampling
Node root = root_of(nid);
Node left = left_of(nid);
@@ -259,7 +258,7 @@
f32_sdvec_t& cur_center = center1[thread_id];

// perform the clustering and sorting
for(size_t iter = 0; iter < max_iter; iter++) {
for(int iter = 0; iter < max_iter; iter++) {
// construct cur_center (for right child)
cur_center.fill_zeros();
if(iter == 0) {
@@ -297,7 +296,7 @@
}

template<typename MAT>
void partition_skmeans(size_t nid, size_t depth, const MAT& feat_mat, rng_t& rng, size_t max_iter=10, int threads=1, int thread_id=0, float cur_sample_rate=1.0) {
void partition_skmeans(size_t nid, int depth, const MAT& feat_mat, rng_t& rng, int max_iter=10, int threads=1, int thread_id=0, float cur_sample_rate=1.0) {
// copy nodes rather than reference for sampling
Node root = root_of(nid);
Node left = left_of(nid);
@@ -313,7 +312,7 @@
f32_sdvec_t& cur_center2 = center2[thread_id];

// perform the clustering and sorting
for(size_t iter = 0; iter < max_iter; iter++) {
for(int iter = 0; iter < max_iter; iter++) {
float32_t one = 1.0;
// construct center1 (for right child)
cur_center1.fill_zeros();
@@ -360,19 +359,19 @@
}

template<typename MAT, typename IND=unsigned>
void run_clustering(const MAT& feat_mat, int partition_algo, int seed=0, IND *label_codes=NULL, size_t max_iter=10, int threads=1, ClusteringSamplerParam* sample_param_ptr=NULL) {
void run_clustering(const MAT& feat_mat, ClusteringParam* param_ptr=NULL, IND *label_codes=NULL) {
size_t nr_elements = feat_mat.rows;
elements.resize(nr_elements);
previous_elements.resize(nr_elements);
for(size_t i = 0; i < nr_elements; i++) {
elements[i] = i;
}
rng_t rng(seed);
rng_t rng(param_ptr->seed);
for(size_t nid = 0; nid < nodes.size(); nid++) {
seed_for_nodes[nid] = rng.randint<unsigned>();
}

threads = set_threads(threads);
int threads = set_threads(param_ptr->threads);
center1.resize(threads, f32_sdvec_t(feat_mat.cols));
center2.resize(threads, f32_sdvec_t(feat_mat.cols));
scores.resize(feat_mat.rows, 0);
@@ -382,10 +381,7 @@
// Allocate tmp arrays for parallel update center
center_tmp_thread.resize(threads, f32_sdvec_t(feat_mat.cols));

if(sample_param_ptr == NULL) {
sample_param_ptr = new ClusteringSamplerParam(CONSTANT_SAMPLE_SCHEDULE, 1.0, 1.0, 1.0); // no sampling for default constructor
}
ClusteringSampler sample_scheduler(sample_param_ptr, depth);
ClusteringSampler sample_scheduler(param_ptr);

// let's do it layer by layer so we can parallelize it
for(size_t d = 0; d < depth; d++) {
@@ -398,21 +394,21 @@
rng_t rng(seed_for_nodes[nid]);
int local_threads = 1;
int thread_id = omp_get_thread_num();
if(partition_algo == KMEANS) {
partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
} else if(partition_algo == SKMEANS) {
partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
if(param_ptr->partition_algo == KMEANS) {
partition_kmeans(nid, d, feat_mat, rng, param_ptr->kmeans_max_iter, local_threads, thread_id, cur_sample_rate);
} else if(param_ptr->partition_algo == SKMEANS) {
partition_skmeans(nid, d, feat_mat, rng, param_ptr->kmeans_max_iter, local_threads, thread_id, cur_sample_rate);
}
}
} else {
for(size_t nid = layer_start; nid < layer_end; nid++) {
rng_t rng(seed_for_nodes[nid]);
int local_threads = threads;
int thread_id = 0;
if(partition_algo == KMEANS) {
partition_kmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
} else if(partition_algo == SKMEANS) {
partition_skmeans(nid, d, feat_mat, rng, max_iter, local_threads, thread_id, cur_sample_rate);
if(param_ptr->partition_algo == KMEANS) {
partition_kmeans(nid, d, feat_mat, rng, param_ptr->kmeans_max_iter, local_threads, thread_id, cur_sample_rate);
} else if(param_ptr->partition_algo == SKMEANS) {
partition_skmeans(nid, d, feat_mat, rng, param_ptr->kmeans_max_iter, local_threads, thread_id, cur_sample_rate);
}
}
}
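The branch structure above decides where the parallelism goes: deep layers contain many small 2-means problems and run nodes in parallel with one thread each, while shallow layers hand the whole thread pool to each partition call. A hedged Python paraphrase (the branch condition itself is elided from this hunk; a typical choice compares the layer's node count against the thread count):

```python
# Sketch only; condition assumed, not shown in the hunk above.
def plan_layer(nodes_in_layer, threads):
    if nodes_in_layer >= threads:
        # deep layer: parallelize across nodes, one thread per node
        return {"omp_parallel_for_over_nodes": True, "local_threads": 1}
    # shallow layer: few nodes, give all threads to each partition call
    return {"omp_parallel_for_over_nodes": False, "local_threads": threads}
```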
28 changes: 21 additions & 7 deletions pecos/xmc/base.py
@@ -93,6 +93,10 @@ class TrainParams(pecos.BaseParams): # type: ignore
seed (int, optional): Random seed. Default is `0`.
kmeans_max_iter (int, optional): Maximum number of iterations for each k-means problem. Default is `20`.
threads (int, optional): Number of threads to use. `-1` denotes all CPUs. Default is `-1`.
do_sample (bool, optional): Whether to do sampling. Default is `False`.
We use a linear sampling strategy with warmup, which linearly increases the sampling rate from `min_sample_rate` to `max_sample_rate`.
The top (`total_layers` * `warmup_ratio`) layers are warmup layers, which use the fixed sampling rate `min_sample_rate`.
The sampling rate for layer l is `min_sample_rate` + max(l + 1 - `warmup_layers`, 0) * (`max_sample_rate` - `min_sample_rate`) / (`total_layers` - `warmup_layers`).
"""

nr_splits: int = 16
@@ -103,6 +107,12 @@ class TrainParams(pecos.BaseParams): # type: ignore
kmeans_max_iter: int = 20
threads: int = -1

# parameters for sampling in hierarchical clustering
do_sample: bool = False
max_sample_rate: float = 1.0
min_sample_rate: float = 0.1
warmup_ratio: float = 0.4

@classmethod
def gen(
cls,
@@ -117,6 +127,7 @@ def gen(
feat_mat (numpy.ndarray or scipy.sparse.csr.csr_matrix): Matrix of label features.
train_params (HierarchicalKMeans.TrainParams, optional): training parameters for indexing.
dtype (type, optional): Data type for matrices. Default is `numpy.float32`.
sample_params (HierarchicalKMeans.SampleParams, optional): parameters for sampling. None denotes no sampling. Default is None.
**kwargs: Ignored.
Returns:
@@ -130,6 +141,11 @@ def gen(
if train_params.min_codes is None:
train_params.min_codes = train_params.nr_splits

if not train_params.do_sample:
# set warmup_ratio and min_sample_rate to 1.0 so that no sampling is performed
train_params.warmup_ratio = 1.0
train_params.min_sample_rate = 1.0

LOGGER.debug(
f"HierarchicalKMeans train_params: {json.dumps(train_params.to_dict(), indent=True)}"
)
@@ -146,8 +162,10 @@ def gen(
raise ValueError(
f"max_leaf_size > 1 is needed for feat_mat.shape[0] == {nr_instances} to avoid empty clusters"
)
train_params.depth = depth

algo = cls.SKMEANS if train_params.spherical else cls.KMEANS
partition_algo = cls.SKMEANS if train_params.spherical else cls.KMEANS
train_params.partition_algo = partition_algo

assert feat_mat.dtype == np.float32
if isinstance(feat_mat, (smat.csr_matrix, ScipyCsrF32)):
@@ -162,12 +180,8 @@ def gen(
codes = np.zeros(py_feat_mat.rows, dtype=np.uint32)
codes = clib.run_clustering(
py_feat_mat,
depth,
algo,
train_params.seed,
codes=codes,
kmeans_max_iter=train_params.kmeans_max_iter,
threads=train_params.threads,
train_params,
codes,
)
C = cls.convert_codes_to_csc_matrix(codes, depth)
cluster_chain = ClusterChain.from_partial_chain(
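Putting the pieces together, the new user-facing knobs live on `HierarchicalKMeans.TrainParams`. A hedged end-to-end sketch (import path per this diff; `gen` wraps the matrix and attaches `depth` and `partition_algo` itself, so only the sampling fields need to be set):

```python
import numpy as np
from pecos.xmc.base import HierarchicalKMeans

feat_mat = np.random.rand(1000, 64).astype(np.float32)  # toy label features
train_params = HierarchicalKMeans.TrainParams(
    do_sample=True,        # turn on the linear warmup schedule
    min_sample_rate=0.1,   # rate held through the warmup layers
    max_sample_rate=1.0,   # rate reached at the last layer
    warmup_ratio=0.4,      # fraction of layers held at min_sample_rate
)
cluster_chain = HierarchicalKMeans.gen(feat_mat, train_params=train_params)
```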
(The diff for the sixth changed file did not load in this view.)
