Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactor multiline split state #1410

Merged
merged 16 commits into from
Mar 29, 2024
14 changes: 14 additions & 0 deletions core/file_server/MultilineOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,18 @@ bool MultilineOptions::ParseRegex(const string& pattern, shared_ptr<boost::regex
return true;
}

/// Returns the textual name of the configured unmatched-content treatment.
///
/// @return "discard" for UnmatchedContentTreatment::DISCARD, "single line" for
///         UnmatchedContentTreatment::SINGLE_LINE, and an empty string for any
///         other value. The returned reference is bound to a function-local
///         static, so it stays valid after this call returns.
const std::string& MultilineOptions::UnmatchedContentTreatmentToString() {
    // Declared ahead of the switch so that no case label jumps over an
    // initialized declaration, and const because callers only ever receive
    // a const reference to them.
    static const std::string sDiscard = "discard";
    static const std::string sSingleLine = "single line";
    static const std::string sUnknown = "";

    switch (mUnmatchedContentTreatment) {
        case UnmatchedContentTreatment::DISCARD:
            return sDiscard;
        case UnmatchedContentTreatment::SINGLE_LINE:
            return sSingleLine;
        default:
            // Any value outside the two enumerators above maps to an empty name.
            return sUnknown;
    }
}

} // namespace logtail
1 change: 1 addition & 0 deletions core/file_server/MultilineOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace logtail {

class MultilineOptions {
public:
const std::string& UnmatchedContentTreatmentToString();
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
enum class Mode { CUSTOM, JSON };
enum class UnmatchedContentTreatment { DISCARD, SINGLE_LINE };

Expand Down
5 changes: 5 additions & 0 deletions core/monitor/MetricConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ const std::string METRIC_PROC_PARSE_ERROR_TOTAL = "proc_parse_error_total";
const std::string METRIC_PROC_KEY_COUNT_NOT_MATCH_ERROR_TOTAL = "proc_key_count_not_match_error_total";
const std::string METRIC_PROC_HISTORY_FAILURE_TOTAL = "proc_history_failure_total";

// processor split multiline log metrics
const std::string METRIC_PROC_SPLIT_MULTILINE_LOG_SPLITTED_RECORDS_TOTAL
= "proc_split_multiline_log_splitted_records_total";
const std::string METRIC_PROC_SPLIT_MULTILINE_LOG_UNMATCHED_RECORDS_TOTAL
= "proc_split_multiline_log_unmatched_records_total";

// processor filter metrics
const std::string METRIC_PROC_FILTER_IN_SIZE_BYTES = "proc_filter_in_size_bytes";
const std::string METRIC_PROC_FILTER_OUT_SIZE_BYTES = "proc_filter_out_size_bytes";
Expand Down
2 changes: 2 additions & 0 deletions core/monitor/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ extern const std::string METRIC_PROC_PARSE_OUT_SIZE_BYTES;
extern const std::string METRIC_PROC_PARSE_ERROR_TOTAL;
extern const std::string METRIC_PROC_KEY_COUNT_NOT_MATCH_ERROR_TOTAL;
extern const std::string METRIC_PROC_HISTORY_FAILURE_TOTAL;
// processor split multiline log metrics
extern const std::string METRIC_PROC_SPLIT_MULTILINE_LOG_SPLITTED_RECORDS_TOTAL;
extern const std::string METRIC_PROC_SPLIT_MULTILINE_LOG_UNMATCHED_RECORDS_TOTAL;

// processor filter metrics
extern const std::string METRIC_PROC_FILTER_IN_SIZE_BYTES;
Expand Down
538 changes: 257 additions & 281 deletions core/processor/ProcessorSplitRegexNative.cpp

Large diffs are not rendered by default.

33 changes: 21 additions & 12 deletions core/processor/ProcessorSplitRegexNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ class ProcessorSplitRegexNative : public Processor {
static const std::string sName;

std::string mSourceKey = DEFAULT_CONTENT_KEY;
char mSplitChar = '\n';
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
MultilineOptions mMultiline;
bool mAppendingLogPositionMeta = false;
bool mIgnoreUnmatchWarning = false;

const std::string& Name() const override { return sName; }
bool Init(const Json::Value& config) override;
Expand All @@ -46,21 +48,28 @@ class ProcessorSplitRegexNative : public Processor {
const StringView& logPath,
PipelineEventPtr&& e,
EventsContainer& newEvents);
bool LogSplit(const char* buffer,
int32_t size,
int32_t& lineFeed,
std::vector<StringView>& logIndex,
std::vector<StringView>& discardIndex,
const StringView& logPath);
void HandleUnmatchLogs(const char* buffer,
int& multiBeginIndex,
int endIndex,
std::vector<StringView>& logIndex,
std::vector<StringView>& discardIndex);
void SplitLogByRegex(PipelineEventGroup& logGroup);
void HandleSplittedLogs(const StringView& content,
long sourceoffset,
StringBuffer& sourceKey,
const LogEvent& sourceEvent,
PipelineEventGroup& logGroup,
EventsContainer& newEvents);
void HandleUnmatchLogs(const StringView& sourceVal,
long sourceoffset,
StringBuffer& sourceKey,
const LogEvent& sourceEvent,
PipelineEventGroup& logGroup,
EventsContainer& newEvents,
StringView logPath);

StringView GetNextLine(StringView log, size_t begin);

int* mFeedLines = nullptr;
int* mSplitLines = nullptr;

CounterPtr mProcSplittedEventsCnt;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
CounterPtr mProcUnmatchedEventsCnt;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ProcessorSplitRegexNativeUnittest;
friend class ProcessorSplitRegexDisacardUnmatchUnittest;
Expand Down
2 changes: 0 additions & 2 deletions core/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ class DevInode;
typedef std::shared_ptr<LogFileReader> LogFileReaderPtr;
typedef std::deque<LogFileReaderPtr> LogFileReaderPtrArray;

enum SplitState { SPLIT_UNMATCH, SPLIT_BEGIN, SPLIT_CONTINUE };

// Only get the currently written log file, it will choose the last modified file to read. There are several condition
// to choose the lastmodify file:
// 1. if the last read file don't exist
Expand Down
83 changes: 17 additions & 66 deletions core/unittest/processor/ProcessorSplitRegexNativeUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class ProcessorSplitRegexNativeUnittest : public ::testing::Test {
void SetUp() override { mContext.SetConfigName("project##config_0"); }

void TestInit();
void TestProcessEventSingleLine();
void TestProcessEventMultiline();
void TestProcessEventMultilineKeepUnmatch();
void TestProcessEventMultilineDiscardUnmatch();
Expand All @@ -48,7 +47,6 @@ class ProcessorSplitRegexNativeUnittest : public ::testing::Test {
};

UNIT_TEST_CASE(ProcessorSplitRegexNativeUnittest, TestInit);
UNIT_TEST_CASE(ProcessorSplitRegexNativeUnittest, TestProcessEventSingleLine);
UNIT_TEST_CASE(ProcessorSplitRegexNativeUnittest, TestProcessEventMultiline);
UNIT_TEST_CASE(ProcessorSplitRegexNativeUnittest, TestProcessEventMultilineKeepUnmatch);
UNIT_TEST_CASE(ProcessorSplitRegexNativeUnittest, TestProcessEventMultilineDiscardUnmatch);
Expand All @@ -64,73 +62,11 @@ void ProcessorSplitRegexNativeUnittest::TestInit() {
config["AppendingLogPositionMeta"] = false;
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
}

// Feeds one event whose content is "line1\nline2" through ProcessEvent with a
// ".*" start pattern and checks that the resulting group serializes to two
// events, "line1" and "line2", each carrying only the "content" key and the
// source event's timestamp.
void ProcessorSplitRegexNativeUnittest::TestProcessEventSingleLine() {
    // make config
    Json::Value config;
    config["StartPattern"] = ".*";
    config["UnmatchedContentTreatment"] = "split";
    config["AppendingLogPositionMeta"] = false;
    // make processor
    ProcessorSplitRegexNative processor;
    processor.SetContext(mContext);
    APSARA_TEST_TRUE_FATAL(processor.Init(config));
    // make eventGroup
    auto sourceBuffer = std::make_shared<SourceBuffer>();
    PipelineEventGroup eventGroup(sourceBuffer);
    std::string inJson = R"({
"events" :
[
{
"contents" :
{
"__file_offset__": "0",
"content" : "line1\nline2"
},
"timestamp" : 12345678901,
"timestampNanosecond" : 0,
"type" : 1
}
]
})";
    eventGroup.FromJsonString(inJson);
    std::string logPath("/var/log/message");
    EventsContainer newEvents;
    // run test function: the single source event is moved into ProcessEvent and
    // the produced events are collected in newEvents
    processor.ProcessEvent(eventGroup, logPath, std::move(eventGroup.MutableEvents()[0]), newEvents);
    // install the produced events into the group so they can be serialized
    eventGroup.SwapEvents(newEvents);
    // judge result
    std::string expectJson = R"({
"events" :
[
{
"contents" :
{
"content" : "line1"
},
"timestamp" : 12345678901,
"timestampNanosecond" : 0,
"type" : 1
},
{
"contents" :
{
"content" : "line2"
},
"timestamp" : 12345678901,
"timestampNanosecond" : 0,
"type" : 1
}
]
})";
    std::string outJson = eventGroup.ToJsonString();
    // both sides go through CompactJson before the string comparison
    APSARA_TEST_STREQ_FATAL(CompactJson(expectJson).c_str(), CompactJson(outJson).c_str());
}

