Skip to content

fix: refactor multiline split state #1410

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Mar 29, 2024
14 changes: 14 additions & 0 deletions core/file_server/MultilineOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,18 @@ bool MultilineOptions::ParseRegex(const string& pattern, shared_ptr<boost::regex
return true;
}

const std::string& MultilineOptions::UnmatchedContentTreatmentToString() {
switch (mUnmatchedContentTreatment) {
case UnmatchedContentTreatment::DISCARD:
static std::string discardStr = "discard";
return discardStr;
case UnmatchedContentTreatment::SINGLE_LINE:
static std::string singleLine = "single line";
return singleLine;
default:
static std::string unkonwn = "";
return unkonwn;
}
}

} // namespace logtail
1 change: 1 addition & 0 deletions core/file_server/MultilineOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace logtail {

class MultilineOptions {
public:
const std::string& UnmatchedContentTreatmentToString();
enum class Mode { CUSTOM, JSON };
enum class UnmatchedContentTreatment { DISCARD, SINGLE_LINE };

Expand Down
5 changes: 5 additions & 0 deletions core/monitor/MetricConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ const std::string METRIC_PROC_PARSE_ERROR_TOTAL = "proc_parse_error_total";
const std::string METRIC_PROC_KEY_COUNT_NOT_MATCH_ERROR_TOTAL = "proc_key_count_not_match_error_total";
const std::string METRIC_PROC_HISTORY_FAILURE_TOTAL = "proc_history_failure_total";

const std::string METRIC_PROC_SPLIT_MULTILINE_LOG_SPLITTED_RECORDS_TOTAL
= "proc_split_multiline_log_splitted_records_total";
const std::string METRIC_PROC_SPLIT_MULTILINE_LOG_UNMATCHED_RECORDS_TOTAL
= "proc_split_multiline_log_unmatched_records_total";

// processor filter metrics
const std::string METRIC_PROC_FILTER_IN_SIZE_BYTES = "proc_filter_in_size_bytes";
const std::string METRIC_PROC_FILTER_OUT_SIZE_BYTES = "proc_filter_out_size_bytes";
Expand Down
2 changes: 2 additions & 0 deletions core/monitor/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ extern const std::string METRIC_PROC_PARSE_OUT_SIZE_BYTES;
extern const std::string METRIC_PROC_PARSE_ERROR_TOTAL;
extern const std::string METRIC_PROC_KEY_COUNT_NOT_MATCH_ERROR_TOTAL;
extern const std::string METRIC_PROC_HISTORY_FAILURE_TOTAL;
extern const std::string METRIC_PROC_SPLIT_MULTILINE_LOG_SPLITTED_RECORDS_TOTAL;
extern const std::string METRIC_PROC_SPLIT_MULTILINE_LOG_UNMATCHED_RECORDS_TOTAL;

// processor filter metrics
extern const std::string METRIC_PROC_FILTER_IN_SIZE_BYTES;
Expand Down
538 changes: 257 additions & 281 deletions core/processor/ProcessorSplitRegexNative.cpp

Large diffs are not rendered by default.

33 changes: 21 additions & 12 deletions core/processor/ProcessorSplitRegexNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ class ProcessorSplitRegexNative : public Processor {
static const std::string sName;

std::string mSourceKey = DEFAULT_CONTENT_KEY;
char mSplitChar = '\n';
MultilineOptions mMultiline;
bool mAppendingLogPositionMeta = false;
bool mIgnoreUnmatchWarning = false;

const std::string& Name() const override { return sName; }
bool Init(const Json::Value& config) override;
Expand All @@ -46,21 +48,28 @@ class ProcessorSplitRegexNative : public Processor {
const StringView& logPath,
PipelineEventPtr&& e,
EventsContainer& newEvents);
bool LogSplit(const char* buffer,
int32_t size,
int32_t& lineFeed,
std::vector<StringView>& logIndex,
std::vector<StringView>& discardIndex,
const StringView& logPath);
void HandleUnmatchLogs(const char* buffer,
int& multiBeginIndex,
int endIndex,
std::vector<StringView>& logIndex,
std::vector<StringView>& discardIndex);
void SplitLogByRegex(PipelineEventGroup& logGroup);
void HandleSplittedLogs(const StringView& content,
long sourceoffset,
StringBuffer& sourceKey,
const LogEvent& sourceEvent,
PipelineEventGroup& logGroup,
EventsContainer& newEvents);
void HandleUnmatchLogs(const StringView& sourceVal,
long sourceoffset,
StringBuffer& sourceKey,
const LogEvent& sourceEvent,
PipelineEventGroup& logGroup,
EventsContainer& newEvents,
StringView logPath);

StringView GetNextLine(StringView log, size_t begin);

int* mFeedLines = nullptr;
int* mSplitLines = nullptr;

CounterPtr mProcSplittedEventsCnt;
CounterPtr mProcUnmatchedEventsCnt;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ProcessorSplitRegexNativeUnittest;
friend class ProcessorSplitRegexDisacardUnmatchUnittest;
Expand Down
2 changes: 0 additions & 2 deletions core/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ class DevInode;
typedef std::shared_ptr<LogFileReader> LogFileReaderPtr;
typedef std::deque<LogFileReaderPtr> LogFileReaderPtrArray;

enum SplitState { SPLIT_UNMATCH, SPLIT_BEGIN, SPLIT_CONTINUE };

// Only get the currently written log file, it will choose the last modified file to read. There are several condition
// to choose the lastmodify file:
// 1. if the last read file don't exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ dbf@@@324 FS2$%pwd,pwd=saf543#$@,,"
// run function ProcessorSplitRegexNative
ProcessorSplitRegexNative processorSplitRegexNative;
processorSplitRegexNative.SetContext(mContext);
processorSplitRegexNative.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
APSARA_TEST_TRUE_FATAL(processorSplitRegexNative.Init(config));
processorSplitRegexNative.Process(eventGroup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ void ProcessorParseApsaraNativeUnittest::TestMultipleLines() {
// run function ProcessorSplitRegexNative
ProcessorSplitRegexNative processorSplitRegexNative;
processorSplitRegexNative.SetContext(mContext);
processorSplitRegexNative.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
APSARA_TEST_TRUE_FATAL(processorSplitRegexNative.Init(config));
processorSplitRegexNative.Process(eventGroup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ void ProcessorParseDelimiterNativeUnittest::TestAllowingShortenedFields() {
// run function ProcessorSplitRegexNative
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
APSARA_TEST_TRUE_FATAL(processor.Init(config));
processor.Process(eventGroup);

Expand Down Expand Up @@ -363,6 +364,7 @@ void ProcessorParseDelimiterNativeUnittest::TestAllowingShortenedFields() {
// run function ProcessorSplitRegexNative
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
APSARA_TEST_TRUE_FATAL(processor.Init(config));
processor.Process(eventGroup);
// run function ProcessorParseDelimiterNative
Expand Down Expand Up @@ -514,6 +516,7 @@ void ProcessorParseDelimiterNativeUnittest::TestExtend() {
// run function ProcessorSplitRegexNative
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
APSARA_TEST_TRUE_FATAL(processor.Init(config));
processor.Process(eventGroup);
// run function ProcessorParseDelimiterNative
Expand Down Expand Up @@ -640,6 +643,7 @@ void ProcessorParseDelimiterNativeUnittest::TestExtend() {
// run function ProcessorSplitRegexNative
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
APSARA_TEST_TRUE_FATAL(processor.Init(config));
processor.Process(eventGroup);
// run function ProcessorParseDelimiterNative
Expand Down Expand Up @@ -764,6 +768,7 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLines() {
// run function ProcessorSplitRegexNative
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
APSARA_TEST_TRUE_FATAL(processor.Init(config));
processor.Process(eventGroup);
// run function ProcessorParseDelimiterNative
Expand Down Expand Up @@ -885,6 +890,7 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLines() {
// run function ProcessorSplitRegexNative
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
APSARA_TEST_TRUE_FATAL(processor.Init(config));
processor.Process(eventGroup);
// run function ProcessorParseDelimiterNative
Expand Down Expand Up @@ -1011,6 +1017,7 @@ void ProcessorParseDelimiterNativeUnittest::TestMultipleLines() {
// run function ProcessorSplitRegexNative
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
APSARA_TEST_TRUE_FATAL(processor.Init(config));
processor.Process(eventGroup);
// run function ProcessorParseDelimiterNative
Expand Down Expand Up @@ -1228,6 +1235,7 @@ void ProcessorParseDelimiterNativeUnittest::TestProcessQuote() {
// run function ProcessorSplitRegexNative
ProcessorSplitRegexNative processor;
processor.SetContext(mContext);
processor.SetMetricsRecordRef(ProcessorSplitRegexNative::sName, "1");
APSARA_TEST_TRUE_FATAL(processor.Init(config));
processor.Process(eventGroup);

Expand Down
Loading
Loading