Skip to content

[backport] recover readers exactly from checkpoint #1635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions core/checkpoint/CheckPointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/

#pragma once
#include <string>
#include <memory>
#include <unordered_map>
#include <set>
#include <ctime>
#include <json/json.h>

#include <boost/optional.hpp>
#include <ctime>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>

#include "common/DevInode.h"
#include "common/EncodingConverter.h"
#include "common/SplitedFilePath.h"
Expand All @@ -47,6 +49,7 @@ class CheckPoint {
std::string mConfigName;
std::string mFileName;
std::string mRealFileName;
int32_t mPositionInReaderArray = -1; // default not in the reader queue

CheckPoint() {}

Expand Down
58 changes: 42 additions & 16 deletions core/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,20 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
"new log reader queue count", mNameReaderMap.size() + 1));
}
LogFileReaderPtrArray& readerArray = mNameReaderMap[name];
if (readerArray.size() >= readerConfig.first->mRotatorQueueSize) {

LogFileReaderPtr readerPtr(LogFileReader::CreateLogFileReader(path,
name,
devInode,
readerConfig,
multilineConfig,
discoveryConfig,
exactlyonceConcurrency,
forceBeginingFlag));
if (readerPtr.get() == NULL)
return LogFileReaderPtr();

if (readerArray.size() >= readerConfig.first->mRotatorQueueSize
&& readerPtr->GetIdxInReaderArrayFromLastCpt() == LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY) {
int32_t nowTime = time(NULL);
if (nowTime - mLastOverflowErrorTime > INT32_FLAG(rotate_overflow_error_interval)) {
mLastOverflowErrorTime = nowTime;
Expand Down Expand Up @@ -400,17 +413,6 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
"config", readerConfig.second->GetConfigName())("log reader queue name", PathJoin(path, name))(
"file device", ToString(devInode.dev))("file inode", ToString(devInode.inode)));

LogFileReaderPtr readerPtr(LogFileReader::CreateLogFileReader(path,
name,
devInode,
readerConfig,
multilineConfig,
discoveryConfig,
exactlyonceConcurrency,
forceBeginingFlag));
if (readerPtr.get() == NULL)
return LogFileReaderPtr();

// new log
bool backFlag = false;
if (readerPtr->GetRealLogPath().empty() || readerPtr->GetRealLogPath() == readerPtr->GetHostLogPath()) {
Expand Down Expand Up @@ -457,9 +459,26 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
}
}

backFlag ? readerArray.push_back(readerPtr) : readerArray.push_front(readerPtr);
int32_t idx = readerPtr->GetIdxInReaderArrayFromLastCpt();
// new reader
if (backFlag) {
readerArray.push_back(readerPtr);
mDevInodeReaderMap[devInode] = readerPtr;
// reader not in reader array
} else if (idx == LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY) {
mRotatorReaderMap[devInode] = readerPtr;
// reader in reader array
} else if (idx >= 0) {
readerArray.push_back(readerPtr);
mDevInodeReaderMap[devInode] = readerPtr;
std::stable_sort(readerArray.begin(), readerArray.end(), ModifyHandler::CompareReaderByIdxFromCpt);
} else {
LOG_ERROR(sLogger,
("unexpected idx", idx)("real log path", readerPtr->GetRealLogPath())("host log path",
readerPtr->GetHostLogPath()));
return LogFileReaderPtr();
}
readerPtr->SetReaderArray(&readerArray);
mDevInodeReaderMap[devInode] = readerPtr;

