From 542f3621212087a339da3ad110b04c5a9b4ae892 Mon Sep 17 00:00:00 2001 From: Martin Steinegger Date: Mon, 31 Jul 2023 12:40:15 +0200 Subject: [PATCH] Add mkrepseqdb as command --- src/CommandDeclarations.h | 1 + src/MMseqsBase.cpp | 11 +++- src/util/CMakeLists.txt | 1 + src/util/mkrepseqdb.cpp | 135 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 145 insertions(+), 3 deletions(-) create mode 100644 src/util/mkrepseqdb.cpp diff --git a/src/CommandDeclarations.h b/src/CommandDeclarations.h index eed98e629..0083267ba 100644 --- a/src/CommandDeclarations.h +++ b/src/CommandDeclarations.h @@ -76,6 +76,7 @@ extern int mergedbs(int argc, const char **argv, const Command& command); extern int mergeresultsbyset(int argc, const char **argv, const Command &command); extern int msa2profile(int argc, const char **argv, const Command& command); extern int sequence2profile(int argc, const char **argv, const Command& command); +extern int mkrepseqdb(int argc, const char **argv, const Command& command); extern int msa2result(int argc, const char **argv, const Command& command); extern int multihitdb(int argc, const char **argv, const Command& command); extern int multihitsearch(int argc, const char **argv, const Command& command); diff --git a/src/MMseqsBase.cpp b/src/MMseqsBase.cpp index 67812d74a..a9b4db8ff 100644 --- a/src/MMseqsBase.cpp +++ b/src/MMseqsBase.cpp @@ -1245,9 +1245,14 @@ std::vector baseCommands = { "Martin Steinegger ", " ", CITATION_MMSEQS2, {{"sequenceDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA, &DbValidator::sequenceDb }}}, - - - + {"mkrepseqdb", mkrepseqdb, &par.threadsandcompression, COMMAND_HIDDEN, + "Seperates a sequence DB into a representative and a non-representative DB", + NULL, + "Martin Steinegger ", + " ", + CITATION_MMSEQS2, {{"sequenceDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA, &DbValidator::sequenceDb }, + {"resultDB", DbType::ACCESS_MODE_INPUT, DbType::NEED_DATA, &DbValidator::resultDb }, + {"sequenceDB", DbType::ACCESS_MODE_OUTPUT, DbType::NEED_DATA, &DbValidator::sequenceDb }}}, {"dbtype", dbtype, &par.empty, COMMAND_HIDDEN, "", NULL, diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 5adbbdc0c..7d4035e8d 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -36,6 +36,7 @@ set(util_source_files util/mergeclusters.cpp util/mergeresultsbyset.cpp util/mergedbs.cpp + util/mkrepseqdb.cpp util/msa2profile.cpp util/msa2result.cpp util/nrtotaxmapping.cpp diff --git a/src/util/mkrepseqdb.cpp b/src/util/mkrepseqdb.cpp new file mode 100644 index 000000000..0640f359f --- /dev/null +++ b/src/util/mkrepseqdb.cpp @@ -0,0 +1,135 @@ +#include "DBReader.h" +#include "DBWriter.h" +#include "Debug.h" +#include "Util.h" +#include "FastSort.h" +#include "Parameters.h" + +#ifdef OPENMP +#include +#endif + +int mkrepseqdb(int argc, const char **argv, const Command& command) { + Parameters &par = Parameters::getInstance(); + par.parseParameters(argc, argv, command, true, 0, MMseqsParameter::COMMAND_ALIGN); + + DBReader reader(par.db1.c_str(), par.db1Index.c_str(), par.threads, DBReader::USE_DATA|DBReader::USE_INDEX); + reader.open(DBReader::NOSORT); + reader.readMmapedDataInMemory(); + + DBReader clusterReader(par.db2.c_str(), par.db2Index.c_str(), par.threads, DBReader::USE_DATA|DBReader::USE_INDEX); + clusterReader.open(DBReader::NOSORT); + + DBWriter dbwRep(par.db3.c_str(), par.db3Index.c_str(), static_cast(par.threads), par.compressed, reader.getDbtype()); + dbwRep.open(); + std::string seqsDbSeq = par.db3 + "_seq"; + std::string seqsDbSeqIdx = par.db3 + "_seq.index"; + DBWriter dbwClu(seqsDbSeq.c_str(), seqsDbSeqIdx.c_str(), static_cast(par.threads), par.compressed, reader.getDbtype()); + dbwClu.open(); + Debug::Progress progress(clusterReader.getSize()); +#pragma omp parallel + { + unsigned int thread_idx = 0; +#ifdef OPENMP + thread_idx = static_cast(omp_get_thread_num()); +#endif + std::string resultBuffer; + // write output file +#pragma omp for schedule(dynamic, 1) + for (size_t id = 0; id < clusterReader.getSize(); id++) { + progress.updateProgress(); + char *data = clusterReader.getData(id, thread_idx); + size_t repKey = clusterReader.getDbKey(id); + size_t repDataId = reader.getId(repKey); + size_t repEntryLen = reader.getEntryLen(repDataId); + dbwRep.writeData(reader.getData(repDataId, thread_idx), repEntryLen - 1, repKey, thread_idx); + while(*data != '\0') { + // parse dbkey + size_t dbKey = Util::fast_atoi(data); + if(dbKey == repKey){ + data = Util::skipLine(data); + continue; + } + size_t readerId = reader.getId(dbKey); + dbwClu.writeData(reader.getData(readerId, thread_idx), + reader.getEntryLen(readerId) - 1, dbKey, thread_idx); + data = Util::skipLine(data); + } + resultBuffer.clear(); + } + } + dbwRep.close(true); + dbwClu.close(true); + clusterReader.close(); + reader.close(); + + // merge index + DBReader dbrRep(par.db3.c_str(), par.db3Index.c_str(), par.threads, DBReader::USE_INDEX); + dbrRep.open(DBReader::NOSORT); + DBReader dbrClu(seqsDbSeq.c_str(), seqsDbSeqIdx.c_str(), par.threads, DBReader::USE_INDEX); + dbrClu.open(DBReader::NOSORT); + std::string seqsDbSeqIdxTmp = seqsDbSeqIdx + "_tmp"; + + FILE *sIndex = FileUtil::openAndDelete(seqsDbSeqIdxTmp.c_str(), "w"); + std::vector::Index> allIndex(dbrClu.getSize() + dbrRep.getSize()); + size_t dataSize = 0; + for(size_t i = 0; i < dbrRep.getSize(); i++) { + allIndex[i] = *dbrRep.getIndex(i); + dataSize += allIndex[i].length; + } + for(size_t i = 0; i < dbrClu.getSize(); i++) { + DBReader::Index * index = dbrClu.getIndex(i); + index->offset += dataSize; + allIndex[dbrRep.getSize() + i] = *index; + } + SORT_PARALLEL(allIndex.begin(), allIndex.end(), DBReader::Index::compareById); + char buffer[1024]; + for(size_t i = 0; i < allIndex.size(); i++){ + size_t len = DBWriter::indexToBuffer(buffer, allIndex[i].id, allIndex[i].offset, allIndex[i].length); + size_t written = fwrite(buffer, sizeof(char), len, sIndex); + if(written != len){ + Debug(Debug::ERROR) << "Cannot write index file " << seqsDbSeqIdxTmp << "\n"; + EXIT(EXIT_FAILURE); + } + } + if (fclose(sIndex) != 0) { + Debug(Debug::ERROR) << "Cannot close index file " << seqsDbSeqIdxTmp << "\n"; + EXIT(EXIT_FAILURE); + } + FileUtil::move(seqsDbSeqIdxTmp.c_str(), seqsDbSeqIdx.c_str()); + FileUtil::symlinkAlias(par.db3, seqsDbSeq + ".0" ); + FileUtil::move(seqsDbSeq.c_str(), (seqsDbSeq + ".1").c_str()); + + struct DBSuffix { + DBFiles::Files flag; + const char* suffix; + }; + + const DBSuffix suffices[] = { + { DBFiles::HEADER, "_h" }, + { DBFiles::HEADER_INDEX, "_h.index" }, + { DBFiles::HEADER_DBTYPE, "_h.dbtype" }, + { DBFiles::LOOKUP, ".lookup" }, + { DBFiles::SOURCE, ".source" }, + { DBFiles::TAX_MAPPING, "_mapping" }, + { DBFiles::TAX_NAMES, "_names.dmp" }, + { DBFiles::TAX_NODES, "_nodes.dmp" }, + { DBFiles::TAX_MERGED, "_merged.dmp" }, + { DBFiles::TAX_MERGED, "_taxonomy" }, + }; + + for (size_t i = 0; i < ARRAY_SIZE(suffices); ++i) { + std::string file = par.db1 + suffices[i].suffix; + if (suffices[i].flag && FileUtil::fileExists(file.c_str())) { + DBReader::copyDb(file, par.db3 + suffices[i].suffix); + } + } + for (size_t i = 0; i < ARRAY_SIZE(suffices); ++i) { + std::string file = par.db3 + suffices[i].suffix; + if (suffices[i].flag && FileUtil::fileExists(file.c_str())) { + DBReader::aliasDb(file, par.db3 + "_seq" + suffices[i].suffix); + } + } + + return EXIT_SUCCESS; +}