Skip to content

Commit ef1b94a

Browse files
committed
fix checkpoint dump idx in reader array
1 parent 0fae8b4 commit ef1b94a

File tree

5 files changed

+50
-21
lines changed

5 files changed

+50
-21
lines changed

core/checkpoint/CheckPointManager.cpp

+17-6
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,22 @@
1313
// limitations under the License.
1414

1515
#include "CheckPointManager.h"
16-
#include <string>
16+
17+
#include <fcntl.h>
18+
1719
#include <fstream>
20+
#include <string>
1821
#include <thread>
19-
#include <fcntl.h>
20-
#include "monitor/LogtailAlarm.h"
22+
2123
#include "app_config/AppConfig.h"
22-
#include "config_manager/ConfigManager.h"
24+
#include "common/FileSystemUtil.h"
2325
#include "common/Flags.h"
2426
#include "common/HashUtil.h"
2527
#include "common/StringTools.h"
26-
#include "common/FileSystemUtil.h"
27-
#include "logger/Logger.h"
28+
#include "config_manager/ConfigManager.h"
2829
#include "file_server/FileDiscoveryOptions.h"
30+
#include "logger/Logger.h"
31+
#include "monitor/LogtailAlarm.h"
2932

3033
using namespace std;
3134
#if defined(__linux__)
@@ -200,6 +203,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
200203
int32_t fileOpenFlag = 0; // default, we close file ptr
201204
int32_t containerStopped = 0;
202205
int32_t lastForceRead = 0;
206+
int32_t idxInReaderArray = LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY;
203207
if (meta.isMember("real_file_name")) {
204208
realFilePath = meta["real_file_name"].asString();
205209
}
@@ -237,6 +241,9 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
237241
if (meta.isMember("last_force_read")) {
238242
lastForceRead = meta["last_force_read"].asInt();
239243
}
244+
if (meta.isMember("idx_in_reader_array")) {
245+
idxInReaderArray = meta["idx_in_reader_array"].asInt();
246+
}
240247
// can not get file's dev inode
241248
if (!devInode.IsValid()) {
242249
LOG_WARNING(sLogger, ("can not find check point dev inode, discard it", filePath));
@@ -258,6 +265,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
258265
containerStopped != 0,
259266
lastForceRead != 0);
260267
ptr->mLastUpdateTime = update_time;
268+
ptr->mIdxInReaderArray = idxInReaderArray;
261269
AddCheckPoint(ptr);
262270
} else {
263271
// find config
@@ -290,6 +298,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
290298
containerStopped != 0,
291299
lastForceRead != 0);
292300
ptr->mLastUpdateTime = update_time;
301+
ptr->mIdxInReaderArray = idxInReaderArray;
293302
AddCheckPoint(ptr);
294303
}
295304
}
@@ -336,6 +345,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
336345
leaf["config_name"] = Json::Value(checkPointPtr->mConfigName);
337346
// forward compatible
338347
leaf["sig"] = Json::Value(string(""));
348+
leaf["idx_in_reader_array"] = Json::Value(checkPointPtr->mIdxInReaderArray);
339349
// use filename + dev + inode + configName to prevent same filename conflict
340350
root[checkPointPtr->mFileName + "*" + ToString(checkPointPtr->mDevInode.dev) + "*"
341351
+ ToString(checkPointPtr->mDevInode.inode) + "*" + checkPointPtr->mConfigName]
@@ -365,6 +375,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
365375
leaf["config_name"] = Json::Value(checkPointPtr->mConfigName);
366376
// forward compatible
367377
leaf["sig"] = Json::Value(string(""));
378+
leaf["idx_in_reader_array"] = Json::Value(checkPointPtr->mIdxInReaderArray);
368379
// use filename + dev + inode + configName to prevent same filename conflict
369380
root[checkPointPtr->mFileName + "*" + ToString(checkPointPtr->mDevInode.dev) + "*"
370381
+ ToString(checkPointPtr->mDevInode.inode) + "*" + checkPointPtr->mConfigName]

core/checkpoint/CheckPointManager.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "common/DevInode.h"
2828
#include "common/EncodingConverter.h"
2929
#include "common/SplitedFilePath.h"
30+
#include "reader/LogFileReader.h"
3031

3132
#ifdef APSARA_UNIT_TEST_MAIN
3233
#include "AppConfig.h"
@@ -49,7 +50,7 @@ class CheckPoint {
4950
std::string mConfigName;
5051
std::string mFileName;
5152
std::string mRealFileName;
52-
int32_t mPositionInReaderArray = -1; // default not in the reader queue
53+
int32_t mIdxInReaderArray = LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY;
5354

5455
CheckPoint() {}
5556

core/event_handler/EventHandler.cpp

+8-11
Original file line numberDiff line numberDiff line change
@@ -460,22 +460,19 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
460460
}
461461