LOG_INFO(sLogger,
("log reader creation succeed",
Expand Down Expand Up @@ -956,11 +975,18 @@ void ModifyHandler::HandleTimeOut() {
bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) {
if (!isRotatorReader) {
for (DevInodeLogFileReaderMap::iterator it = mDevInodeReaderMap.begin(); it != mDevInodeReaderMap.end(); ++it) {
it->second->DumpMetaToMem(checkConfigFlag);
int32_t idxInReaderArray = -2;
for (size_t i = 0; i < it->second->GetReaderArray()->size(); ++i) {
if (it->second->GetReaderArray()->at(i) == it->second) {
idxInReaderArray = i;
break;
}
}
it->second->DumpMetaToMem(checkConfigFlag, idxInReaderArray);
}
} else {
for (DevInodeLogFileReaderMap::iterator it = mRotatorReaderMap.begin(); it != mRotatorReaderMap.end(); ++it) {
it->second->DumpMetaToMem(checkConfigFlag);
it->second->DumpMetaToMem(checkConfigFlag, -2);
}
}
return true;
Expand Down
15 changes: 15 additions & 0 deletions core/event_handler/EventHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ class ModifyHandler : public EventHandler {
return left->GetLastUpdateTime() < right->GetLastUpdateTime();
}

static bool CompareReaderByIdxFromCpt(const std::shared_ptr<LogFileReader> left,
const std::shared_ptr<LogFileReader> right) {
if (left->GetIdxInReaderArrayFromLastCpt() == right->GetIdxInReaderArrayFromLastCpt()) {
return false;
}
// new reader is always at the end of the array
if (left->GetIdxInReaderArrayFromLastCpt() == LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY) {
return false;
}
if (right->GetIdxInReaderArrayFromLastCpt() == LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY) {
return true;
}
return left->GetIdxInReaderArrayFromLastCpt() < right->GetIdxInReaderArrayFromLastCpt();
}

LogFileReaderPtr CreateLogFileReaderPtr(const std::string& path,
const std::string& name,
const DevInode& devInode,
Expand Down
14 changes: 9 additions & 5 deletions core/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ LogFileReader::LogFileReader(const std::string& hostLogPathDir,
mRegion = readerConfig.second->GetRegion();
}

void LogFileReader::DumpMetaToMem(bool checkConfigFlag) {
void LogFileReader::DumpMetaToMem(bool checkConfigFlag, int32_t idxInReaderArray) {
if (checkConfigFlag) {
size_t index = mHostLogPath.rfind(PATH_SEPARATOR);
if (index == string::npos || index == mHostLogPath.size() - 1) {
Expand Down Expand Up @@ -232,6 +232,7 @@ void LogFileReader::DumpMetaToMem(bool checkConfigFlag) {
// use last event time as checkpoint's last update time
checkPointPtr->mLastUpdateTime = mLastEventTime;
checkPointPtr->mCache = mCache;
checkPointPtr->mPositionInReaderArray = idxInReaderArray;
CheckPointManager::Instance()->AddCheckPoint(checkPointPtr);
}

Expand Down Expand Up @@ -281,12 +282,15 @@ void LogFileReader::InitReader(bool tailExisted, FileReadPolicy policy, uint32_t
mRealLogPath = checkPointPtr->mRealFileName;
mLastEventTime = checkPointPtr->mLastUpdateTime;
mContainerStopped = checkPointPtr->mContainerStopped;
// new property to recover reader exactly from checkpoint
mIdxInReaderArrayFromLastCpt = checkPointPtr->mPositionInReaderArray;
LOG_INFO(sLogger,
("recover log reader status from checkpoint, project", GetProject())("logstore", GetLogstore())(
"config", GetConfigName())("log reader queue name", mHostLogPath)(
"file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))(
"file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)(
"real file path", mRealLogPath)("last file position", mLastFilePos));
"config", GetConfigName())("log reader queue name", mHostLogPath)("file device",
ToString(mDevInode.dev))(
"file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)(
"file signature size", mLastFileSignatureSize)("real file path", mRealLogPath)(
"last file position", mLastFilePos)("index in reader array", mIdxInReaderArrayFromLastCpt));
// if file is open or
// last update time is new and the file's container is not stopped we
// we should use first modify
Expand Down
10 changes: 9 additions & 1 deletion core/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ typedef std::deque<LogFileReaderPtr> LogFileReaderPtrArray;
*/
class LogFileReader {
public:
static const int32_t CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY = -1;
static const int32_t CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY = -2;
enum FileCompareResult {
FileCompareResult_DevInodeChange,
FileCompareResult_SigChange,
Expand Down Expand Up @@ -162,6 +164,10 @@ class LogFileReader {

int64_t GetLastFilePos() const { return mLastFilePos; }

int32_t GetIdxInReaderArrayFromLastCpt() const { return mIdxInReaderArrayFromLastCpt; }

void SetIdxInReaderArrayFromLastCpt(int32_t idx) { mIdxInReaderArrayFromLastCpt = idx; }

void ResetLastFilePos() { mLastFilePos = 0; }

bool NeedSkipFirstModify() const { return mSkipFirstModify; }
Expand All @@ -181,7 +187,7 @@ class LogFileReader {
void
InitReader(bool tailExisted = false, FileReadPolicy policy = BACKWARD_TO_FIXED_POS, uint32_t eoConcurrency = 0);

void DumpMetaToMem(bool checkConfigFlag = false);
void DumpMetaToMem(bool checkConfigFlag = false, int32_t idxInReaderArray = -1);

std::string GetSourceId() { return mSourceId; }

Expand Down Expand Up @@ -396,6 +402,8 @@ class LogFileReader {
int64_t mLastFileSize = 0;
time_t mLastMTime = 0;
std::string mCache;
// >= 0: index of reader array, -1: new reader, -2: not in reader array
int32_t mIdxInReaderArrayFromLastCpt = CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY;
// std::string mProjectName;
std::string mTopicName;
time_t mLastUpdateTime;
Expand Down
12 changes: 7 additions & 5 deletions core/unittest/event_handler/ModifyHandlerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "unittest/Unittest.h"
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <stdlib.h>
#include <string>

#include <memory>
#include "common/Flags.h"
#include <string>

#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "event/Event.h"
#include "event_handler/EventHandler.h"
#include "reader/LogFileReader.h"
#include "unittest/Unittest.h"

using namespace std;

Expand Down Expand Up @@ -122,4 +124,4 @@ int main(int argc, char** argv) {
logtail::Logger::Instance().InitGlobalLoggers();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}
Loading