diff --git a/egs/callhome_diarization/v1/diarization/cluster.sh b/egs/callhome_diarization/v1/diarization/cluster.sh index fa5ead5b6b9..5e5c6e9dbe5 100755 --- a/egs/callhome_diarization/v1/diarization/cluster.sh +++ b/egs/callhome_diarization/v1/diarization/cluster.sh @@ -14,6 +14,8 @@ stage=0 nj=10 cleanup=true threshold=0.5 +max_spk_fraction=1.0 +first_pass_max_utterances=32767 rttm_channel=0 read_costs=false reco2num_spk= @@ -36,6 +38,15 @@ if [ $# != 2 ]; then echo " --threshold # Cluster stopping criterion. Clusters with scores greater" echo " # than this value will be merged until all clusters" echo " # exceed this value." + echo " --max-spk-fraction # Clusters with total fraction of utterances greater than" + echo " # this value will not be merged. This is active only when" + echo " # reco2num-spk is supplied and" + echo " # 1.0 / num-spk <= max-spk-fraction <= 1.0." + echo " --first-pass-max-utterances # If the number of utterances is larger than first-pass-max-utterances," + echo " # then clustering is done in two passes. In the first pass, input points" + echo " # are divided into contiguous subsets of size first-pass-max-utterances" + echo " # and each subset is clustered separately. In the second pass, the first" + echo " # pass clusters are merged into the final set of clusters." echo " --rttm-channel # The value passed into the RTTM channel field. Only affects" echo " # the format of the RTTM file." echo " --read-costs # If true, interpret input scores as costs, i.e. 
similarity" @@ -78,8 +89,10 @@ if [ $stage -le 0 ]; then echo "$0: clustering scores" $cmd JOB=1:$nj $dir/log/agglomerative_cluster.JOB.log \ agglomerative-cluster --threshold=$threshold --read-costs=$read_costs \ - --reco2num-spk-rspecifier=$reco2num_spk scp:"$feats" \ - ark,t:$sdata/JOB/spk2utt ark,t:$dir/labels.JOB || exit 1; + --reco2num-spk-rspecifier=$reco2num_spk \ + --max-spk-fraction=$max_spk_fraction \ + --first-pass-max-utterances=$first_pass_max_utterances \ + scp:"$feats" ark,t:$sdata/JOB/spk2utt ark,t:$dir/labels.JOB || exit 1; fi if [ $stage -le 1 ]; then diff --git a/src/ivector/agglomerative-clustering.cc b/src/ivector/agglomerative-clustering.cc index 30138e00637..ced912ed195 100644 --- a/src/ivector/agglomerative-clustering.cc +++ b/src/ivector/agglomerative-clustering.cc @@ -2,6 +2,7 @@ // Copyright 2017-2018 Matthew Maciejewski // 2018 David Snyder +// 2019 Dogan Can // See ../../COPYING for clarification regarding multiple authors // @@ -24,65 +25,98 @@ namespace kaldi { void AgglomerativeClusterer::Cluster() { - KALDI_VLOG(2) << "Initializing cluster assignments."; - Initialize(); - - KALDI_VLOG(2) << "Clustering..."; - // This is the main algorithm loop. It moves through the queue merging - // clusters until a stopping criterion has been reached. - while (num_clusters_ > min_clust_ && !queue_.empty()) { - std::pair > pr = queue_.top(); - int32 i = (int32) pr.second.first, j = (int32) pr.second.second; - queue_.pop(); - // check to make sure clusters have not already been merged - if ((active_clusters_.find(i) != active_clusters_.end()) && - (active_clusters_.find(j) != active_clusters_.end())) - MergeClusters(i, j); - } + if (num_points_ > first_pass_max_points_) + ClusterTwoPass(); + else + ClusterSinglePass(); +} - std::vector new_assignments(num_points_); - int32 label_id = 0; - std::set::iterator it; - // Iterate through the clusters and assign all utterances within the cluster - // an ID label unique to the cluster. 
This is the final output and frees up - // the cluster memory accordingly. - for (it = active_clusters_.begin(); it != active_clusters_.end(); ++it) { - ++label_id; - AhcCluster *cluster = clusters_map_[*it]; - std::vector::iterator utt_it; - for (utt_it = cluster->utt_ids.begin(); - utt_it != cluster->utt_ids.end(); ++utt_it) - new_assignments[*utt_it] = label_id; - delete cluster; +void AgglomerativeClusterer::ClusterSinglePass() { + InitializeClusters(0, num_points_); + ComputeClusters(min_clusters_); + AssignClusters(); +} + +void AgglomerativeClusterer::ClusterTwoPass() { + // This is the first pass loop. We divide the input into equal size subsets + // making sure each subset has at most first_pass_max_points_ points. Then, we + // cluster the points in each subset separately until a stopping criterion is + // reached. We set the minimum number of clusters to 10 * min_clusters_ for + // each subset to avoid early merging of most clusters that would otherwise be + // kept separate in single pass clustering. + BaseFloat num_points = static_cast(num_points_); + int32 num_subsets = ceil(num_points / first_pass_max_points_); + int32 subset_size = ceil(num_points / num_subsets); + for (int32 n = 0; n < num_points_; n += subset_size) { + InitializeClusters(n, std::min(n + subset_size, num_points_)); + ComputeClusters(min_clusters_ * 10); + AddClustersToSecondPass(); } - assignments_->swap(new_assignments); + + // We swap the contents of the first and second pass data structures so that + // we can use the same method to do second pass clustering. + clusters_map_.swap(second_pass_clusters_map_); + active_clusters_.swap(second_pass_active_clusters_); + cluster_cost_map_.swap(second_pass_cluster_cost_map_); + queue_.swap(second_pass_queue_); + count_ = second_pass_count_; + + // This is the second pass. It moves through the queue merging clusters + // determined in the first pass until a stopping criterion is reached. 
+ ComputeClusters(min_clusters_); + + AssignClusters(); } -BaseFloat AgglomerativeClusterer::GetCost(int32 i, int32 j) { +uint32 AgglomerativeClusterer::EncodePair(int32 i, int32 j) { if (i < j) - return cluster_cost_map_[std::make_pair(i, j)]; + return (static_cast(i) << 16) + static_cast(j); else - return cluster_cost_map_[std::make_pair(j, i)]; + return (static_cast(j) << 16) + static_cast(i); +} + +std::pair AgglomerativeClusterer::DecodePair(uint32 key) { + return std::make_pair(static_cast(key >> 16), + static_cast(key & 0x0000FFFFu)); } -void AgglomerativeClusterer::Initialize() { - KALDI_ASSERT(num_clusters_ != 0); - for (int32 i = 0; i < num_points_; i++) { +void AgglomerativeClusterer::InitializeClusters(int32 first, int32 last) { + KALDI_ASSERT(last > first); + clusters_map_.clear(); + active_clusters_.clear(); + cluster_cost_map_.clear(); + queue_ = QueueType(); // priority_queue does not have a clear method + + for (int32 i = first; i < last; i++) { // create an initial cluster of size 1 for each point std::vector ids; ids.push_back(i); - AhcCluster *c = new AhcCluster(++count_, -1, -1, ids); - clusters_map_[count_] = c; - active_clusters_.insert(count_); + AhcCluster *c = new AhcCluster(i + 1, -1, -1, ids); + clusters_map_[i + 1] = c; + active_clusters_.insert(i + 1); // propagate the queue with all pairs from the cost matrix - for (int32 j = i+1; j < num_clusters_; j++) { - BaseFloat cost = costs_(i,j); - cluster_cost_map_[std::make_pair(i+1, j+1)] = cost; - if (cost <= thresh_) - queue_.push(std::make_pair(cost, - std::make_pair(static_cast(i+1), - static_cast(j+1)))); + for (int32 j = i + 1; j < last; j++) { + BaseFloat cost = costs_(i, j); + uint32 key = EncodePair(i + 1, j + 1); + cluster_cost_map_[key] = cost; + if (cost <= threshold_) + queue_.push(std::make_pair(cost, key)); + } + } +} + +void AgglomerativeClusterer::ComputeClusters(int32 min_clusters) { + while (active_clusters_.size() > min_clusters && !queue_.empty()) { + std::pair pr = 
queue_.top(); + int32 i, j; + std::tie(i, j) = DecodePair(pr.second); + queue_.pop(); + // check to make sure clusters have not already been merged + if ((active_clusters_.find(i) != active_clusters_.end()) && + (active_clusters_.find(j) != active_clusters_.end())) { + if (clusters_map_[i]->size + clusters_map_[j]->size <= max_cluster_size_) + MergeClusters(i, j); } } } @@ -105,27 +139,99 @@ void AgglomerativeClusterer::MergeClusters(int32 i, int32 j) { std::set::iterator it; for (it = active_clusters_.begin(); it != active_clusters_.end(); ++it) { // The new cost is the sum of the costs of the new cluster's parents - BaseFloat new_cost = GetCost(*it, i) + GetCost(*it, j); - cluster_cost_map_[std::make_pair(*it, count_)] = new_cost; + BaseFloat new_cost = cluster_cost_map_[EncodePair(*it, i)] + + cluster_cost_map_[EncodePair(*it, j)]; + uint32 new_key = EncodePair(*it, count_); + cluster_cost_map_[new_key] = new_cost; BaseFloat norm = clust1->size * (clusters_map_[*it])->size; - if (new_cost / norm <= thresh_) - queue_.push(std::make_pair(new_cost / norm, - std::make_pair(static_cast(*it), - static_cast(count_)))); + if (new_cost / norm <= threshold_) + queue_.push(std::make_pair(new_cost / norm, new_key)); } active_clusters_.insert(count_); clusters_map_[count_] = clust1; delete clust2; - num_clusters_--; +} + +void AgglomerativeClusterer::AddClustersToSecondPass() { + // This method collects the results of first pass clustering for one subset, + // i.e. adds the set of active clusters to the set of second pass active + // clusters and computes the costs for the newly formed cluster pairs. 
+ std::set::iterator it1, it2; + int32 count = second_pass_count_; + for (it1 = active_clusters_.begin(); it1 != active_clusters_.end(); ++it1) { + AhcCluster *clust1 = clusters_map_[*it1]; + second_pass_clusters_map_[++count] = clust1; + + // Compute new cluster pair costs + for (it2 = second_pass_active_clusters_.begin(); + it2 != second_pass_active_clusters_.end(); ++it2) { + AhcCluster *clust2 = second_pass_clusters_map_[*it2]; + uint32 new_key = EncodePair(count, *it2); + + BaseFloat new_cost = 0.0; + std::vector::iterator utt_it1, utt_it2; + for (utt_it1 = clust1->utt_ids.begin(); + utt_it1 != clust1->utt_ids.end(); ++utt_it1) { + for (utt_it2 = clust2->utt_ids.begin(); + utt_it2 != clust2->utt_ids.end(); ++utt_it2) { + new_cost += costs_(*utt_it1, *utt_it2); + } + } + + second_pass_cluster_cost_map_[new_key] = new_cost; + BaseFloat norm = clust1->size * clust2->size; + if (new_cost / norm <= threshold_) + second_pass_queue_.push(std::make_pair(new_cost / norm, new_key)); + } + + // Copy cluster pair costs that were already computed in the first pass + int32 count2 = second_pass_count_; + for (it2 = active_clusters_.begin(); it2 != it1; ++it2) { + uint32 key = EncodePair(*it1, *it2); + BaseFloat cost = cluster_cost_map_[key]; + BaseFloat norm = clust1->size * (clusters_map_[*it2])->size; + uint32 new_key = EncodePair(count, ++count2); + second_pass_cluster_cost_map_[new_key] = cost; + if (cost / norm <= threshold_) + second_pass_queue_.push(std::make_pair(cost / norm, new_key)); + } + } + // We update second_pass_count_ and second_pass_active_clusters_ here since + // above loop assumes they do not change while the loop is running. 
+ while (second_pass_count_ < count) + second_pass_active_clusters_.insert(++second_pass_count_); +} + +void AgglomerativeClusterer::AssignClusters() { + assignments_->resize(num_points_); + int32 label_id = 0; + std::set::iterator it; + // Iterate through the clusters and assign all utterances within the cluster + // an ID label unique to the cluster. This is the final output and frees up + // the cluster memory accordingly. + for (it = active_clusters_.begin(); it != active_clusters_.end(); ++it) { + ++label_id; + AhcCluster *cluster = clusters_map_[*it]; + std::vector::iterator utt_it; + for (utt_it = cluster->utt_ids.begin(); + utt_it != cluster->utt_ids.end(); ++utt_it) + (*assignments_)[*utt_it] = label_id; + delete cluster; + } } void AgglomerativeCluster( const Matrix &costs, - BaseFloat thresh, - int32 min_clust, + BaseFloat threshold, + int32 min_clusters, + int32 first_pass_max_points, + BaseFloat max_cluster_fraction, std::vector *assignments_out) { - KALDI_ASSERT(min_clust >= 0); - AgglomerativeClusterer ac(costs, thresh, min_clust, assignments_out); + KALDI_ASSERT(min_clusters >= 0); + KALDI_ASSERT(min_clusters == 0 || max_cluster_fraction >= 1.0 / min_clusters); + AgglomerativeClusterer ac(costs, threshold, min_clusters, + first_pass_max_points, max_cluster_fraction, + assignments_out); ac.Cluster(); } diff --git a/src/ivector/agglomerative-clustering.h b/src/ivector/agglomerative-clustering.h index 310a336f8b5..ffd63a86e29 100644 --- a/src/ivector/agglomerative-clustering.h +++ b/src/ivector/agglomerative-clustering.h @@ -2,6 +2,7 @@ // Copyright 2017-2018 Matthew Maciejewski // 2018 David Snyder +// 2019 Dogan Can // See ../../COPYING for clarification regarding multiple authors // @@ -55,65 +56,108 @@ class AgglomerativeClusterer { public: AgglomerativeClusterer( const Matrix &costs, - BaseFloat thresh, - int32 min_clust, + BaseFloat threshold, + int32 min_clusters, + int32 first_pass_max_points, + BaseFloat max_cluster_fraction, std::vector *assignments_out) - : 
count_(0), costs_(costs), thresh_(thresh), min_clust_(min_clust), + : costs_(costs), threshold_(threshold), min_clusters_(min_clusters), + first_pass_max_points_(first_pass_max_points), assignments_(assignments_out) { - num_clusters_ = costs.NumRows(); num_points_ = costs.NumRows(); + + // max_cluster_size_ is a hard limit on the number of points in a cluster. + // This is useful for handling degenerate cases where some outlier points + // form their own clusters and force everything else to be clustered + // together, e.g. when min-clusters is provided instead of a threshold. + max_cluster_size_ = ceil(num_points_ * max_cluster_fraction); + + // The count_, which is used for identifying clusters, is initialized to + // num_points_ because cluster IDs 1..num_points_ are reserved for input + // points, which are the initial set of clusters. + count_ = num_points_; + + // The second_pass_count_, which is used for identifying the initial set of + // second pass clusters and initializing count_ before the second pass, is + // initialized to 0 and incremented whenever a new cluster is added to the + // initial set of second pass clusters. + second_pass_count_ = 0; } - // Performs the clustering + // Clusters points. Chooses single pass or two pass algorithm. void Cluster(); + + // Clusters points using single pass algorithm. + void ClusterSinglePass(); + + // Clusters points using two pass algorithm. + void ClusterTwoPass(); + private: - // Returns the cost between clusters with IDs i and j - BaseFloat GetCost(int32 i, int32 j); + // Encodes cluster pair into a 32-bit unsigned integer. + uint32 EncodePair(int32 i, int32 j); + // Decodes cluster pair from a 32-bit unsigned integer. 
+ std::pair DecodePair(uint32 key); // Initializes the clustering queue with singleton clusters - void Initialize(); + void InitializeClusters(int32 first, int32 last); + // Does hierarchical agglomerative clustering + void ComputeClusters(int32 min_clusters); + // Adds clusters created in first pass to second pass clusters + void AddClustersToSecondPass(); + // Assigns points to clusters + void AssignClusters(); // Merges clusters with IDs i and j and updates cost map and queue void MergeClusters(int32 i, int32 j); - - int32 count_; // Count of clusters that have been created. Also used to give - // clusters unique IDs. const Matrix &costs_; // cost matrix - BaseFloat thresh_; // stopping criterion threshold - int32 min_clust_; // minimum number of clusters + BaseFloat threshold_; // stopping criterion threshold + int32 min_clusters_; // minimum number of clusters + int32 first_pass_max_points_; // maximum number of points in each subset std::vector *assignments_; // assignments out + int32 num_points_; // total number of points to cluster + int32 max_cluster_size_; // maximum number of points in a cluster + int32 count_; // count of first pass clusters, used for identifying clusters + int32 second_pass_count_; // count of second pass clusters + // Priority queue using greater (lowest costs are highest priority). // Elements contain pairs of cluster IDs and their cost. 
- typedef std::pair > QueueElement; + typedef std::pair QueueElement; typedef std::priority_queue, std::greater > QueueType; - QueueType queue_; + QueueType queue_, second_pass_queue_; // Map from cluster IDs to cost between them - std::unordered_map, BaseFloat, - PairHasher> cluster_cost_map_; + std::unordered_map cluster_cost_map_; // Map from cluster ID to cluster object address std::unordered_map clusters_map_; - std::set active_clusters_; // IDs of unmerged clusters - int32 num_clusters_; // number of active clusters - int32 num_points_; // total number of points to cluster + // Set of unmerged cluster IDs + std::set active_clusters_; + + // Map from second pass cluster IDs to cost between them + std::unordered_map second_pass_cluster_cost_map_; + // Map from second pass cluster ID to cluster object address + std::unordered_map second_pass_clusters_map_; + // Set of unmerged second pass cluster IDs + std::set second_pass_active_clusters_; }; /** This is the function that is called to perform the agglomerative * clustering. It takes the following arguments: * - A matrix of all pairwise costs, with each row/column corresponding * to an utterance ID, and the elements of the matrix containing the - cost for pairing the utterances for its row and column + * cost for pairing the utterances for its row and column * - A threshold which is used as the stopping criterion for the clusters * - A minimum number of clusters that will not be merged past + * - A maximum fraction of points that can be in a cluster * - A vector which will be filled with integer IDs corresponding to each * of the rows/columns of the score matrix. * * The basic algorithm is as follows: * \code - * while (num-clusters > min_clust && smallest-merge-cost <= thresh) - * merge the two clusters with lowest cost. 
+ * while (num-clusters > min-clusters && smallest-merge-cost <= threshold) + * if (size-of-new-cluster <= max-cluster-size) + * merge the two clusters with lowest cost * \endcode * * The cost between two clusters is the average cost of all pairwise @@ -126,11 +170,19 @@ class AgglomerativeClusterer { * costs between clusters I and M and clusters I and N, where * cluster J was formed by merging clusters M and N. * + * If the number of points to cluster is larger than first-pass-max-points, + * then clustering is done in two passes. In the first pass, input points are + * divided into contiguous subsets of size at most first-pass-max-points and + * each subset is clustered separately. In the second pass, the first pass + * clusters are merged into the final set of clusters. + * */ void AgglomerativeCluster( const Matrix &costs, - BaseFloat thresh, - int32 min_clust, + BaseFloat threshold, + int32 min_clusters, + int32 first_pass_max_points, + BaseFloat max_cluster_fraction, std::vector *assignments_out); } // end namespace kaldi. 
diff --git a/src/ivectorbin/agglomerative-cluster.cc b/src/ivectorbin/agglomerative-cluster.cc index 9dca9bfeb83..4812dd291e1 100644 --- a/src/ivectorbin/agglomerative-cluster.cc +++ b/src/ivectorbin/agglomerative-cluster.cc @@ -2,6 +2,7 @@ // Copyright 2016-2018 David Snyder // 2017-2018 Matthew Maciejewski +// 2019 Dogan Can // See ../../COPYING for clarification regarding multiple authors // @@ -47,8 +48,9 @@ int main(int argc, char *argv[]) { ParseOptions po(usage); std::string reco2num_spk_rspecifier; - BaseFloat threshold = 0.0; + BaseFloat threshold = 0.0, max_spk_fraction = 1.0; bool read_costs = false; + int32 first_pass_max_utterances = std::numeric_limits::max(); po.Register("reco2num-spk-rspecifier", &reco2num_spk_rspecifier, "If supplied, clustering creates exactly this many clusters for each" @@ -58,6 +60,16 @@ int main(int argc, char *argv[]) { po.Register("read-costs", &read_costs, "If true, the first" " argument is interpreted as a matrix of costs rather than a" " similarity matrix."); + po.Register("first-pass-max-utterances", &first_pass_max_utterances, + "If the number of utterances is larger than first-pass-max-utterances," + " then clustering is done in two passes. In the first pass, input points" + " are divided into contiguous subsets of size first-pass-max-utterances" + " and each subset is clustered separately. In the second pass, the first" + " pass clusters are merged into the final set of clusters."); + po.Register("max-spk-fraction", &max_spk_fraction, "Merge clusters if the" + " total fraction of utterances in them is less than this threshold." 
+ " This is active only when reco2num-spk-rspecifier is supplied and" + " 1.0 / num-spk <= max-spk-fraction <= 1.0."); po.Read(argc, argv); @@ -90,10 +102,17 @@ int main(int argc, char *argv[]) { std::vector spk_ids; if (reco2num_spk_rspecifier.size()) { int32 num_speakers = reco2num_spk_reader.Value(reco); - AgglomerativeCluster(costs, - std::numeric_limits::max(), num_speakers, &spk_ids); + if (1.0 / num_speakers <= max_spk_fraction && max_spk_fraction <= 1.0) + AgglomerativeCluster(costs, std::numeric_limits::max(), + num_speakers, first_pass_max_utterances, + max_spk_fraction, &spk_ids); + else + AgglomerativeCluster(costs, std::numeric_limits::max(), + num_speakers, first_pass_max_utterances, + 1.0, &spk_ids); } else { - AgglomerativeCluster(costs, threshold, 1, &spk_ids); + AgglomerativeCluster(costs, threshold, 1, first_pass_max_utterances, + 1.0, &spk_ids); } for (int32 i = 0; i < spk_ids.size(); i++) label_writer.Write(uttlist[i], spk_ids[i]);