462462
int32_t idx = readerPtr->GetIdxInReaderArrayFromLastCpt();
463-
// new reader
464-
if (backFlag) {
463+
if (backFlag) { // new reader
465464
readerArray.push_back(readerPtr);
466465
mDevInodeReaderMap[devInode] = readerPtr;
467-
// reader not in reader array
468-
} else if (idx == LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY) {
466+
} else if (idx == LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY) { // reader not in reader array
469467
mRotatorReaderMap[devInode] = readerPtr;
470-
// reader in reader array
471-
} else if (idx >= 0) {
468+
} else if (idx >= 0) { // reader in reader array
472469
readerArray.push_back(readerPtr);
473470
mDevInodeReaderMap[devInode] = readerPtr;
474471
std::stable_sort(readerArray.begin(), readerArray.end(), ModifyHandler::CompareReaderByIdxFromCpt);
475472
} else {
476-
LOG_ERROR(sLogger,
477-
("unexpected idx", idx)("real log path", readerPtr->GetRealLogPath())("host log path",
478-
readerPtr->GetHostLogPath()));
473+
LOG_WARNING(sLogger,
474+
("unexpected idx (perhaps because first checkpoint load after upgrade)",
475+
idx)("real log path", readerPtr->GetRealLogPath())("host log path", readerPtr->GetHostLogPath()));
479476
return LogFileReaderPtr();
480477
}
481478
readerPtr->SetReaderArray(&readerArray);
@@ -975,7 +972,7 @@ void ModifyHandler::HandleTimeOut() {
975972
bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) {
976973
if (!isRotatorReader) {
977974
for (DevInodeLogFileReaderMap::iterator it = mDevInodeReaderMap.begin(); it != mDevInodeReaderMap.end(); ++it) {
978-
int32_t idxInReaderArray = -2;
975+
int32_t idxInReaderArray = LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY;
979976
for (size_t i = 0; i < it->second->GetReaderArray()->size(); ++i) {
980977
if (it->second->GetReaderArray()->at(i) == it->second) {
981978
idxInReaderArray = i;
@@ -986,7 +983,7 @@ bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) {
986983
}
987984
} else {
988985
for (DevInodeLogFileReaderMap::iterator it = mRotatorReaderMap.begin(); it != mRotatorReaderMap.end(); ++it) {
989-
it->second->DumpMetaToMem(checkConfigFlag, -2);
986+
it->second->DumpMetaToMem(checkConfigFlag, LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY);
990987
}
991988
}
992989
return true;

core/reader/LogFileReader.cpp

+22-2
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,26 @@ LogFileReader::LogFileReader(const std::string& hostLogPathDir,
190190
mRegion = readerConfig.second->GetRegion();
191191
}
192192

193+
void LogFileReader::SetMetrics() {
194+
mMetricLabels = {{METRIC_LABEL_FILE_NAME, GetConvertedPath()},
195+
{METRIC_LABEL_FILE_DEV, std::to_string(GetDevInode().dev)},
196+
{METRIC_LABEL_FILE_INODE, std::to_string(GetDevInode().inode)}};
197+
mMetricsRecordRef = FileServer::GetInstance()->GetOrCreateReentrantMetricsRecordRef(GetConfigName(), mMetricLabels);
198+
if (mMetricsRecordRef == nullptr) {
199+
LOG_ERROR(sLogger,
200+
("failed to init metrics", "cannot get config's metricRecordRef")("config name", GetConfigName()));
201+
mMetricsEnabled = false;
202+
return;
203+
}
204+
205+
mMetricsEnabled = true;
206+
207+
mInputRecordsSizeBytesCounter = mMetricsRecordRef->GetCounter(METRIC_INPUT_RECORDS_SIZE_BYTES);
208+
mInputReadTotalCounter = mMetricsRecordRef->GetCounter(METRIC_INPUT_READ_TOTAL);
209+
mInputFileSizeBytesGauge = mMetricsRecordRef->GetGauge(METRIC_INPUT_FILE_SIZE_BYTES);
210+
mInputFileOffsetBytesGauge = mMetricsRecordRef->GetGauge(METRIC_INPUT_FILE_OFFSET_BYTES);
211+
}
212+
193213
void LogFileReader::DumpMetaToMem(bool checkConfigFlag, int32_t idxInReaderArray) {
194214
if (checkConfigFlag) {
195215
size_t index = mHostLogPath.rfind(PATH_SEPARATOR);
@@ -232,7 +252,7 @@ void LogFileReader::DumpMetaToMem(bool checkConfigFlag, int32_t idxInReaderArray
232252
// use last event time as checkpoint's last update time
233253
checkPointPtr->mLastUpdateTime = mLastEventTime;
234254
checkPointPtr->mCache = mCache;
235-
checkPointPtr->mPositionInReaderArray = idxInReaderArray;
255+
checkPointPtr->mIdxInReaderArray = idxInReaderArray;
236256
CheckPointManager::Instance()->AddCheckPoint(checkPointPtr);
237257
}
238258

@@ -283,7 +303,7 @@ void LogFileReader::InitReader(bool tailExisted, FileReadPolicy policy, uint32_t
283303
mLastEventTime = checkPointPtr->mLastUpdateTime;
284304
mContainerStopped = checkPointPtr->mContainerStopped;
285305
// new property to recover reader exactly from checkpoint
286-
mIdxInReaderArrayFromLastCpt = checkPointPtr->mPositionInReaderArray;
306+
mIdxInReaderArrayFromLastCpt = checkPointPtr->mIdxInReaderArray;
287307
LOG_INFO(sLogger,
288308
("recover log reader status from checkpoint, project", GetProject())("logstore", GetLogstore())(
289309
"config", GetConfigName())("log reader queue name", mHostLogPath)("file device",

core/reader/LogFileReader.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ class LogFileReader {
187187
void
188188
InitReader(bool tailExisted = false, FileReadPolicy policy = BACKWARD_TO_FIXED_POS, uint32_t eoConcurrency = 0);
189189

190-
void DumpMetaToMem(bool checkConfigFlag = false, int32_t idxInReaderArray = -1);
190+
void DumpMetaToMem(bool checkConfigFlag = false, int32_t idxInReaderArray = CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY);
191191

192192
std::string GetSourceId() { return mSourceId; }
193193

0 commit comments

Comments
 (0)