diff --git a/src/clustering/Clustering.cpp b/src/clustering/Clustering.cpp index 91fda8934..0025e19ab 100644 --- a/src/clustering/Clustering.cpp +++ b/src/clustering/Clustering.cpp @@ -4,10 +4,12 @@ #include "Util.h" #include "itoa.h" #include "Timer.h" +#include "SequenceWeights.h" Clustering::Clustering(const std::string &seqDB, const std::string &seqDBIndex, const std::string &alnDB, const std::string &alnDBIndex, const std::string &outDB, const std::string &outDBIndex, + const std::string &sequenceWeightFile, unsigned int maxIteration, int similarityScoreType, int threads, int compressed) : maxIteration(maxIteration), similarityScoreType(similarityScoreType), threads(threads), @@ -16,7 +18,21 @@ Clustering::Clustering(const std::string &seqDB, const std::string &seqDBIndex, outDBIndex(outDBIndex) { seqDbr = new DBReader(seqDB.c_str(), seqDBIndex.c_str(), threads, DBReader::USE_INDEX); - seqDbr->open(DBReader::SORT_BY_LENGTH); + + if (!sequenceWeightFile.empty()) { + + seqDbr->open(DBReader::SORT_BY_ID); + + SequenceWeights *sequenceWeights = new SequenceWeights(sequenceWeightFile.c_str()); + float localid2weight[seqDbr->getSize()]; + for (size_t id = 0; id < seqDbr->getSize(); id++) { + size_t key = seqDbr->getDbKey(id); + localid2weight[id] = sequenceWeights->getWeightById(key); + } + seqDbr->sortIndex(localid2weight); + + } else + seqDbr->open(DBReader::SORT_BY_LENGTH); alnDbr = new DBReader(alnDB.c_str(), alnDBIndex.c_str(), threads, DBReader::USE_DATA|DBReader::USE_INDEX); alnDbr->open(DBReader::NOSORT); diff --git a/src/clustering/Clustering.h b/src/clustering/Clustering.h index d48c21aa1..2def9f9c2 100644 --- a/src/clustering/Clustering.h +++ b/src/clustering/Clustering.h @@ -11,6 +11,7 @@ class Clustering { Clustering(const std::string &seqDB, const std::string &seqDBIndex, const std::string &alnResultsDB, const std::string &alnResultsDBIndex, const std::string &outDB, const std::string &outDBIndex, + const std::string &weightFileName, unsigned int maxIteration, int similarityScoreType, int threads, int compressed); void run(int mode); diff --git a/src/clustering/ClusteringAlgorithms.cpp b/src/clustering/ClusteringAlgorithms.cpp index a5ecc433b..e521ea87d 100644 --- a/src/clustering/ClusteringAlgorithms.cpp +++ b/src/clustering/ClusteringAlgorithms.cpp @@ -282,7 +282,7 @@ void ClusteringAlgorithms::greedyIncrementalLowMem( unsigned int *assignedcluste #pragma omp for schedule(dynamic, 1000) for(size_t i = 0; i < dbSize; i++) { unsigned int clusterKey = seqDbr->getDbKey(i); - unsigned int clusterId = seqDbr->getId(clusterKey); + unsigned int clusterId = i; // try to set your self as cluster centriod // if some other cluster covered diff --git a/src/clustering/Main.cpp b/src/clustering/Main.cpp index b7c0ee940..35a18b0ac 100644 --- a/src/clustering/Main.cpp +++ b/src/clustering/Main.cpp @@ -6,7 +6,7 @@ int clust(int argc, const char **argv, const Command& command) { par.parseParameters(argc, argv, command, true, 0, 0); Clustering clu(par.db1, par.db1Index, par.db2, par.db2Index, - par.db3, par.db3Index, par.maxIteration, + par.db3, par.db3Index, par.weightFile, par.maxIteration, par.similarityScoreType, par.threads, par.compressed); clu.run(par.clusteringMode); return EXIT_SUCCESS; diff --git a/src/commons/CMakeLists.txt b/src/commons/CMakeLists.txt index 463728cae..dc7ab3e74 100644 --- a/src/commons/CMakeLists.txt +++ b/src/commons/CMakeLists.txt @@ -34,6 +34,7 @@ set(commons_header_files commons/PatternCompiler.h commons/ScoreMatrix.h commons/Sequence.h + commons/SequenceWeights.h commons/StringBlock.h commons/SubstitutionMatrix.h commons/SubstitutionMatrixProfileStates.h @@ -69,6 +70,7 @@ set(commons_source_files commons/ProfileStates.cpp commons/LibraryReader.cpp commons/Sequence.cpp + commons/SequenceWeights.cpp commons/SubstitutionMatrix.cpp commons/tantan.cpp commons/UniprotKB.cpp diff --git a/src/commons/DBReader.cpp b/src/commons/DBReader.cpp index 9357ceab0..f52d2369c 100644 --- a/src/commons/DBReader.cpp +++ b/src/commons/DBReader.cpp @@ -213,6 +213,9 @@ template bool DBReader::open(int accessType){ template void DBReader::sortIndex(bool) { } +template +void DBReader::sortIndex(float*) { +} template bool DBReader::isSortedByOffset(){ @@ -234,8 +237,31 @@ void DBReader::sortIndex(bool isSortedById) { } } +template<> +void DBReader::sortIndex(float *weights) { + + this->accessType=DBReader::SORT_BY_WEIGHTS; + std::pair *sortForMapping = new std::pair[size]; + id2local = new unsigned int[size]; + local2id = new unsigned int[size]; + incrementMemory(sizeof(unsigned int) * 2 * size); + for (size_t i = 0; i < size; i++) { + id2local[i] = i; + local2id[i] = i; + sortForMapping[i] = std::make_pair(i, weights[i]); + } + //this sort has to be stable to assure same clustering results + SORT_PARALLEL(sortForMapping, sortForMapping + size, comparePairByWeight()); + for (size_t i = 0; i < size; i++) { + id2local[sortForMapping[i].first] = i; + local2id[i] = sortForMapping[i].first; + } + delete[] sortForMapping; +} + template<> void DBReader::sortIndex(bool isSortedById) { + // First, we sort the index by IDs and we keep track of the original // ordering in mappingToOriginalIndex array size_t* mappingToOriginalIndex=NULL; @@ -294,7 +320,6 @@ void DBReader::sortIndex(bool isSortedById) { mappingToOriginalIndex[i] = i; } } - if (accessType == SORT_BY_LENGTH) { // sort the entries by the length of the sequences std::pair *sortForMapping = new std::pair[size]; diff --git a/src/commons/DBReader.h b/src/commons/DBReader.h index 89bd93d69..57589b1f6 100644 --- a/src/commons/DBReader.h +++ b/src/commons/DBReader.h @@ -257,6 +257,7 @@ class DBReader : public MemoryTracker { static const int HARDNOSORT = 6; // do not even sort by ids. static const int SORT_BY_ID_OFFSET = 7; static const int SORT_BY_OFFSET = 8; // only offset sorting saves memory and does not support random access + static const int SORT_BY_WEIGHTS= 9; static const unsigned int USE_INDEX = 0; @@ -317,6 +318,7 @@ class DBReader : public MemoryTracker { void mlock(); void sortIndex(bool isSortedById); + void sortIndex(float *weights); bool isSortedByOffset(); void unmapData(); @@ -392,6 +394,20 @@ class DBReader : public MemoryTracker { } }; + struct comparePairByWeight { + bool operator() (const std::pair& lhs, const std::pair& rhs) const{ + if(lhs.second > rhs.second) + return true; + if(rhs.second > lhs.second) + return false; + if(lhs.first < rhs.first ) + return true; + if(rhs.first < lhs.first ) + return false; + return false; + } + }; + struct comparePairByIdAndOffset { bool operator() (const std::pair& lhs, const std::pair& rhs) const{ if(lhs.second.id < rhs.second.id) diff --git a/src/commons/Parameters.cpp b/src/commons/Parameters.cpp index 45baee13f..9256116d8 100644 --- a/src/commons/Parameters.cpp +++ b/src/commons/Parameters.cpp @@ -151,7 +151,8 @@ Parameters::Parameters(): PARAM_PICK_N_SIMILAR(PARAM_PICK_N_SIMILAR_ID, "--pick-n-sim-kmer", "Add N similar to search", "Add N similar k-mers to search", typeid(int), (void *) &pickNbest, "^[1-9]{1}[0-9]*$", MMseqsParameter::COMMAND_CLUSTLINEAR | MMseqsParameter::COMMAND_EXPERT), PARAM_ADJUST_KMER_LEN(PARAM_ADJUST_KMER_LEN_ID, "--adjust-kmer-len", "Adjust k-mer length", "Adjust k-mer length based on specificity (only for nucleotides)", typeid(bool), (void *) &adjustKmerLength, "", MMseqsParameter::COMMAND_CLUSTLINEAR | MMseqsParameter::COMMAND_EXPERT), PARAM_RESULT_DIRECTION(PARAM_RESULT_DIRECTION_ID, "--result-direction", "Result direction", "result is 0: query, 1: target centric", typeid(int), (void *) &resultDirection, "^[0-1]{1}$", MMseqsParameter::COMMAND_CLUSTLINEAR | MMseqsParameter::COMMAND_EXPERT), - + PARAM_WEIGHT_FILE(PARAM_WEIGHT_FILE_ID, "--weights", "Weight file name", "Weights used for cluster priorization", typeid(std::string), (void*) &weightFile, "", MMseqsParameter::COMMAND_CLUSTLINEAR | MMseqsParameter::COMMAND_EXPERT ), + PARAM_WEIGHT_THR(PARAM_WEIGHT_THR_ID, "--weightThr", "Weight threshold", "Weight threshold used for cluster priorization", typeid(float), (void*) &weightThr, "^[0-9]*(\\.[0-9]+)?$", MMseqsParameter::COMMAND_CLUSTLINEAR | MMseqsParameter::COMMAND_EXPERT ), // workflow PARAM_RUNNER(PARAM_RUNNER_ID, "--mpi-runner", "MPI runner", "Use MPI on compute cluster with this MPI command (e.g. \"mpirun -np 42\")", typeid(std::string), (void *) &runner, "", MMseqsParameter::COMMAND_COMMON | MMseqsParameter::COMMAND_EXPERT), PARAM_REUSELATEST(PARAM_REUSELATEST_ID, "--force-reuse", "Force restart with latest tmp", "Reuse tmp filse in tmp/latest folder ignoring parameters and version changes", typeid(bool), (void *) &reuseLatest, "", MMseqsParameter::COMMAND_COMMON | MMseqsParameter::COMMAND_EXPERT), @@ -435,6 +436,8 @@ Parameters::Parameters(): clust.push_back(&PARAM_THREADS); clust.push_back(&PARAM_COMPRESSED); clust.push_back(&PARAM_V); + clust.push_back(&PARAM_WEIGHT_FILE); + clust.push_back(&PARAM_WEIGHT_THR); // rescorediagonal rescorediagonal.push_back(&PARAM_SUB_MAT); @@ -914,6 +917,8 @@ Parameters::Parameters(): kmermatcher.push_back(&PARAM_THREADS); kmermatcher.push_back(&PARAM_COMPRESSED); kmermatcher.push_back(&PARAM_V); + kmermatcher.push_back(&PARAM_WEIGHT_FILE); + kmermatcher.push_back(&PARAM_WEIGHT_THR); // kmermatcher kmersearch.push_back(&PARAM_SEED_SUB_MAT); @@ -2431,6 +2436,9 @@ void Parameters::setDefaults() { pickNbest = 1; adjustKmerLength = false; resultDirection = Parameters::PARAM_RESULT_DIRECTION_TARGET; + weightThr = 0.9; + weightFile = ""; + // result2stats stat = ""; diff --git a/src/commons/Parameters.h b/src/commons/Parameters.h index ba1e74ae8..97eaa86aa 100644 --- a/src/commons/Parameters.h +++ b/src/commons/Parameters.h @@ -400,6 +400,7 @@ class Parameters { std::string spacedKmerPattern; // User-specified kmer pattern std::string localTmp; // Local temporary path + // ALIGNMENT int alignmentMode; // alignment mode 0=fastest on parameters, // 1=score only, 2=score, cov, start/end pos, 3=score, cov, start/end pos, seq.id, @@ -526,6 +527,8 @@ class Parameters { int pickNbest; int adjustKmerLength; int resultDirection; + float weightThr; + std::string weightFile; // indexdb int checkCompatible; @@ -828,6 +831,9 @@ class Parameters { PARAMETER(PARAM_PICK_N_SIMILAR) PARAMETER(PARAM_ADJUST_KMER_LEN) PARAMETER(PARAM_RESULT_DIRECTION) + PARAMETER(PARAM_WEIGHT_FILE) + PARAMETER(PARAM_WEIGHT_THR) + // workflow PARAMETER(PARAM_RUNNER) PARAMETER(PARAM_REUSELATEST) diff --git a/src/commons/SequenceWeights.cpp b/src/commons/SequenceWeights.cpp new file mode 100644 index 000000000..e9688e82c --- /dev/null +++ b/src/commons/SequenceWeights.cpp @@ -0,0 +1,55 @@ +// +// Created by annika on 09.11.22. +// +#include "SequenceWeights.h" +#include "Util.h" +#include "Debug.h" + +#include +#include + +SequenceWeights::SequenceWeights(const char* dataFileName) { + + //parse file and fill weightIndex + std::ifstream tsv(dataFileName); + if (tsv.fail()) { + Debug(Debug::ERROR) << "File " << dataFileName << " not found!\n"; + EXIT(EXIT_FAILURE); + } + + char keyData[255]; + std::string line; + this->indexSize = 0; + unsigned int pos = 0; + while(std::getline(tsv, line)) { + this->indexSize++; + } + + this->weightIndex = new WeightIndexEntry[this->indexSize]; + + tsv.clear(); + tsv.seekg(0); + + while(std::getline(tsv, line)) { + char *current = (char *) line.c_str(); + Util::parseKey(current, keyData); + const std::string key(keyData); + unsigned int keyId = strtoull(key.c_str(), NULL, 10); + + char *restStart = current + key.length(); + restStart = restStart + Util::skipWhitespace(restStart); + float weight = static_cast(strtod(restStart, NULL)); + this->weightIndex[pos].id = keyId; + this->weightIndex[pos].weight = weight; + pos++; + } +} + +float SequenceWeights::getWeightById(unsigned int id) { + + WeightIndexEntry val; + val.id = id; + size_t pos = std::upper_bound(weightIndex, weightIndex + indexSize, val, WeightIndexEntry::compareByIdOnly) - weightIndex; + return (pos < indexSize && weightIndex[pos].id == id ) ? weightIndex[pos].weight : std::numeric_limits::min(); +} + diff --git a/src/commons/SequenceWeights.h b/src/commons/SequenceWeights.h new file mode 100644 index 000000000..ab04c0f69 --- /dev/null +++ b/src/commons/SequenceWeights.h @@ -0,0 +1,30 @@ +// +// Created by annika on 09.11.22. +// + +#ifndef MMSEQS_SEQUENCEWEIGHTS_H +#define MMSEQS_SEQUENCEWEIGHTS_H + +class SequenceWeights{ +public: + struct WeightIndexEntry { + unsigned int id; + float weight; + + static bool compareByIdOnly(const WeightIndexEntry &x, const WeightIndexEntry &y) { + return x.id <= y.id; + } + }; + + WeightIndexEntry *weightIndex; + unsigned int indexSize; + + SequenceWeights(const char* dataFileName); + + ~SequenceWeights(); + + float getWeightById(unsigned int id); +}; + + +#endif //MMSEQS_SEQUENCEWEIGHTS_H diff --git a/src/linclust/kmermatcher.cpp b/src/linclust/kmermatcher.cpp index 035621ab6..804a8fe1c 100644 --- a/src/linclust/kmermatcher.cpp +++ b/src/linclust/kmermatcher.cpp @@ -15,6 +15,7 @@ #include "MarkovKmerScore.h" #include "FileUtil.h" #include "FastSort.h" +#include "SequenceWeights.h" #include #include @@ -384,6 +385,59 @@ std::pair fillKmerPositionArray(KmerPosition * kmerArray, siz return std::make_pair(offset, longestKmer); } + +template +void swapCenterSequence(KmerPosition *hashSeqPair, size_t splitKmerCount, SequenceWeights &seqWeights) { + + + size_t prevHash = hashSeqPair[0].kmer; + if(TYPE == Parameters::DBTYPE_NUCLEOTIDES){ + prevHash = BIT_SET(prevHash, 63); + } + + size_t repSeqPos = 0; + size_t prevHashStart = 0; + float repSeqWeight = seqWeights.getWeightById(hashSeqPair[repSeqPos].id); + for (size_t elementIdx = 0; elementIdx < splitKmerCount; elementIdx++) { + + size_t currKmer = hashSeqPair[elementIdx].kmer; + if(TYPE == Parameters::DBTYPE_NUCLEOTIDES){ + currKmer = BIT_SET(currKmer, 63); + } + if (prevHash != currKmer) { + + // swap sequence with heighest weigtht to the front of the block + if (repSeqPos != prevHashStart) + std::swap(hashSeqPair[repSeqPos],hashSeqPair[prevHashStart]); + + prevHashStart = elementIdx; + prevHash = hashSeqPair[elementIdx].kmer; + if(TYPE == Parameters::DBTYPE_NUCLEOTIDES){ + prevHash = BIT_SET(prevHash, 63); + } + repSeqPos = elementIdx; + repSeqWeight = seqWeights.getWeightById(hashSeqPair[repSeqPos].id); + } + else { + float currWeight = seqWeights.getWeightById(hashSeqPair[elementIdx].id); + if (currWeight > repSeqWeight) { + repSeqWeight = currWeight; + repSeqPos = elementIdx; + } + } + + if (hashSeqPair[elementIdx].kmer == SIZE_T_MAX) { + break; + } + + } +} + +template void swapCenterSequence<0, short>(KmerPosition *kmers, size_t splitKmerCount, SequenceWeights &seqWeights); +template void swapCenterSequence<0, int>(KmerPosition *kmers, size_t splitKmerCount, SequenceWeights &seqWeights); +template void swapCenterSequence<1, short>(KmerPosition *kmers, size_t splitKmerCount, SequenceWeights &seqWeights); +template void swapCenterSequence<1, int>(KmerPosition *kmers, size_t splitKmerCount, SequenceWeights &seqWeights); + template KmerPosition * doComputation(size_t totalKmers, size_t hashStartRange, size_t hashEndRange, std::string splitFile, DBReader & seqDbr, Parameters & par, BaseMatrix * subMat) { @@ -412,13 +466,26 @@ KmerPosition * doComputation(size_t totalKmers, size_t hashStartRange, size_t } Debug(Debug::INFO) << timer.lap() << "\n"; + SequenceWeights *sequenceWeights = NULL; + // use priority information to swap center sequences + if (par.PARAM_WEIGHT_FILE.wasSet) { + sequenceWeights = new SequenceWeights(par.weightFile.c_str()); + if (sequenceWeights != NULL) { + if (Parameters::isEqualDbtype(seqDbr.getDbtype(), Parameters::DBTYPE_NUCLEOTIDES)) { + swapCenterSequence(hashSeqPair, totalKmers, *sequenceWeights); + } else { + swapCenterSequence(hashSeqPair, totalKmers, *sequenceWeights); + } + } + } + // assign rep. sequence to same kmer members // The longest sequence is the first since we sorted by kmer, seq.Len and id size_t writePos; if(Parameters::isEqualDbtype(seqDbr.getDbtype(), Parameters::DBTYPE_NUCLEOTIDES)){ - writePos = assignGroup(hashSeqPair, totalKmers, par.includeOnlyExtendable, par.covMode, par.covThr); + writePos = assignGroup(hashSeqPair, totalKmers, par.includeOnlyExtendable, par.covMode, par.covThr, sequenceWeights, par.weightThr); }else{ - writePos = assignGroup(hashSeqPair, totalKmers, par.includeOnlyExtendable, par.covMode, par.covThr); + writePos = assignGroup(hashSeqPair, totalKmers, par.includeOnlyExtendable, par.covMode, par.covThr, sequenceWeights, par.weightThr); } // sort by rep. sequence (stored in kmer) and sequence id @@ -447,8 +514,11 @@ KmerPosition * doComputation(size_t totalKmers, size_t hashStartRange, size_t return hashSeqPair; } + + template -size_t assignGroup(KmerPosition *hashSeqPair, size_t splitKmerCount, bool includeOnlyExtendable, int covMode, float covThr) { +size_t assignGroup(KmerPosition *hashSeqPair, size_t splitKmerCount, bool includeOnlyExtendable, int covMode, float covThr, + SequenceWeights * sequenceWeights, float weightThr) { size_t writePos=0; size_t prevHash = hashSeqPair[0].kmer; size_t repSeqId = hashSeqPair[0].id; @@ -459,6 +529,7 @@ size_t assignGroup(KmerPosition *hashSeqPair, size_t splitKmerCount, bool inc } size_t prevHashStart = 0; size_t prevSetSize = 0; + size_t skipByWeightCount = 0; T queryLen=hashSeqPair[0].seqLen; bool repIsReverse = false; T repSeq_i_pos = hashSeqPair[0].pos; @@ -469,11 +540,15 @@ size_t assignGroup(KmerPosition *hashSeqPair, size_t splitKmerCount, bool inc } if (prevHash != currKmer) { for (size_t i = prevHashStart; i < elementIdx; i++) { + // skip target sequences if weight > weightThr + if(i > prevHashStart && sequenceWeights != NULL + && sequenceWeights->getWeightById(hashSeqPair[i].id) > weightThr) + continue; size_t kmer = hashSeqPair[i].kmer; if(TYPE == Parameters::DBTYPE_NUCLEOTIDES) { kmer = BIT_SET(hashSeqPair[i].kmer, 63); } - size_t rId = (kmer != SIZE_T_MAX) ? ((prevSetSize == 1) ? SIZE_T_MAX : repSeqId) : SIZE_T_MAX; + size_t rId = (kmer != SIZE_T_MAX) ? ((prevSetSize-skipByWeightCount == 1) ? SIZE_T_MAX : repSeqId) : SIZE_T_MAX; // remove singletones from set if(rId != SIZE_T_MAX){ int diagonal = repSeq_i_pos - hashSeqPair[i].pos; @@ -536,6 +611,7 @@ size_t assignGroup(KmerPosition *hashSeqPair, size_t splitKmerCount, bool inc hashSeqPair[i].kmer = (i != writePos - 1) ? SIZE_T_MAX : hashSeqPair[i].kmer; } prevSetSize = 0; + skipByWeightCount = 0; prevHashStart = elementIdx; repSeqId = hashSeqPair[elementIdx].id; if(TYPE == Parameters::DBTYPE_NUCLEOTIDES){ @@ -549,6 +625,9 @@ size_t assignGroup(KmerPosition *hashSeqPair, size_t splitKmerCount, bool inc break; } prevSetSize++; + if(prevSetSize > 1 && sequenceWeights != NULL + && sequenceWeights->getWeightById(hashSeqPair[elementIdx].id) > weightThr) + skipByWeightCount++; prevHash = hashSeqPair[elementIdx].kmer; if(TYPE == Parameters::DBTYPE_NUCLEOTIDES){ prevHash = BIT_SET(prevHash, 63); @@ -558,10 +637,11 @@ size_t assignGroup(KmerPosition *hashSeqPair, size_t splitKmerCount, bool inc return writePos; } -template size_t assignGroup<0, short>(KmerPosition *kmers, size_t splitKmerCount, bool includeOnlyExtendable, int covMode, float covThr); -template size_t assignGroup<0, int>(KmerPosition *kmers, size_t splitKmerCount, bool includeOnlyExtendable, int covMode, float covThr); -template size_t assignGroup<1, short>(KmerPosition *kmers, size_t splitKmerCount, bool includeOnlyExtendable, int covMode, float covThr); -template size_t assignGroup<1, int>(KmerPosition *kmers, size_t splitKmerCount, bool includeOnlyExtendable, int covMode, float covThr); +template size_t assignGroup<0, short>(KmerPosition *kmers, size_t splitKmerCount, bool includeOnlyExtendable, int covMode, float covThr, SequenceWeights *sequenceWeights, float weightThr); +template size_t assignGroup<0, int>(KmerPosition *kmers, size_t splitKmerCount, bool includeOnlyExtendable, int covMode, float covThr, SequenceWeights *sequenceWeights, float weightThr); +template size_t assignGroup<1, short>(KmerPosition *kmers, size_t splitKmerCount, bool includeOnlyExtendable, int covMode, float covThr, SequenceWeights *sequenceWeights, float weightThr); +template size_t assignGroup<1, int>(KmerPosition *kmers, size_t splitKmerCount, bool includeOnlyExtendable, int covMode, float covThr, SequenceWeights *sequenceWeights, float weightThr); + void setLinearFilterDefault(Parameters *p) { p->covThr = 0.8;