Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport] fix checkpoint dump idx in reader array #1638

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions core/checkpoint/CheckPointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,22 @@
// limitations under the License.

#include "CheckPointManager.h"
#include <string>

#include <fcntl.h>

#include <fstream>
#include <string>
#include <thread>
#include <fcntl.h>
#include "monitor/LogtailAlarm.h"

#include "app_config/AppConfig.h"
#include "config_manager/ConfigManager.h"
#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "common/HashUtil.h"
#include "common/StringTools.h"
#include "common/FileSystemUtil.h"
#include "logger/Logger.h"
#include "config_manager/ConfigManager.h"
#include "file_server/FileDiscoveryOptions.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"

using namespace std;
#if defined(__linux__)
Expand Down Expand Up @@ -200,6 +203,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
int32_t fileOpenFlag = 0; // default, we close file ptr
int32_t containerStopped = 0;
int32_t lastForceRead = 0;
int32_t idxInReaderArray = LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY;
if (meta.isMember("real_file_name")) {
realFilePath = meta["real_file_name"].asString();
}
Expand Down Expand Up @@ -237,6 +241,9 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
if (meta.isMember("last_force_read")) {
lastForceRead = meta["last_force_read"].asInt();
}
if (meta.isMember("idx_in_reader_array")) {
idxInReaderArray = meta["idx_in_reader_array"].asInt();
}
// can not get file's dev inode
if (!devInode.IsValid()) {
LOG_WARNING(sLogger, ("can not find check point dev inode, discard it", filePath));
Expand All @@ -258,6 +265,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
containerStopped != 0,
lastForceRead != 0);
ptr->mLastUpdateTime = update_time;
ptr->mIdxInReaderArray = idxInReaderArray;
AddCheckPoint(ptr);
} else {
// find config
Expand Down Expand Up @@ -290,6 +298,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
containerStopped != 0,
lastForceRead != 0);
ptr->mLastUpdateTime = update_time;
ptr->mIdxInReaderArray = idxInReaderArray;
AddCheckPoint(ptr);
}
}
Expand Down Expand Up @@ -336,6 +345,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
leaf["config_name"] = Json::Value(checkPointPtr->mConfigName);
// forward compatible
leaf["sig"] = Json::Value(string(""));
leaf["idx_in_reader_array"] = Json::Value(checkPointPtr->mIdxInReaderArray);
// use filename + dev + inode + configName to prevent same filename conflict
root[checkPointPtr->mFileName + "*" + ToString(checkPointPtr->mDevInode.dev) + "*"
+ ToString(checkPointPtr->mDevInode.inode) + "*" + checkPointPtr->mConfigName]
Expand Down Expand Up @@ -365,6 +375,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
leaf["config_name"] = Json::Value(checkPointPtr->mConfigName);
// forward compatible
leaf["sig"] = Json::Value(string(""));
leaf["idx_in_reader_array"] = Json::Value(checkPointPtr->mIdxInReaderArray);
// use filename + dev + inode + configName to prevent same filename conflict
root[checkPointPtr->mFileName + "*" + ToString(checkPointPtr->mDevInode.dev) + "*"
+ ToString(checkPointPtr->mDevInode.inode) + "*" + checkPointPtr->mConfigName]
Expand Down
3 changes: 2 additions & 1 deletion core/checkpoint/CheckPointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "common/DevInode.h"
#include "common/EncodingConverter.h"
#include "common/SplitedFilePath.h"
#include "reader/LogFileReader.h"

#ifdef APSARA_UNIT_TEST_MAIN
#include "AppConfig.h"
Expand All @@ -49,7 +50,7 @@ class CheckPoint {
std::string mConfigName;
std::string mFileName;
std::string mRealFileName;
int32_t mPositionInReaderArray = -1; // default not in the reader queue
int32_t mIdxInReaderArray = LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY;

CheckPoint() {}

Expand Down
19 changes: 8 additions & 11 deletions core/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,22 +460,19 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
}

