Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.0 hotfix: Resolve intermittent crash due to zero-byte reads in log reading #1501

Merged
merged 4 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions core/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,7 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
logBuffer.readOffset = mLastFilePos;
--nbytes;
}
mLastForceRead = !allowRollback;
mCache.clear();
moreData = false;
} else {
Expand All @@ -1662,7 +1663,8 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
if (READ_BYTE < lastCacheSize) {
READ_BYTE = lastCacheSize; // this should not happen, just avoid READ_BYTE >= 0 theoretically
}
StringBuffer stringMemory = logBuffer.sourcebuffer->AllocateStringBuffer(READ_BYTE); // allocate modifiable buffer
StringBuffer stringMemory
= logBuffer.sourcebuffer->AllocateStringBuffer(READ_BYTE); // allocate modifiable buffer
if (lastCacheSize) {
READ_BYTE -= lastCacheSize; // reserve space to copy from cache if needed
}
Expand All @@ -1687,6 +1689,7 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
logBuffer.readOffset = mLastFilePos;
--nbytes;
}
mLastForceRead = !allowRollback;
const size_t stringBufferLen = nbytes;
logBuffer.truncateInfo.reset(truncateInfo);
lastReadPos = mLastFilePos + nbytes; // this doesn't seem right when ulogfs is used and a hole is skipped
Expand Down Expand Up @@ -1737,9 +1740,10 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,

// cache is sealed, nbytes should not change any more
size_t stringLen = nbytes;
if (stringBuffer[stringLen - 1] == '\n'
|| stringBuffer[stringLen - 1]
== '\0') { // \0 is for json, such behavior make ilogtail not able to collect binary log
if (stringLen > 0
&& (stringBuffer[stringLen - 1] == '\n'
|| stringBuffer[stringLen - 1]
== '\0')) { // \0 is for json, such behavior make ilogtail not able to collect binary log
--stringLen;
}
stringBuffer[stringLen] = '\0';
Expand All @@ -1749,7 +1753,6 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
setExactlyOnceCheckpointAfterRead(nbytes);
mLastFilePos += nbytes;

mLastForceRead = !allowRollback;
LOG_DEBUG(sLogger, ("read size", nbytes)("last file pos", mLastFilePos));
}

