Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions egs/callhome_diarization/v1/diarization/cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ stage=0
nj=10
cleanup=true
threshold=0.5
max_spk_fraction=1.0
first_pass_max_utterances=32767
rttm_channel=0
read_costs=false
reco2num_spk=
Expand All @@ -36,6 +38,15 @@ if [ $# != 2 ]; then
echo " --threshold <threshold|0> # Cluster stopping criterion. Clusters with scores greater"
echo " # than this value will be merged until all clusters"
echo " # exceed this value."
echo " --max-spk-fraction <max-spk-fraction|1.0> # Clusters with total fraction of utterances greater than"
echo " # this value will not be merged. This is active only when"
echo " # reco2num-spk is supplied and"
echo " # 1.0 / num-spk <= max-spk-fraction <= 1.0."
echo " --first-pass-max-utterances <max-utts|32767> # If the number of utterances is larger than first-pass-max-utterances,"
echo " # then clustering is done in two passes. In the first pass, input points"
echo " # are divided into contiguous subsets of size first-pass-max-utterances"
echo " # and each subset is clustered separately. In the second pass, the first"
echo " # pass clusters are merged into the final set of clusters."
echo " --rttm-channel <rttm-channel|0> # The value passed into the RTTM channel field. Only affects"
echo " # the format of the RTTM file."
echo " --read-costs <read-costs|false> # If true, interpret input scores as costs, i.e. similarity"
Expand Down Expand Up @@ -78,8 +89,10 @@ if [ $stage -le 0 ]; then
echo "$0: clustering scores"
$cmd JOB=1:$nj $dir/log/agglomerative_cluster.JOB.log \
agglomerative-cluster --threshold=$threshold --read-costs=$read_costs \
--reco2num-spk-rspecifier=$reco2num_spk scp:"$feats" \
ark,t:$sdata/JOB/spk2utt ark,t:$dir/labels.JOB || exit 1;
--reco2num-spk-rspecifier=$reco2num_spk \
--max-spk-fraction=$max_spk_fraction \
--first-pass-max-utterances=$first_pass_max_utterances \
scp:"$feats" ark,t:$sdata/JOB/spk2utt ark,t:$dir/labels.JOB || exit 1;
fi

if [ $stage -le 1 ]; then
Expand Down
220 changes: 163 additions & 57 deletions src/ivector/agglomerative-clustering.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

// Copyright 2017-2018 Matthew Maciejewski
// 2018 David Snyder
// 2019 Dogan Can

// See ../../COPYING for clarification regarding multiple authors
//
Expand All @@ -24,65 +25,98 @@
namespace kaldi {

// Entry point for clustering. Dispatches on the number of input points:
// when num_points_ exceeds first_pass_max_points_ the two-pass variant is
// used, otherwise all points are clustered in a single pass. Both variants
// finish by writing the final labels through AssignClusters().
void AgglomerativeClusterer::Cluster() {
  if (num_points_ > first_pass_max_points_)
    ClusterTwoPass();
  else
    ClusterSinglePass();
}

void AgglomerativeClusterer::ClusterSinglePass() {
InitializeClusters(0, num_points_);
ComputeClusters(min_clusters_);
AssignClusters();
}

void AgglomerativeClusterer::ClusterTwoPass() {
// This is the first pass loop. We divide the input into equal size subsets
// making sure each subset has at most first_pass_max_points_ points. Then, we
// cluster the points in each subset separately until a stopping criterion is
// reached. We set the minimum number of clusters to 10 * min_clusters_ for
// each subset to avoid early merging of most clusters that would otherwise be
// kept separate in single pass clustering.
BaseFloat num_points = static_cast<BaseFloat>(num_points_);
int32 num_subsets = ceil(num_points / first_pass_max_points_);
int32 subset_size = ceil(num_points / num_subsets);
for (int32 n = 0; n < num_points_; n += subset_size) {
InitializeClusters(n, std::min(n + subset_size, num_points_));
ComputeClusters(min_clusters_ * 10);
AddClustersToSecondPass();
}
assignments_->swap(new_assignments);

// We swap the contents of the first and second pass data structures so that
// we can use the same method to do second pass clustering.
clusters_map_.swap(second_pass_clusters_map_);
active_clusters_.swap(second_pass_active_clusters_);
cluster_cost_map_.swap(second_pass_cluster_cost_map_);
queue_.swap(second_pass_queue_);
count_ = second_pass_count_;

// This is the second pass. It moves through the queue merging clusters
// determined in the first pass until a stopping criterion is reached.
ComputeClusters(min_clusters_);

AssignClusters();
}

BaseFloat AgglomerativeClusterer::GetCost(int32 i, int32 j) {
uint32 AgglomerativeClusterer::EncodePair(int32 i, int32 j) {
if (i < j)
return cluster_cost_map_[std::make_pair(i, j)];
return (static_cast<uint32>(i) << 16) + static_cast<uint32>(j);
else
return cluster_cost_map_[std::make_pair(j, i)];
return (static_cast<uint32>(j) << 16) + static_cast<uint32>(i);
}

// Splits a 32-bit pair key into its two 16-bit halves. The first element of
// the result is the upper half of the key and the second element is the
// lower half.
std::pair<int32, int32> AgglomerativeClusterer::DecodePair(uint32 key) {
  const uint32 upper_half = key >> 16;
  const uint32 lower_half = key & 0x0000FFFFu;
  return std::pair<int32, int32>(static_cast<int32>(upper_half),
                                 static_cast<int32>(lower_half));
}

void AgglomerativeClusterer::Initialize() {
KALDI_ASSERT(num_clusters_ != 0);
for (int32 i = 0; i < num_points_; i++) {
void AgglomerativeClusterer::InitializeClusters(int32 first, int32 last) {
KALDI_ASSERT(last > first);
clusters_map_.clear();
active_clusters_.clear();
cluster_cost_map_.clear();
queue_ = QueueType(); // priority_queue does not have a clear method

for (int32 i = first; i < last; i++) {
// create an initial cluster of size 1 for each point
std::vector<int32> ids;
ids.push_back(i);
AhcCluster *c = new AhcCluster(++count_, -1, -1, ids);
clusters_map_[count_] = c;
active_clusters_.insert(count_);
AhcCluster *c = new AhcCluster(i + 1, -1, -1, ids);
clusters_map_[i + 1] = c;
active_clusters_.insert(i + 1);

// propagate the queue with all pairs from the cost matrix
for (int32 j = i+1; j < num_clusters_; j++) {
BaseFloat cost = costs_(i,j);
cluster_cost_map_[std::make_pair(i+1, j+1)] = cost;
if (cost <= thresh_)
queue_.push(std::make_pair(cost,
std::make_pair(static_cast<uint16>(i+1),
static_cast<uint16>(j+1))));
for (int32 j = i + 1; j < last; j++) {
BaseFloat cost = costs_(i, j);
uint32 key = EncodePair(i + 1, j + 1);
cluster_cost_map_[key] = cost;
if (cost <= threshold_)
queue_.push(std::make_pair(cost, key));
}
}
}

// Main merge loop. Pops candidate pairs off the queue and merges them until
// either the number of active clusters is no longer above min_clusters or
// the queue runs dry.
void AgglomerativeClusterer::ComputeClusters(int32 min_clusters) {
  while (active_clusters_.size() > min_clusters && !queue_.empty()) {
    const std::pair<int32, int32> ids = DecodePair(queue_.top().second);
    queue_.pop();
    const int32 i = ids.first;
    const int32 j = ids.second;

    // A queue entry is stale if either of its clusters has already been
    // merged into another one; such entries are simply dropped.
    const bool both_active = active_clusters_.count(i) != 0 &&
                             active_clusters_.count(j) != 0;
    if (!both_active)
      continue;

    // Only merge when the combined cluster stays within the size limit.
    const bool fits =
        clusters_map_[i]->size + clusters_map_[j]->size <= max_cluster_size_;
    if (fits)
      MergeClusters(i, j);
  }
}
Expand All @@ -105,27 +139,99 @@ void AgglomerativeClusterer::MergeClusters(int32 i, int32 j) {
std::set<int32>::iterator it;
for (it = active_clusters_.begin(); it != active_clusters_.end(); ++it) {
// The new cost is the sum of the costs of the new cluster's parents
BaseFloat new_cost = GetCost(*it, i) + GetCost(*it, j);
cluster_cost_map_[std::make_pair(*it, count_)] = new_cost;
BaseFloat new_cost = cluster_cost_map_[EncodePair(*it, i)] +
cluster_cost_map_[EncodePair(*it, j)];
uint32 new_key = EncodePair(*it, count_);
cluster_cost_map_[new_key] = new_cost;
BaseFloat norm = clust1->size * (clusters_map_[*it])->size;
if (new_cost / norm <= thresh_)
queue_.push(std::make_pair(new_cost / norm,
std::make_pair(static_cast<uint16>(*it),
static_cast<uint16>(count_))));
if (new_cost / norm <= threshold_)
queue_.push(std::make_pair(new_cost / norm, new_key));
}
active_clusters_.insert(count_);
clusters_map_[count_] = clust1;
delete clust2;
num_clusters_--;
}

void AgglomerativeClusterer::AddClustersToSecondPass() {
// This method collects the results of first pass clustering for one subset,
// i.e. adds the set of active clusters to the set of second pass active
// clusters and computes the costs for the newly formed cluster pairs.
std::set<int32>::iterator it1, it2;
int32 count = second_pass_count_;
for (it1 = active_clusters_.begin(); it1 != active_clusters_.end(); ++it1) {
AhcCluster *clust1 = clusters_map_[*it1];
second_pass_clusters_map_[++count] = clust1;

// Compute new cluster pair costs
for (it2 = second_pass_active_clusters_.begin();
it2 != second_pass_active_clusters_.end(); ++it2) {
AhcCluster *clust2 = second_pass_clusters_map_[*it2];
uint32 new_key = EncodePair(count, *it2);

BaseFloat new_cost = 0.0;
std::vector<int32>::iterator utt_it1, utt_it2;
for (utt_it1 = clust1->utt_ids.begin();
utt_it1 != clust1->utt_ids.end(); ++utt_it1) {
for (utt_it2 = clust2->utt_ids.begin();
utt_it2 != clust2->utt_ids.end(); ++utt_it2) {
new_cost += costs_(*utt_it1, *utt_it2);
}
}

second_pass_cluster_cost_map_[new_key] = new_cost;
BaseFloat norm = clust1->size * clust2->size;
if (new_cost / norm <= threshold_)
second_pass_queue_.push(std::make_pair(new_cost / norm, new_key));
}

// Copy cluster pair costs that were already computed in the first pass
int32 count2 = second_pass_count_;
for (it2 = active_clusters_.begin(); it2 != it1; ++it2) {
uint32 key = EncodePair(*it1, *it2);
BaseFloat cost = cluster_cost_map_[key];
BaseFloat norm = clust1->size * (clusters_map_[*it2])->size;
uint32 new_key = EncodePair(count, ++count2);
second_pass_cluster_cost_map_[new_key] = cost;
if (cost / norm <= threshold_)
second_pass_queue_.push(std::make_pair(cost / norm, new_key));
}
}
// We update second_pass_count_ and second_pass_active_clusters_ here since
// above loop assumes they do not change while the loop is running.
while (second_pass_count_ < count)
second_pass_active_clusters_.insert(++second_pass_count_);
}

void AgglomerativeClusterer::AssignClusters() {
assignments_->resize(num_points_);
int32 label_id = 0;
std::set<int32>::iterator it;
// Iterate through the clusters and assign all utterances within the cluster
// an ID label unique to the cluster. This is the final output and frees up
// the cluster memory accordingly.
for (it = active_clusters_.begin(); it != active_clusters_.end(); ++it) {
++label_id;
AhcCluster *cluster = clusters_map_[*it];
std::vector<int32>::iterator utt_it;
for (utt_it = cluster->utt_ids.begin();
utt_it != cluster->utt_ids.end(); ++utt_it)
(*assignments_)[*utt_it] = label_id;
delete cluster;
}
}

// Convenience wrapper: builds an AgglomerativeClusterer from the arguments
// and runs it.
//
//   costs                  pairwise cost matrix between the input points.
//   threshold              pairs are only queued for merging while their
//                          (size-normalized) cost does not exceed this value.
//   min_clusters           merging stops once the number of clusters is no
//                          longer above this value; must be >= 0.
//   first_pass_max_points  when there are more input points than this,
//                          clustering is done in two passes.
//   max_cluster_fraction   forwarded to the clusterer, which uses it to limit
//                          cluster size. When min_clusters > 0 it must be at
//                          least 1.0 / min_clusters.
//   assignments_out        output vector handed to the clusterer to receive
//                          the per-point cluster labels.
void AgglomerativeCluster(
    const Matrix<BaseFloat> &costs,
    BaseFloat threshold,
    int32 min_clusters,
    int32 first_pass_max_points,
    BaseFloat max_cluster_fraction,
    std::vector<int32> *assignments_out) {
  KALDI_ASSERT(min_clusters >= 0);
  // The lower bound 1.0 / min_clusters is undefined for min_clusters == 0
  // (the division yields +inf, which no fraction can satisfy), so the bound
  // is only enforced when a positive minimum is requested.
  KALDI_ASSERT(min_clusters == 0 ||
               max_cluster_fraction >= 1.0 / min_clusters);
  AgglomerativeClusterer ac(costs, threshold, min_clusters,
                            first_pass_max_points, max_cluster_fraction,
                            assignments_out);
  ac.Cluster();
}

Expand Down
Loading