int32_t idx = readerPtr->GetIdxInReaderArrayFromLastCpt();
// new reader
if (backFlag) {
if (backFlag) { // new reader
readerArray.push_back(readerPtr);
mDevInodeReaderMap[devInode] = readerPtr;
// reader not in reader array
} else if (idx == LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY) {
} else if (idx == LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY) { // reader not in reader array
mRotatorReaderMap[devInode] = readerPtr;
// reader in reader array
} else if (idx >= 0) {
} else if (idx >= 0) { // reader in reader array
readerArray.push_back(readerPtr);
mDevInodeReaderMap[devInode] = readerPtr;
std::stable_sort(readerArray.begin(), readerArray.end(), ModifyHandler::CompareReaderByIdxFromCpt);
} else {
LOG_ERROR(sLogger,
("unexpected idx", idx)("real log path", readerPtr->GetRealLogPath())("host log path",
readerPtr->GetHostLogPath()));
LOG_WARNING(sLogger,
("unexpected idx (perhaps because first checkpoint load after upgrade)",
idx)("real log path", readerPtr->GetRealLogPath())("host log path", readerPtr->GetHostLogPath()));
return LogFileReaderPtr();
}
readerPtr->SetReaderArray(&readerArray);
Expand Down Expand Up @@ -975,7 +972,7 @@ void ModifyHandler::HandleTimeOut() {
bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) {
if (!isRotatorReader) {
for (DevInodeLogFileReaderMap::iterator it = mDevInodeReaderMap.begin(); it != mDevInodeReaderMap.end(); ++it) {
int32_t idxInReaderArray = -2;
int32_t idxInReaderArray = LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY;
for (size_t i = 0; i < it->second->GetReaderArray()->size(); ++i) {
if (it->second->GetReaderArray()->at(i) == it->second) {
idxInReaderArray = i;
Expand All @@ -986,7 +983,7 @@ bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) {
}
} else {
for (DevInodeLogFileReaderMap::iterator it = mRotatorReaderMap.begin(); it != mRotatorReaderMap.end(); ++it) {
it->second->DumpMetaToMem(checkConfigFlag, -2);
it->second->DumpMetaToMem(checkConfigFlag, LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY);
}
}
return true;
Expand Down
4 changes: 2 additions & 2 deletions core/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void LogFileReader::DumpMetaToMem(bool checkConfigFlag, int32_t idxInReaderArray
// use last event time as checkpoint's last update time
checkPointPtr->mLastUpdateTime = mLastEventTime;
checkPointPtr->mCache = mCache;
checkPointPtr->mPositionInReaderArray = idxInReaderArray;
checkPointPtr->mIdxInReaderArray = idxInReaderArray;
CheckPointManager::Instance()->AddCheckPoint(checkPointPtr);
}

Expand Down Expand Up @@ -283,7 +283,7 @@ void LogFileReader::InitReader(bool tailExisted, FileReadPolicy policy, uint32_t
mLastEventTime = checkPointPtr->mLastUpdateTime;
mContainerStopped = checkPointPtr->mContainerStopped;
// new property to recover reader exactly from checkpoint
mIdxInReaderArrayFromLastCpt = checkPointPtr->mPositionInReaderArray;
mIdxInReaderArrayFromLastCpt = checkPointPtr->mIdxInReaderArray;
LOG_INFO(sLogger,
("recover log reader status from checkpoint, project", GetProject())("logstore", GetLogstore())(
"config", GetConfigName())("log reader queue name", mHostLogPath)("file device",
Expand Down
2 changes: 1 addition & 1 deletion core/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class LogFileReader {
void
InitReader(bool tailExisted = false, FileReadPolicy policy = BACKWARD_TO_FIXED_POS, uint32_t eoConcurrency = 0);

void DumpMetaToMem(bool checkConfigFlag = false, int32_t idxInReaderArray = -1);
void DumpMetaToMem(bool checkConfigFlag = false, int32_t idxInReaderArray = CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY);

std::string GetSourceId() { return mSourceId; }

Expand Down
Loading