diff --git a/src/CommandDeclarations.h b/src/CommandDeclarations.h index 0083267ba..0a346f16b 100644 --- a/src/CommandDeclarations.h +++ b/src/CommandDeclarations.h @@ -76,7 +76,7 @@ extern int mergedbs(int argc, const char **argv, const Command& command); extern int mergeresultsbyset(int argc, const char **argv, const Command &command); extern int msa2profile(int argc, const char **argv, const Command& command); extern int sequence2profile(int argc, const char **argv, const Command& command); -extern int mkrepseqdb(int argc, const char **argv, const Command& command); +extern int createclusearchdb(int argc, const char **argv, const Command& command); extern int msa2result(int argc, const char **argv, const Command& command); extern int multihitdb(int argc, const char **argv, const Command& command); extern int multihitsearch(int argc, const char **argv, const Command& command); diff --git a/src/MMseqsBase.cpp b/src/MMseqsBase.cpp index a9b4db8ff..98db5acb1 100644 --- a/src/MMseqsBase.cpp +++ b/src/MMseqsBase.cpp @@ -1245,8 +1245,8 @@ std::vector baseCommands = { "Martin Steinegger ", " ", CITATION_MMSEQS2, {{"sequenceDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA, &DbValidator::sequenceDb }}}, - {"mkrepseqdb", mkrepseqdb, &par.threadsandcompression, COMMAND_HIDDEN, - "Seperates a sequence DB into a representative and a non-representative DB", + {"createclusearchdb", createclusearchdb, &par.createclusearchdb, COMMAND_HIDDEN, + "Separates a sequence DB into a representative and a non-representative DB", NULL, "Martin Steinegger ", " ", diff --git a/src/commons/Parameters.cpp b/src/commons/Parameters.cpp index 82ac6ef63..30123626a 100644 --- a/src/commons/Parameters.cpp +++ b/src/commons/Parameters.cpp @@ -201,6 +201,8 @@ Parameters::Parameters(): PARAM_SEQUENCE_OVERLAP(PARAM_SEQUENCE_OVERLAP_ID, "--sequence-overlap", "Overlap between sequences", "Overlap between sequences", typeid(int), (void *) &sequenceOverlap, "^(0|[1-9]{1}[0-9]*)$"), PARAM_SEQUENCE_SPLIT_MODE(PARAM_SEQUENCE_SPLIT_MODE_ID, "--sequence-split-mode", "Sequence split mode", "Sequence split mode 0: copy data, 1: soft link data and write new index,", typeid(int), (void *) &sequenceSplitMode, "^[0-1]{1}$"), PARAM_HEADER_SPLIT_MODE(PARAM_HEADER_SPLIT_MODE_ID, "--headers-split-mode", "Header split mode", "Header split mode: 0: split position, 1: original header", typeid(int), (void *) &headerSplitMode, "^[0-1]{1}$"), + // createclusearchdb + PARAM_DB_SUFFIX_LIST(PARAM_DB_SUFFIX_LIST_ID, "--db-suffix-list", "Database suffixes", "Suffixes for database to be split in rep/seq", typeid(std::string), (void *) &dbSuffixList, ""), // gff2db PARAM_GFF_TYPE(PARAM_GFF_TYPE_ID, "--gff-type", "GFF type", "Comma separated list of feature types in the GFF file to select", typeid(std::string), (void *) &gffType, ""), // translatenucs @@ -322,6 +324,12 @@ Parameters::Parameters(): threadsandcompression.push_back(&PARAM_COMPRESSED); threadsandcompression.push_back(&PARAM_V); + // createclusearchdb + createclusearchdb.push_back(&PARAM_THREADS); + createclusearchdb.push_back(&PARAM_COMPRESSED); + createclusearchdb.push_back(&PARAM_V); + createclusearchdb.push_back(&PARAM_DB_SUFFIX_LIST); + // alignall alignall.push_back(&PARAM_SUB_MAT); alignall.push_back(&PARAM_ADD_BACKTRACE); diff --git a/src/commons/Parameters.h b/src/commons/Parameters.h index e0933df8b..2693c7b7e 100644 --- a/src/commons/Parameters.h +++ b/src/commons/Parameters.h @@ -565,6 +565,9 @@ class Parameters { // result2flat bool useHeader; + // createclusearchdb + std::string dbSuffixList; + // gff2db std::string gffType; @@ -906,6 +909,9 @@ class Parameters { PARAMETER(PARAM_SEQUENCE_SPLIT_MODE) PARAMETER(PARAM_HEADER_SPLIT_MODE) + // createclusearchdb + PARAMETER(PARAM_DB_SUFFIX_LIST) + // gff2db PARAMETER(PARAM_GFF_TYPE) @@ -1118,6 +1124,7 @@ class Parameters { std::vector summarizeresult; std::vector summarizetabs; std::vector extractdomains; + std::vector createclusearchdb; std::vector extractalignedregion; std::vector convertkb; std::vector tsv2db; diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 7d4035e8d..f8498ed0c 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -36,7 +36,7 @@ set(util_source_files util/mergeclusters.cpp util/mergeresultsbyset.cpp util/mergedbs.cpp - util/mkrepseqdb.cpp + util/createclusterdb.cpp util/msa2profile.cpp util/msa2result.cpp util/nrtotaxmapping.cpp diff --git a/src/util/createclusterdb.cpp b/src/util/createclusterdb.cpp new file mode 100644 index 000000000..380f0c24e --- /dev/null +++ b/src/util/createclusterdb.cpp @@ -0,0 +1,148 @@ +#include "DBReader.h" +#include "DBWriter.h" +#include "Debug.h" +#include "Util.h" +#include "FastSort.h" +#include "Parameters.h" + +#ifdef OPENMP +#include +#endif + +int createclusearchdb(int argc, const char **argv, const Command& command) { + Parameters &par = Parameters::getInstance(); + par.parseParameters(argc, argv, command, true, 0, MMseqsParameter::COMMAND_ALIGN); + DBReader clusterReader(par.db2.c_str(), par.db2Index.c_str(), par.threads, + DBReader::USE_DATA | DBReader::USE_INDEX); + clusterReader.open(DBReader::NOSORT); + std::vector suffixes = Util::split(par.dbSuffixList, ","); + suffixes.insert(suffixes.begin(), ""); + for(size_t prefix = 0; prefix < suffixes.size(); prefix++) { + std::string db1 = par.db1 + suffixes[prefix]; + std::string db1Index = par.db1 + suffixes[prefix] + ".index"; + DBReader reader(db1.c_str(), db1Index.c_str(), par.threads, + DBReader::USE_DATA | DBReader::USE_INDEX); + reader.open(DBReader::NOSORT); + reader.readMmapedDataInMemory(); + + std::string repDbSeq = par.db3 + suffixes[prefix]; + std::string repDbSeqIdx = par.db3 + suffixes[prefix] + ".index"; + + DBWriter dbwRep(repDbSeq.c_str(), repDbSeqIdx.c_str(), static_cast(par.threads), par.compressed, + reader.getDbtype()); + dbwRep.open(); + std::string seqsDbSeq = par.db3 + "_seq" + suffixes[prefix]; + std::string seqsDbSeqIdx = par.db3 + "_seq" + suffixes[prefix] + ".index"; + DBWriter dbwClu(seqsDbSeq.c_str(), seqsDbSeqIdx.c_str(), static_cast(par.threads), par.compressed, + reader.getDbtype()); + dbwClu.open(); + Debug::Progress progress(clusterReader.getSize()); + #pragma omp parallel + { + unsigned int thread_idx = 0; + #ifdef OPENMP + thread_idx = static_cast(omp_get_thread_num()); + #endif + std::string resultBuffer; + // write output file + #pragma omp for schedule(dynamic, 1) + for (size_t id = 0; id < clusterReader.getSize(); id++) { + progress.updateProgress(); + char *data = clusterReader.getData(id, thread_idx); + size_t repKey = clusterReader.getDbKey(id); + size_t repDataId = reader.getId(repKey); + size_t repEntryLen = reader.getEntryLen(repDataId); + dbwRep.writeData(reader.getData(repDataId, thread_idx), repEntryLen - 1, repKey, thread_idx); + while (*data != '\0') { + // parse dbkey + size_t dbKey = Util::fast_atoi(data); + if (dbKey == repKey) { + data = Util::skipLine(data); + continue; + } + size_t readerId = reader.getId(dbKey); + dbwClu.writeData(reader.getData(readerId, thread_idx), + reader.getEntryLen(readerId) - 1, dbKey, thread_idx); + data = Util::skipLine(data); + } + resultBuffer.clear(); + } + } + dbwRep.close(true); + dbwClu.close(true); + reader.close(); + + // merge index + DBReader dbrRep(repDbSeq.c_str(), repDbSeqIdx.c_str(), par.threads, + DBReader::USE_INDEX); + dbrRep.open(DBReader::NOSORT); + DBReader dbrClu(seqsDbSeq.c_str(), seqsDbSeqIdx.c_str(), par.threads, + DBReader::USE_INDEX); + dbrClu.open(DBReader::NOSORT); + std::string seqsDbSeqIdxTmp = seqsDbSeqIdx + "_tmp"; + + FILE *sIndex = FileUtil::openAndDelete(seqsDbSeqIdxTmp.c_str(), "w"); + std::vector::Index> allIndex(dbrClu.getSize() + dbrRep.getSize()); + size_t dataSize = 0; + for (size_t i = 0; i < dbrRep.getSize(); i++) { + allIndex[i] = *dbrRep.getIndex(i); + dataSize += allIndex[i].length; + } + for (size_t i = 0; i < dbrClu.getSize(); i++) { + DBReader::Index *index = dbrClu.getIndex(i); + index->offset += dataSize; + allIndex[dbrRep.getSize() + i] = *index; + } + SORT_PARALLEL(allIndex.begin(), allIndex.end(), DBReader::Index::compareById); + char buffer[1024]; + for (size_t i = 0; i < allIndex.size(); i++) { + size_t len = DBWriter::indexToBuffer(buffer, allIndex[i].id, allIndex[i].offset, allIndex[i].length); + size_t written = fwrite(buffer, sizeof(char), len, sIndex); + if (written != len) { + Debug(Debug::ERROR) << "Cannot write index file " << seqsDbSeqIdxTmp << "\n"; + EXIT(EXIT_FAILURE); + } + } + if (fclose(sIndex) != 0) { + Debug(Debug::ERROR) << "Cannot close index file " << seqsDbSeqIdxTmp << "\n"; + EXIT(EXIT_FAILURE); + } + FileUtil::move(seqsDbSeqIdxTmp.c_str(), seqsDbSeqIdx.c_str()); + FileUtil::symlinkAlias(repDbSeq, seqsDbSeq + ".0"); + FileUtil::move(seqsDbSeq.c_str(), (seqsDbSeq + ".1").c_str()); + } + clusterReader.close(); + DBReader::copyDb(par.db2, par.db3 + "_clu"); + + struct DBSuffix { + DBFiles::Files flag; + const char *suffix; + }; + + const DBSuffix suffices[] = { + {DBFiles::HEADER, "_h"}, + {DBFiles::HEADER_INDEX, "_h.index"}, + {DBFiles::HEADER_DBTYPE, "_h.dbtype"}, + {DBFiles::LOOKUP, ".lookup"}, + {DBFiles::SOURCE, ".source"}, + {DBFiles::TAX_MAPPING, "_mapping"}, + {DBFiles::TAX_NAMES, "_names.dmp"}, + {DBFiles::TAX_NODES, "_nodes.dmp"}, + {DBFiles::TAX_MERGED, "_merged.dmp"}, + {DBFiles::TAX_MERGED, "_taxonomy"}, + }; + + for (size_t i = 0; i < ARRAY_SIZE(suffices); ++i) { + std::string file = par.db1 + suffices[i].suffix; + if (suffices[i].flag && FileUtil::fileExists(file.c_str())) { + DBReader::copyDb(file, par.db3 + suffices[i].suffix); + } + } + for (size_t i = 0; i < ARRAY_SIZE(suffices); ++i) { + std::string file = par.db3 + suffices[i].suffix; + if (suffices[i].flag && FileUtil::fileExists(file.c_str())) { + DBReader::aliasDb(file, par.db3 + "_seq" + suffices[i].suffix); + } + } + return EXIT_SUCCESS; +} diff --git a/src/util/mkrepseqdb.cpp b/src/util/mkrepseqdb.cpp deleted file mode 100644 index 579dc923c..000000000 --- a/src/util/mkrepseqdb.cpp +++ /dev/null @@ -1,139 +0,0 @@ -#include "DBReader.h" -#include "DBWriter.h" -#include "Debug.h" -#include "Util.h" -#include "FastSort.h" -#include "Parameters.h" - -#ifdef OPENMP -#include -#endif - -int mkrepseqdb(int argc, const char **argv, const Command& command) { - Parameters &par = Parameters::getInstance(); - par.parseParameters(argc, argv, command, true, 0, MMseqsParameter::COMMAND_ALIGN); - - DBReader reader(par.db1.c_str(), par.db1Index.c_str(), par.threads, DBReader::USE_DATA|DBReader::USE_INDEX); - reader.open(DBReader::NOSORT); - reader.readMmapedDataInMemory(); - - DBReader clusterReader(par.db2.c_str(), par.db2Index.c_str(), par.threads, DBReader::USE_DATA|DBReader::USE_INDEX); - clusterReader.open(DBReader::NOSORT); - - DBWriter dbwRep(par.db3.c_str(), par.db3Index.c_str(), static_cast(par.threads), par.compressed, reader.getDbtype()); - dbwRep.open(); - std::string seqsDbSeq = par.db3 + "_seq"; - std::string seqsDbSeqIdx = par.db3 + "_seq.index"; - DBWriter dbwClu(seqsDbSeq.c_str(), seqsDbSeqIdx.c_str(), static_cast(par.threads), par.compressed, reader.getDbtype()); - dbwClu.open(); - Debug::Progress progress(clusterReader.getSize()); -#pragma omp parallel - { - unsigned int thread_idx = 0; -#ifdef OPENMP - thread_idx = static_cast(omp_get_thread_num()); -#endif - std::string resultBuffer; - // write output file -#pragma omp for schedule(dynamic, 1) - for (size_t id = 0; id < clusterReader.getSize(); id++) { - progress.updateProgress(); - char *data = clusterReader.getData(id, thread_idx); - size_t repKey = clusterReader.getDbKey(id); - size_t repDataId = reader.getId(repKey); - size_t repEntryLen = reader.getEntryLen(repDataId); - dbwRep.writeData(reader.getData(repDataId, thread_idx), repEntryLen - 1, repKey, thread_idx); - while(*data != '\0') { - // parse dbkey - size_t dbKey = Util::fast_atoi(data); - if(dbKey == repKey){ - data = Util::skipLine(data); - continue; - } - size_t readerId = reader.getId(dbKey); - dbwClu.writeData(reader.getData(readerId, thread_idx), - reader.getEntryLen(readerId) - 1, dbKey, thread_idx); - data = Util::skipLine(data); - } - resultBuffer.clear(); - } - } - dbwRep.close(true); - dbwClu.close(true); - clusterReader.close(); - reader.close(); - - // merge index - DBReader dbrRep(par.db3.c_str(), par.db3Index.c_str(), par.threads, DBReader::USE_INDEX); - dbrRep.open(DBReader::NOSORT); - DBReader dbrClu(seqsDbSeq.c_str(), seqsDbSeqIdx.c_str(), par.threads, DBReader::USE_INDEX); - dbrClu.open(DBReader::NOSORT); - std::string seqsDbSeqIdxTmp = seqsDbSeqIdx + "_tmp"; - - FILE *sIndex = FileUtil::openAndDelete(seqsDbSeqIdxTmp.c_str(), "w"); - std::vector::Index> allIndex(dbrClu.getSize() + dbrRep.getSize()); - size_t dataSize = 0; - for(size_t i = 0; i < dbrRep.getSize(); i++) { - allIndex[i] = *dbrRep.getIndex(i); - dataSize += allIndex[i].length; - } - for(size_t i = 0; i < dbrClu.getSize(); i++) { - DBReader::Index * index = dbrClu.getIndex(i); - index->offset += dataSize; - allIndex[dbrRep.getSize() + i] = *index; - } - SORT_PARALLEL(allIndex.begin(), allIndex.end(), DBReader::Index::compareById); - char buffer[1024]; - for(size_t i = 0; i < allIndex.size(); i++){ - size_t len = DBWriter::indexToBuffer(buffer, allIndex[i].id, allIndex[i].offset, allIndex[i].length); - size_t written = fwrite(buffer, sizeof(char), len, sIndex); - if(written != len){ - Debug(Debug::ERROR) << "Cannot write index file " << seqsDbSeqIdxTmp << "\n"; - EXIT(EXIT_FAILURE); - } - } - if (fclose(sIndex) != 0) { - Debug(Debug::ERROR) << "Cannot close index file " << seqsDbSeqIdxTmp << "\n"; - EXIT(EXIT_FAILURE); - } - FileUtil::move(seqsDbSeqIdxTmp.c_str(), seqsDbSeqIdx.c_str()); - FileUtil::symlinkAlias(par.db3, seqsDbSeq + ".0" ); - FileUtil::move(seqsDbSeq.c_str(), (seqsDbSeq + ".1").c_str()); - - if(Parameters::isEqualDbtype(reader.getDbtype(), Parameters::DBTYPE_AMINO_ACIDS) || - Parameters::isEqualDbtype(reader.getDbtype(), Parameters::DBTYPE_HMM_PROFILE) || - Parameters::isEqualDbtype(reader.getDbtype(), Parameters::DBTYPE_NUCLEOTIDES)) { - - struct DBSuffix { - DBFiles::Files flag; - const char *suffix; - }; - - const DBSuffix suffices[] = { - {DBFiles::HEADER, "_h"}, - {DBFiles::HEADER_INDEX, "_h.index"}, - {DBFiles::HEADER_DBTYPE, "_h.dbtype"}, - {DBFiles::LOOKUP, ".lookup"}, - {DBFiles::SOURCE, ".source"}, - {DBFiles::TAX_MAPPING, "_mapping"}, - {DBFiles::TAX_NAMES, "_names.dmp"}, - {DBFiles::TAX_NODES, "_nodes.dmp"}, - {DBFiles::TAX_MERGED, "_merged.dmp"}, - {DBFiles::TAX_MERGED, "_taxonomy"}, - }; - - for (size_t i = 0; i < ARRAY_SIZE(suffices); ++i) { - std::string file = par.db1 + suffices[i].suffix; - if (suffices[i].flag && FileUtil::fileExists(file.c_str())) { - DBReader::copyDb(file, par.db3 + suffices[i].suffix); - } - } - for (size_t i = 0; i < ARRAY_SIZE(suffices); ++i) { - std::string file = par.db3 + suffices[i].suffix; - if (suffices[i].flag && FileUtil::fileExists(file.c_str())) { - DBReader::aliasDb(file, par.db3 + "_seq" + suffices[i].suffix); - } - } - } - return EXIT_SUCCESS; -}