diff --git a/core/checkpoint/CheckPointManager.cpp b/core/checkpoint/CheckPointManager.cpp index 5c8353ca92..e29d19b6be 100644 --- a/core/checkpoint/CheckPointManager.cpp +++ b/core/checkpoint/CheckPointManager.cpp @@ -13,19 +13,22 @@ // limitations under the License. #include "CheckPointManager.h" -#include + +#include + #include +#include #include -#include -#include "monitor/LogtailAlarm.h" + #include "app_config/AppConfig.h" -#include "config_manager/ConfigManager.h" +#include "common/FileSystemUtil.h" #include "common/Flags.h" #include "common/HashUtil.h" #include "common/StringTools.h" -#include "common/FileSystemUtil.h" -#include "logger/Logger.h" +#include "config_manager/ConfigManager.h" #include "file_server/FileDiscoveryOptions.h" +#include "logger/Logger.h" +#include "monitor/LogtailAlarm.h" using namespace std; #if defined(__linux__) @@ -200,6 +203,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) { int32_t fileOpenFlag = 0; // default, we close file ptr int32_t containerStopped = 0; int32_t lastForceRead = 0; + int32_t idxInReaderArray = LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY; if (meta.isMember("real_file_name")) { realFilePath = meta["real_file_name"].asString(); } @@ -237,6 +241,9 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) { if (meta.isMember("last_force_read")) { lastForceRead = meta["last_force_read"].asInt(); } + if (meta.isMember("idx_in_reader_array")) { + idxInReaderArray = meta["idx_in_reader_array"].asInt(); + } // can not get file's dev inode if (!devInode.IsValid()) { LOG_WARNING(sLogger, ("can not find check point dev inode, discard it", filePath)); @@ -258,6 +265,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) { containerStopped != 0, lastForceRead != 0); ptr->mLastUpdateTime = update_time; + ptr->mIdxInReaderArray = idxInReaderArray; AddCheckPoint(ptr); } else { // find config @@ -290,6 +298,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) { containerStopped != 0, lastForceRead != 0); ptr->mLastUpdateTime = update_time; + ptr->mIdxInReaderArray = idxInReaderArray; AddCheckPoint(ptr); } } @@ -336,6 +345,7 @@ bool CheckPointManager::DumpCheckPointToLocal() { leaf["config_name"] = Json::Value(checkPointPtr->mConfigName); // forward compatible leaf["sig"] = Json::Value(string("")); + leaf["idx_in_reader_array"] = Json::Value(checkPointPtr->mIdxInReaderArray); // use filename + dev + inode + configName to prevent same filename conflict root[checkPointPtr->mFileName + "*" + ToString(checkPointPtr->mDevInode.dev) + "*" + ToString(checkPointPtr->mDevInode.inode) + "*" + checkPointPtr->mConfigName] @@ -365,6 +375,7 @@ bool CheckPointManager::DumpCheckPointToLocal() { leaf["config_name"] = Json::Value(checkPointPtr->mConfigName); // forward compatible leaf["sig"] = Json::Value(string("")); + leaf["idx_in_reader_array"] = Json::Value(checkPointPtr->mIdxInReaderArray); // use filename + dev + inode + configName to prevent same filename conflict root[checkPointPtr->mFileName + "*" + ToString(checkPointPtr->mDevInode.dev) + "*" + ToString(checkPointPtr->mDevInode.inode) + "*" + checkPointPtr->mConfigName] diff --git a/core/checkpoint/CheckPointManager.h b/core/checkpoint/CheckPointManager.h index d6aef98a8c..21f98d174e 100644 --- a/core/checkpoint/CheckPointManager.h +++ b/core/checkpoint/CheckPointManager.h @@ -27,6 +27,7 @@ #include "common/DevInode.h" #include "common/EncodingConverter.h" #include "common/SplitedFilePath.h" +#include "reader/LogFileReader.h" #ifdef APSARA_UNIT_TEST_MAIN #include "AppConfig.h" @@ -49,7 +50,7 @@ class CheckPoint { std::string mConfigName; std::string mFileName; std::string mRealFileName; - int32_t mPositionInReaderArray = -1; // default not in the reader queue + int32_t mIdxInReaderArray = LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY; CheckPoint() {} diff --git a/core/event_handler/EventHandler.cpp b/core/event_handler/EventHandler.cpp index 2f938bb9d9..21a4c063db 100644 --- a/core/event_handler/EventHandler.cpp +++ b/core/event_handler/EventHandler.cpp @@ -460,22 +460,19 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path, } int32_t idx = readerPtr->GetIdxInReaderArrayFromLastCpt(); - // new reader - if (backFlag) { + if (backFlag) { // new reader readerArray.push_back(readerPtr); mDevInodeReaderMap[devInode] = readerPtr; - // reader not in reader array - } else if (idx == LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY) { + } else if (idx == LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY) { // reader not in reader array mRotatorReaderMap[devInode] = readerPtr; - // reader in reader array - } else if (idx >= 0) { + } else if (idx >= 0) { // reader in reader array readerArray.push_back(readerPtr); mDevInodeReaderMap[devInode] = readerPtr; std::stable_sort(readerArray.begin(), readerArray.end(), ModifyHandler::CompareReaderByIdxFromCpt); } else { - LOG_ERROR(sLogger, - ("unexpected idx", idx)("real log path", readerPtr->GetRealLogPath())("host log path", - readerPtr->GetHostLogPath())); + LOG_WARNING(sLogger, + ("unexpected idx (perhaps because first checkpoint load after upgrade)", + idx)("real log path", readerPtr->GetRealLogPath())("host log path", readerPtr->GetHostLogPath())); return LogFileReaderPtr(); } readerPtr->SetReaderArray(&readerArray); @@ -975,7 +972,7 @@ void ModifyHandler::HandleTimeOut() { bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) { if (!isRotatorReader) { for (DevInodeLogFileReaderMap::iterator it = mDevInodeReaderMap.begin(); it != mDevInodeReaderMap.end(); ++it) { - int32_t idxInReaderArray = -2; + int32_t idxInReaderArray = LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY; for (size_t i = 0; i < it->second->GetReaderArray()->size(); ++i) { if (it->second->GetReaderArray()->at(i) == it->second) { idxInReaderArray = i; @@ -986,7 +983,7 @@ bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) { } } else { for (DevInodeLogFileReaderMap::iterator it = mRotatorReaderMap.begin(); it != mRotatorReaderMap.end(); ++it) { - it->second->DumpMetaToMem(checkConfigFlag, -2); + it->second->DumpMetaToMem(checkConfigFlag, LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY); } } return true; diff --git a/core/reader/LogFileReader.cpp b/core/reader/LogFileReader.cpp index 9836f2d8ff..44c19f5601 100644 --- a/core/reader/LogFileReader.cpp +++ b/core/reader/LogFileReader.cpp @@ -232,7 +232,7 @@ void LogFileReader::DumpMetaToMem(bool checkConfigFlag, int32_t idxInReaderArray // use last event time as checkpoint's last update time checkPointPtr->mLastUpdateTime = mLastEventTime; checkPointPtr->mCache = mCache; - checkPointPtr->mPositionInReaderArray = idxInReaderArray; + checkPointPtr->mIdxInReaderArray = idxInReaderArray; CheckPointManager::Instance()->AddCheckPoint(checkPointPtr); } @@ -283,7 +283,7 @@ void LogFileReader::InitReader(bool tailExisted, FileReadPolicy policy, uint32_t mLastEventTime = checkPointPtr->mLastUpdateTime; mContainerStopped = checkPointPtr->mContainerStopped; // new property to recover reader exactly from checkpoint - mIdxInReaderArrayFromLastCpt = checkPointPtr->mPositionInReaderArray; + mIdxInReaderArrayFromLastCpt = checkPointPtr->mIdxInReaderArray; LOG_INFO(sLogger, ("recover log reader status from checkpoint, project", GetProject())("logstore", GetLogstore())( "config", GetConfigName())("log reader queue name", mHostLogPath)("file device", diff --git a/core/reader/LogFileReader.h b/core/reader/LogFileReader.h index 26aa345a5c..0dc34371b3 100644 --- a/core/reader/LogFileReader.h +++ b/core/reader/LogFileReader.h @@ -187,7 +187,7 @@ class LogFileReader { void InitReader(bool tailExisted = false, FileReadPolicy policy = BACKWARD_TO_FIXED_POS, uint32_t eoConcurrency = 0); - void DumpMetaToMem(bool checkConfigFlag = false, int32_t idxInReaderArray = -1); + void DumpMetaToMem(bool checkConfigFlag = false, int32_t idxInReaderArray = CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY); std::string GetSourceId() { return mSourceId; }