Expand All @@ -1774,6 +1777,7 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b
logBuffer.readOffset = mLastFilePos;
--readCharCount;
}
mLastForceRead = !allowRollback;
lastReadPos = mLastFilePos + readCharCount;
originReadCount = readCharCount;
moreData = false;
Expand Down Expand Up @@ -1806,6 +1810,7 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b
++mLastFilePos;
logBuffer.readOffset = mLastFilePos;
}
mLastForceRead = !allowRollback;
logBuffer.truncateInfo.reset(truncateInfo);
lastReadPos = mLastFilePos + readCharCount;
originReadCount = readCharCount;
Expand Down Expand Up @@ -1887,9 +1892,10 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b
}
// cache is sealed, readCharCount should not change any more
size_t stringLen = resultCharCount;
if (stringBuffer[stringLen - 1] == '\n'
|| stringBuffer[stringLen - 1]
== '\0') { // \0 is for json, such behavior make ilogtail not able to collect binary log
if (stringLen > 0
&& (stringBuffer[stringLen - 1] == '\n'
|| stringBuffer[stringLen - 1]
== '\0')) { // \0 is for json, such behavior make ilogtail not able to collect binary log
--stringLen;
}
stringBuffer[stringLen] = '\0';
Expand All @@ -1911,7 +1917,6 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b
LogtailAlarm::GetInstance()->SendAlarm(
SPLIT_LOG_FAIL_ALARM, oss.str(), GetProject(), GetLogstore(), GetRegion());
}
mLastForceRead = !allowRollback;
LOG_DEBUG(sLogger,
("read gbk buffer, offset", mLastFilePos)("origin read", originReadCount)("at last read", readCharCount));
}
Expand Down Expand Up @@ -2032,7 +2037,7 @@ LogFileReader::FileCompareResult LogFileReader::CompareToFile(const string& file
*/
int32_t
LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& rollbackLineFeedCount, bool allowRollback) {
if (!allowRollback) {
if (!allowRollback || size == 0) {
return size;
}
int32_t endPs; // the position of \n or \0
Expand Down Expand Up @@ -2087,7 +2092,7 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
*/
StringView LogFileReader::GetLastLine(StringView buffer, size_t end) {
if (end == 0) {
return buffer;
return StringView(buffer.data(), 0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

外层函数要处理吗?boost遇到size=0是什么行为?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

补了一个单测,boost的结果是一个空字符串。GetLastLine的返回结果只会用到开始的位置,只要是以buffer.data()开头的,应该没有问题。

}

for (size_t begin = end; begin > 0; --begin) {
Expand Down
6 changes: 3 additions & 3 deletions core/unittest/reader/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ target_link_libraries(file_reader_options_unittest unittest_base)
add_executable(json_log_file_reader_unittest JsonLogFileReaderUnittest.cpp)
target_link_libraries(json_log_file_reader_unittest unittest_base)

add_executable(last_matched_line_unittest RemoveLastIncompleteLogUnittest.cpp)
target_link_libraries(last_matched_line_unittest unittest_base)
add_executable(remove_last_incomplete_log_unittest RemoveLastIncompleteLogUnittest.cpp)
target_link_libraries(remove_last_incomplete_log_unittest unittest_base)

add_executable(log_file_reader_unittest LogFileReaderUnittest.cpp)
target_link_libraries(log_file_reader_unittest unittest_base)
Expand All @@ -45,6 +45,6 @@ include(GoogleTest)
gtest_discover_tests(log_file_reader_deleted_file_unittest)
gtest_discover_tests(file_reader_options_unittest)
gtest_discover_tests(json_log_file_reader_unittest)
gtest_discover_tests(last_matched_line_unittest)
gtest_discover_tests(remove_last_incomplete_log_unittest)
gtest_discover_tests(log_file_reader_unittest)
gtest_discover_tests(source_buffer_unittest)
110 changes: 105 additions & 5 deletions core/unittest/reader/LogFileReaderUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "unittest/Unittest.h"
#include <stdio.h>

#include <fstream>

#include "checkpoint/CheckPointManager.h"
#include "reader/LogFileReader.h"
#include "common/memory/SourceBuffer.h"
#include "common/RuntimeUtil.h"
#include "common/FileSystemUtil.h"
#include "log_pb/sls_logs.pb.h"
#include "common/RuntimeUtil.h"
#include "common/memory/SourceBuffer.h"
#include "file_server/FileServer.h"
#include "log_pb/sls_logs.pb.h"
#include "reader/LogFileReader.h"
#include "unittest/Unittest.h"

DECLARE_FLAG_INT32(force_release_deleted_file_fd_timeout);

Expand Down Expand Up @@ -237,6 +239,55 @@ void LogFileReaderUnittest::TestReadGBK() {
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_STREQ_FATAL(NULL, logBuffer.rawBuffer.data());
}
{ // force read + \n, in which case the number of bytes read is 0
Json::Value config;
config["StartPattern"] = "iLogtail.*";
MultilineOptions multilineOpts;
multilineOpts.Init(config, ctx, "");
FileReaderOptions readerOpts;
readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK;
LogFileReader reader(
logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
reader.UpdateReaderManual();
reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING);
int64_t fileSize = reader.mLogFileOp.GetFileSize();
reader.CheckFileSignatureAndOffset(true);
LogBuffer logBuffer;
bool moreData = false;
std::string expectedPart(expectedContent.get());
// first read, read first line without \n and not allowRollback
int64_t firstReadSize = expectedPart.find("\n");
expectedPart.resize(firstReadSize);
reader.ReadGBK(logBuffer, 127, moreData, false); // first line without \n
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data());
APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_TRUE_FATAL(reader.mLastForceRead);

// second read, start with \n but with other lines
reader.ReadGBK(logBuffer, fileSize - 1, moreData);
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_GE_FATAL(reader.mCache.size(), 0UL);
std::string expectedPart2(expectedContent.get() + firstReadSize + 1); // skip \n
int64_t secondReadSize = expectedPart2.rfind("iLogtail") - 1;
expectedPart2.resize(secondReadSize);
APSARA_TEST_STREQ_FATAL(expectedPart2.c_str(), logBuffer.rawBuffer.data());
APSARA_TEST_FALSE_FATAL(reader.mLastForceRead);

// third read, force read cache
reader.ReadGBK(logBuffer, fileSize - 1, moreData, false);
std::string expectedPart3(expectedContent.get() + firstReadSize + 1 + secondReadSize + 1);
APSARA_TEST_STREQ_FATAL(expectedPart3.c_str(), logBuffer.rawBuffer.data());
APSARA_TEST_TRUE_FATAL(reader.mLastForceRead);

// fourth read, only read \n
LogBuffer logBuffer2;
reader.ReadGBK(logBuffer2, fileSize, moreData);
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_GE_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_EQUAL_FATAL(fileSize, reader.mLastFilePos);
APSARA_TEST_STREQ_FATAL(NULL, logBuffer2.rawBuffer.data());
}
}

void LogFileReaderUnittest::TestReadUTF8() {
Expand Down Expand Up @@ -383,6 +434,55 @@ void LogFileReaderUnittest::TestReadUTF8() {
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_STREQ_FATAL(NULL, logBuffer.rawBuffer.data());
}
{ // force read + \n, in which case the number of bytes read is 0
Json::Value config;
config["StartPattern"] = "iLogtail.*";
MultilineOptions multilineOpts;
multilineOpts.Init(config, ctx, "");
FileReaderOptions readerOpts;
LogFileReader reader(
logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
reader.UpdateReaderManual();
reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING);
int64_t fileSize = reader.mLogFileOp.GetFileSize();
reader.CheckFileSignatureAndOffset(true);
LogBuffer logBuffer;
bool moreData = false;
std::string expectedPart(expectedContent.get());
// first read, read first line without \n and not allowRollback
int64_t firstReadSize = expectedPart.find("\n");
expectedPart.resize(firstReadSize);
reader.mLastForceRead = true;
reader.ReadUTF8(logBuffer, firstReadSize, moreData, false);
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data());
APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_TRUE_FATAL(reader.mLastForceRead);

// second read, start with \n but with other lines
reader.ReadUTF8(logBuffer, fileSize - 1, moreData);
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_GE_FATAL(reader.mCache.size(), 0UL);
std::string expectedPart2(expectedContent.get() + firstReadSize + 1); // skip \n
int64_t secondReadSize = expectedPart2.rfind("iLogtail") - 1;
expectedPart2.resize(secondReadSize);
APSARA_TEST_STREQ_FATAL(expectedPart2.c_str(), logBuffer.rawBuffer.data());
APSARA_TEST_FALSE_FATAL(reader.mLastForceRead);

// third read, force read cache
reader.ReadUTF8(logBuffer, fileSize - 1, moreData, false);
std::string expectedPart3(expectedContent.get() + firstReadSize + 1 + secondReadSize + 1);
APSARA_TEST_STREQ_FATAL(expectedPart3.c_str(), logBuffer.rawBuffer.data());
APSARA_TEST_TRUE_FATAL(reader.mLastForceRead);

// fourth read, only read \n
LogBuffer logBuffer2;
reader.ReadUTF8(logBuffer2, fileSize, moreData);
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_GE_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_EQUAL_FATAL(fileSize, reader.mLastFilePos);
APSARA_TEST_STREQ_FATAL(NULL, logBuffer2.rawBuffer.data());
}
}

class LogMultiBytesUnittest : public ::testing::Test {
Expand Down
25 changes: 23 additions & 2 deletions core/unittest/reader/RemoveLastIncompleteLogUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// limitations under the License.

#include "common/FileSystemUtil.h"
#include "reader/LogFileReader.h"
#include "common/memory/SourceBuffer.h"
#include "reader/LogFileReader.h"
#include "unittest/Unittest.h"

namespace logtail {
Expand Down Expand Up @@ -122,6 +122,15 @@ void RemoveLastIncompleteLogUnittest::TestSingleline() {
// return the whole buffer, so no rollback
APSARA_TEST_EQUAL_FATAL(1, rollbackLineFeedCount);
}
{ // case empty string
std::string expectMatch = "";
int32_t rollbackLineFeedCount = 0;
std::string testLog2 = expectMatch + "";
int32_t matchSize = logFileReader.RemoveLastIncompleteLog(
const_cast<char*>(testLog2.data()), testLog2.size(), rollbackLineFeedCount);
APSARA_TEST_EQUAL_FATAL(int32_t(expectMatch.size()), matchSize);
APSARA_TEST_EQUAL_FATAL(0, rollbackLineFeedCount);
}
}

void RemoveLastIncompleteLogUnittest::TestMultiline() {
Expand All @@ -131,13 +140,13 @@ void RemoveLastIncompleteLogUnittest::TestMultiline() {
multilineOpts.Init(config, ctx, "");
LogFileReader logFileReader(
logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx));
int32_t rollbackLineFeedCount = 0;
{ // case multi line
std::vector<int32_t> index;
std::string firstLog = LOG_BEGIN_STRING + "first.\nmultiline1\nmultiline2";
std::string secondLog = LOG_BEGIN_STRING + "second.\nmultiline1\nmultiline2";
std::string expectMatch = firstLog + '\n';
std::string testLog = expectMatch + secondLog + '\n';
int32_t rollbackLineFeedCount = 0;
int32_t matchSize = logFileReader.RemoveLastIncompleteLog(
const_cast<char*>(testLog.data()), testLog.size(), rollbackLineFeedCount);
APSARA_TEST_EQUAL_FATAL(static_cast<int32_t>(expectMatch.size()), matchSize);
Expand All @@ -150,6 +159,7 @@ void RemoveLastIncompleteLogUnittest::TestMultiline() {
std::string secondLog = LOG_BEGIN_STRING + "second.\nmultiline1\nmultiline2";
std::string expectMatch = firstLog + '\n';
std::string testLog = expectMatch + secondLog;
int32_t rollbackLineFeedCount = 0;
int32_t matchSize = logFileReader.RemoveLastIncompleteLog(
const_cast<char*>(testLog.data()), testLog.size(), rollbackLineFeedCount);
APSARA_TEST_EQUAL_FATAL(static_cast<int32_t>(expectMatch.size()), matchSize);
Expand All @@ -158,6 +168,7 @@ void RemoveLastIncompleteLogUnittest::TestMultiline() {
}
{ // case multi line not match
std::string testLog2 = "log begin does not match.\nlog begin does not match.\nlog begin does not match.\n";
int32_t rollbackLineFeedCount = 0;
int32_t matchSize = logFileReader.RemoveLastIncompleteLog(
const_cast<char*>(testLog2.data()), testLog2.size(), rollbackLineFeedCount);
APSARA_TEST_EQUAL_FATAL(testLog2.size(), matchSize);
Expand All @@ -166,11 +177,21 @@ void RemoveLastIncompleteLogUnittest::TestMultiline() {
{ // case multi line not match, buffer size not big enough (no new line at the end of line)
std::string expectMatch = "log begin does not match.\nlog begin does not match.\n";
std::string testLog2 = expectMatch + "log begin does not";
int32_t rollbackLineFeedCount = 0;
int32_t matchSize = logFileReader.RemoveLastIncompleteLog(
const_cast<char*>(testLog2.data()), testLog2.size(), rollbackLineFeedCount);
APSARA_TEST_EQUAL_FATAL(expectMatch.size(), matchSize);
APSARA_TEST_EQUAL_FATAL(1, rollbackLineFeedCount);
}
{ // case empty string
std::string expectMatch = "";
int32_t rollbackLineFeedCount = 0;
std::string testLog2 = expectMatch + "";
int32_t matchSize = logFileReader.RemoveLastIncompleteLog(
const_cast<char*>(testLog2.data()), testLog2.size(), rollbackLineFeedCount);
APSARA_TEST_EQUAL_FATAL(int32_t(expectMatch.size()), matchSize);
APSARA_TEST_EQUAL_FATAL(0, rollbackLineFeedCount);
}
}

class RemoveLastIncompleteLogMultilineUnittest : public ::testing::Test {
Expand Down
Loading