void ProcessorSplitRegexNativeUnittest::TestProcessEventMultiline() {
// make config
Json::Value config;
Expand All @@ -140,6 +76,7 @@ void ProcessorSplitRegexNativeUnittest::TestProcessEventMultiline() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
// make eventGroup
Expand Down Expand Up @@ -211,6 +148,7 @@ void ProcessorSplitRegexNativeUnittest::TestProcessEventMultilineKeepUnmatch() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
// make eventGroup
Expand Down Expand Up @@ -297,6 +235,7 @@ void ProcessorSplitRegexNativeUnittest::TestProcessEventMultilineDiscardUnmatch(
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
// make eventGroup
Expand Down Expand Up @@ -354,6 +293,7 @@ void ProcessorSplitRegexNativeUnittest::TestProcessEventMultilineAllNotMatchKeep
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
// make eventGroup
Expand Down Expand Up @@ -436,6 +376,7 @@ void ProcessorSplitRegexNativeUnittest::TestProcessEventMultilineAllNotMatchDisc
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
// make eventGroup
Expand Down Expand Up @@ -476,6 +417,7 @@ void ProcessorSplitRegexNativeUnittest::TestProcess() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
// make eventGroup
Expand Down Expand Up @@ -530,7 +472,6 @@ void ProcessorSplitRegexNativeUnittest::TestProcess() {
std::string outJson = eventGroup.ToJsonString();
APSARA_TEST_STREQ_FATAL(CompactJson(expectJson.str()).c_str(), CompactJson(outJson).c_str());
// check observability
APSARA_TEST_EQUAL_FATAL(4, processor.GetContext().GetProcessProfile().feedLines);
APSARA_TEST_EQUAL_FATAL(2, processor.GetContext().GetProcessProfile().splitLines);
}

Expand Down Expand Up @@ -563,6 +504,7 @@ void ProcessorSplitRegexDisacardUnmatchUnittest::TestLogSplitWithBeginContinue()
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
{ // case: complete log
Expand Down Expand Up @@ -710,6 +652,7 @@ void ProcessorSplitRegexDisacardUnmatchUnittest::TestLogSplitWithBeginEnd() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
{ // case: complete log
Expand Down Expand Up @@ -876,6 +819,7 @@ void ProcessorSplitRegexDisacardUnmatchUnittest::TestLogSplitWithBegin() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
{ // case: complete log
Expand Down Expand Up @@ -966,6 +910,7 @@ void ProcessorSplitRegexDisacardUnmatchUnittest::TestLogSplitWithContinueEnd() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
{ // case: complete log
Expand Down Expand Up @@ -1102,6 +1047,7 @@ void ProcessorSplitRegexDisacardUnmatchUnittest::TestLogSplitWithEnd() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
{ // case: complete log
Expand Down Expand Up @@ -1211,6 +1157,7 @@ void ProcessorSplitRegexKeepUnmatchUnittest::TestLogSplitWithBeginContinue() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
{ // case: complete log
Expand Down Expand Up @@ -1404,6 +1351,7 @@ void ProcessorSplitRegexKeepUnmatchUnittest::TestLogSplitWithBeginEnd() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
{ // case: complete log
Expand Down Expand Up @@ -1652,6 +1600,7 @@ void ProcessorSplitRegexKeepUnmatchUnittest::TestLogSplitWithBegin() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
{ // case: complete log
Expand Down Expand Up @@ -1768,6 +1717,7 @@ void ProcessorSplitRegexKeepUnmatchUnittest::TestLogSplitWithContinueEnd() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
{ // case: complete log
Expand Down Expand Up @@ -1940,6 +1890,7 @@ void ProcessorSplitRegexKeepUnmatchUnittest::TestLogSplitWithEnd() {
// make processor
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
std::string pluginId = "testID";
APSARA_TEST_TRUE_FATAL(processor.Init(config));
{ // case: complete log
Expand Down
Loading