Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor logic of last matched line #1426

Merged
merged 7 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/processor/ProcessorSplitMultilineLogStringNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ void ProcessorSplitMultilineLogStringNative::HandleUnmatchLogs(const StringView&
EventsContainer& newEvents,
StringView logPath,
int* unmatchLines) {
size_t begin, fisrtLogSize, totalLines = 0;
size_t begin = 0, fisrtLogSize = 0, totalLines = 0;
while (begin < sourceVal.size()) {
StringView content = GetNextLine(sourceVal, begin);
++(*unmatchLines);
Expand Down
6 changes: 4 additions & 2 deletions core/reader/JsonLogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
using namespace std;

namespace logtail {
int32_t
JsonLogFileReader::LastMatchedLine(char* buffer, int32_t size, int32_t& rollbackLineFeedCount, bool allowRollback) {
int32_t JsonLogFileReader::RemoveLastIncompleteLog(char* buffer,
int32_t size,
int32_t& rollbackLineFeedCount,
bool allowRollback) {
int32_t readBytes = 0;
int32_t endIdx = 0;
int32_t beginIdx = 0;
Expand Down
8 changes: 5 additions & 3 deletions core/reader/JsonLogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ class JsonLogFileReader : public LogFileReader {
: LogFileReader(hostLogPathDir, hostLogPathFile, devInode, readerConfig, multilineConfig) {}

protected:
int32_t
LastMatchedLine(char* buffer, int32_t size, int32_t& rollbackLineFeedCount, bool allowRollback = true) override;
int32_t RemoveLastIncompleteLog(char* buffer,
int32_t size,
int32_t& rollbackLineFeedCount,
bool allowRollback = true) override;

private:
bool FindJsonMatch(
Expand All @@ -40,7 +42,7 @@ class JsonLogFileReader : public LogFileReader {
#ifdef APSARA_UNIT_TEST_MAIN
friend class JsonLogFileReaderUnittest;
friend class JsonParseLogLineUnittest;
friend class LastMatchedLineUnittest;
friend class RemoveLastIncompleteLogUnittest;
#endif
};

Expand Down
125 changes: 63 additions & 62 deletions core/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1698,15 +1698,15 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
}
if (allowRollback || mReaderConfig.second->RequiringJsonReader()) {
int32_t rollbackLineFeedCount;
nbytes = LastMatchedLine(stringBuffer, alignedBytes, rollbackLineFeedCount, allowRollback);
nbytes = RemoveLastIncompleteLog(stringBuffer, alignedBytes, rollbackLineFeedCount, allowRollback);
}

if (nbytes == 0) {
if (moreData) { // excessively long line without '\n' or multiline begin or valid wchar
nbytes = alignedBytes ? alignedBytes : BUFFER_SIZE;
if (mReaderConfig.second->RequiringJsonReader()) {
int32_t rollbackLineFeedCount;
nbytes = LastMatchedLine(stringBuffer, nbytes, rollbackLineFeedCount, false);
nbytes = RemoveLastIncompleteLog(stringBuffer, nbytes, rollbackLineFeedCount, false);
}
LOG_WARNING(sLogger,
("Log is too long and forced to be split at offset: ",
Expand Down Expand Up @@ -1856,15 +1856,15 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b
int32_t rollbackLineFeedCount = 0;
int32_t bakResultCharCount = resultCharCount;
if (allowRollback || mReaderConfig.second->RequiringJsonReader()) {
resultCharCount = LastMatchedLine(stringBuffer, resultCharCount, rollbackLineFeedCount, allowRollback);
resultCharCount = RemoveLastIncompleteLog(stringBuffer, resultCharCount, rollbackLineFeedCount, allowRollback);
}
if (resultCharCount == 0) {
if (moreData) {
resultCharCount = bakResultCharCount;
rollbackLineFeedCount = 0;
if (mReaderConfig.second->RequiringJsonReader()) {
int32_t rollbackLineFeedCount;
LastMatchedLine(stringBuffer, resultCharCount, rollbackLineFeedCount, false);
RemoveLastIncompleteLog(stringBuffer, resultCharCount, rollbackLineFeedCount, false);
}
// Cannot get the split position here, so just mark a flag and send alarm later
logTooLongSplitFlag = true;
Expand Down Expand Up @@ -2027,74 +2027,75 @@ LogFileReader::FileCompareResult LogFileReader::CompareToFile(const string& file
1. xxx\nend\n -> xxx\nend
1. xxx\nend\nxxx\n -> xxx\nend
*/
int32_t LogFileReader::LastMatchedLine(char* buffer, int32_t size, int32_t& rollbackLineFeedCount, bool allowRollback) {
/*
return: the number of bytes left, including \n
*/
int32_t
LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& rollbackLineFeedCount, bool allowRollback) {
if (!allowRollback) {
return size;
}
int endPs = size - 1; // buffer[size] = 0 , buffer[size-1] = '\n'
rollbackLineFeedCount = 0;
// Single line rollback
if (!mMultilineConfig.first->IsMultiline()) {
while (endPs >= 0) {
if (buffer[endPs] == '\n') {
if (endPs != size - 1) { // if last line dose not end with '\n', rollback
++rollbackLineFeedCount;
}
return endPs + 1;
}
endPs--;
}
return 0;
int32_t endPs; // the position of \n or \0
if (buffer[size - 1] == '\n') {
endPs = size - 1;
} else {
endPs = size;
}
rollbackLineFeedCount = 0;
// Multiline rollback
int begPs = size - 2;
std::string exception;
while (begPs >= 0) {
if (buffer[begPs] == '\n' || begPs == 0) {
int lineBegin = begPs == 0 ? 0 : begPs + 1;
if (mMultilineConfig.first->GetContinuePatternReg()
&& BoostRegexMatch(buffer + lineBegin,
endPs - lineBegin,
*mMultilineConfig.first->GetContinuePatternReg(),
exception)) {
++rollbackLineFeedCount;
endPs = begPs;
} else if (mMultilineConfig.first->GetEndPatternReg()
&& BoostRegexMatch(buffer + lineBegin,
endPs - lineBegin,
*mMultilineConfig.first->GetEndPatternReg(),
exception)) {
// Ensure the end line is complete
if (buffer[endPs] == '\n') {
return endPs + 1;
} else {
++rollbackLineFeedCount;
endPs = begPs;
if (mMultilineConfig.first->IsMultiline()) {
std::string exception;
while (endPs >= 0) {
StringView content = GetLastLine(StringView(buffer, size), endPs);
if (mMultilineConfig.first->GetEndPatternReg()) {
// start + end, continue + end, end
if (BoostRegexMatch(
content.data(), content.size(), *mMultilineConfig.first->GetEndPatternReg(), exception)) {
// Ensure the end line is complete
if (buffer[endPs] == '\n') {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
return endPs + 1;
}
}
} else if (mMultilineConfig.first->GetStartPatternReg()
&& BoostRegexMatch(buffer + lineBegin,
endPs - lineBegin,
*mMultilineConfig.first->GetStartPatternReg(),
exception)) {
&& BoostRegexMatch(
content.data(), content.size(), *mMultilineConfig.first->GetStartPatternReg(), exception)) {
// start + continue, start
++rollbackLineFeedCount;
// Keep all the buffer if rollback all
return lineBegin;
} else if (mMultilineConfig.first->GetContinuePatternReg()) {
// We can confirm the logs before are complete if continue is configured but no regex pattern can match.
if (buffer[endPs] == '\n') {
return endPs + 1;
} else {
// Keep all the buffer if rollback all
return lineBegin;
}
} else {
++rollbackLineFeedCount;
endPs = begPs;
return content.data() - buffer;
}
++rollbackLineFeedCount;
endPs = content.data() - buffer - 1;
}
begPs--;
}
return 0;
// Single line rollback or all unmatch rollback
rollbackLineFeedCount = 0;
if (buffer[size - 1] == '\n') {
return size;
}
StringView content = GetLastLine(StringView(buffer, size), size - 1);
++rollbackLineFeedCount;
return content.data() - buffer;
}

/*
params:
buffer: all read logs
end: the end position of current line, \n or \0
return:
last line (backward), without \n or \0
*/
StringView LogFileReader::GetLastLine(StringView buffer, size_t end) {
if (end == 0) {
return buffer;
}

for (size_t begin = end; begin > 0; --begin) {
if (buffer[begin - 1] == '\n') {
return StringView(buffer.data() + begin, end - begin);
}
}
return StringView(buffer.data(), end);
}

size_t LogFileReader::AlignLastCharacter(char* buffer, size_t size) {
Expand All @@ -2108,8 +2109,8 @@ size_t LogFileReader::AlignLastCharacter(char* buffer, size_t size) {
// 1. The number of byte for one character can be 1, 2, 4.
// 2. 1 byte character: the top bit is 0.
// 3. 2 bytes character: the 1st byte is between 0x81 and 0xFE; the 2nd byte is between 0x40 and 0xFE.
// 4. 4 bytes character: the 1st and 3rd byte is between 0x81 and 0xFE; the 2nd and 4th byte are between 0x30
// and 0x39. (not supported to align)
// 4. 4 bytes character: the 1st and 3rd byte is between 0x81 and 0xFE; the 2nd and 4th byte are between
// 0x30 and 0x39. (not supported to align)

// 1 byte character, 2nd byte of 2 bytes, 2nd or 4th byte of 4 bytes
if ((buffer[endPs] & 0x80) == 0 || size == 1) {
Expand Down
7 changes: 4 additions & 3 deletions core/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class LogFileReader {
FileCompareResult CompareToFile(const std::string& filePath);

virtual int32_t
LastMatchedLine(char* buffer, int32_t size, int32_t& rollbackLineFeedCount, bool allowRollback = true);
RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& rollbackLineFeedCount, bool allowRollback = true);

size_t AlignLastCharacter(char* buffer, size_t size);

Expand Down Expand Up @@ -502,6 +502,8 @@ class LogFileReader {
// @param fromCpt: if the read size is recoveried from checkpoint, set it to true.
size_t getNextReadSize(int64_t fileEnd, bool& fromCpt);

StringView GetLastLine(StringView buffer, size_t end);

// Update current checkpoint's read offset and length after success read.
void setExactlyOnceCheckpointAfterRead(size_t readSize);

Expand Down Expand Up @@ -587,8 +589,7 @@ class LogFileReader {
friend class LogSplitUnittest;
friend class LogSplitDiscardUnmatchUnittest;
friend class LogSplitNoDiscardUnmatchUnittest;
friend class LastMatchedLineDiscardUnmatchUnittest;
friend class LastMatchedLineNoDiscardUnmatchUnittest;
friend class RemoveLastIncompleteLogMultilineUnittest;
friend class LogFileReaderCheckpointUnittest;

protected:
Expand Down
2 changes: 1 addition & 1 deletion core/unittest/reader/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ target_link_libraries(file_reader_options_unittest unittest_base)
add_executable(json_log_file_reader_unittest JsonLogFileReaderUnittest.cpp)
target_link_libraries(json_log_file_reader_unittest unittest_base)

add_executable(last_matched_line_unittest LastMatchedLineUnittest.cpp)
add_executable(last_matched_line_unittest RemoveLastIncompleteLogUnittest.cpp)
target_link_libraries(last_matched_line_unittest unittest_base)

add_executable(log_file_reader_unittest LogFileReaderUnittest.cpp)
Expand Down
Loading
Loading