Skip to content

Fix apsara base field parse error and microsecond handling in Apsara log parser #1308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 40 additions & 38 deletions core/processor/ProcessorParseApsaraNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ void ProcessorParseApsaraNative::Process(PipelineEventGroup& logGroup) {
const StringView& logPath = logGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_PATH_RESOLVED);
EventsContainer& events = logGroup.MutableEvents();
StringView timeStrCache;
LogtailTime lastLogTime;
LogtailTime cachedLogTime;
// works good normally. poor performance if most data need to be discarded.
for (auto it = events.begin(); it != events.end();) {
if (ProcessEvent(logPath, *it, lastLogTime, timeStrCache)) {
if (ProcessEvent(logPath, *it, cachedLogTime, timeStrCache)) {
++it;
} else {
it = events.erase(it);
Expand All @@ -110,13 +110,13 @@ void ProcessorParseApsaraNative::Process(PipelineEventGroup& logGroup) {
* 处理单个日志事件。
* @param logPath - 日志文件的路径。
* @param e - 指向待处理日志事件的智能指针。
* @param lastLogTime - 上一条日志的时间戳(秒)。
* @param cachedLogTime - timestamp (seconds) of the cached time string timeStrCache; the two must be updated together.
* @param timeStrCache - 缓存时间字符串,用于比较和更新。
* @return 如果事件被处理且保留,则返回true,如果事件被丢弃,则返回false。
*/
bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
PipelineEventPtr& e,
LogtailTime& lastLogTime,
LogtailTime& cachedLogTime,
StringView& timeStrCache) {
if (!IsSupportedEvent(e)) {
return true;
Expand All @@ -132,7 +132,7 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
}
mProcParseInSizeBytes->Add(buffer.size());
int64_t logTime_in_micro = 0;
time_t logTime = ApsaraEasyReadLogTimeParser(buffer, timeStrCache, lastLogTime, logTime_in_micro);
time_t logTime = ApsaraEasyReadLogTimeParser(buffer, timeStrCache, cachedLogTime, logTime_in_micro);
if (logTime <= 0) // this case will handle empty apsara log line
{
StringView bufOut(buffer);
Expand Down Expand Up @@ -240,18 +240,19 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
/*
* 解析Apsara格式日志的时间。
* @param buffer - 包含日志数据的字符串视图。
* @param timeStr - 解析后的时间字符串
* @param lastLogTime - 上一条日志的时间戳(秒)。
* @param cachedTimeStr - 缓存的时间字符串
* @param cachedLogTime - 缓存的时间字符串的时间戳(秒),必须与cachedTimeStr同时修改
* @param microTime - 解析出的微秒时间戳。
* @return 解析出的时间戳(秒),如果解析失败,则返回0。
*/
time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffer,
StringView& timeStr,
LogtailTime& lastLogTime,
StringView& cachedTimeStr,
LogtailTime& cachedLogTime,
int64_t& microTime) {
if (buffer[0] != '[') {
return 0;
}
LogtailTime logTime = {};
if (buffer[1] == '1') // for normal time, e.g 1378882630, starts with '1'
{
int nanosecondLength = 0;
Expand All @@ -262,13 +263,13 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
}
// strTime is the content between '[' and ']' and ends with '\0'
std::string strTime = buffer.substr(1, pos).to_string();
auto strptimeResult = Strptime(strTime.c_str(), "%s", &lastLogTime, nanosecondLength);
auto strptimeResult = Strptime(strTime.c_str(), "%s", &logTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer)("timeformat", "%s"));
return 0;
}
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
return lastLogTime.tv_sec;
microTime = (int64_t)logTime.tv_sec * 1000000 + logTime.tv_nsec / 1000;
return logTime.tv_sec;
}
// test other date format case
{
Expand All @@ -279,33 +280,42 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
}
// strTime is the content between '[' and ']' and ends with '\0'
std::string strTime = buffer.substr(1, pos).to_string();
if (IsPrefixString(strTime.c_str(), timeStr) == true) {
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
return lastLogTime.tv_sec;
}
struct tm tm;
memset(&tm, 0, sizeof(tm));
int nanosecondLength = 0;
if (IsPrefixString(strTime, cachedTimeStr) == true) {
if (strTime.size() > cachedTimeStr.size()) {
auto strptimeResult
= Strptime(strTime.c_str() + cachedTimeStr.size() + 1, "%f", &logTime, nanosecondLength);
if (NULL == strptimeResult) {
LOG_WARNING(sLogger,
("parse apsara log time microsecond",
"fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S.%f"));
}
}
microTime = (int64_t)cachedLogTime.tv_sec * 1000000 + logTime.tv_nsec / 1000;
return cachedLogTime.tv_sec;
}
// parse second part
auto strptimeResult = Strptime(strTime.c_str(), "%Y-%m-%d %H:%M:%S", &lastLogTime, nanosecondLength);
auto strptimeResult = Strptime(strTime.c_str(), "%Y-%m-%d %H:%M:%S", &logTime, nanosecondLength);
if (NULL == strptimeResult) {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S"));
return 0;
}
// parse nanosecond part (optional)
if (*strptimeResult != '\0') {
strptimeResult = Strptime(strptimeResult + 1, "%f", &lastLogTime, nanosecondLength);
strptimeResult = Strptime(strptimeResult + 1, "%f", &logTime, nanosecondLength);
if (NULL == strptimeResult) {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S.%f"));
("parse apsara log time microsecond", "fail")("string", buffer)("timeformat",
"%Y-%m-%d %H:%M:%S.%f"));
}
}
logTime.tv_sec = logTime.tv_sec - mLogTimeZoneOffsetSecond;
microTime = (int64_t)logTime.tv_sec * 1000000 + logTime.tv_nsec / 1000;
// if the time is valid (strptime not return NULL), the date value size must be 19 ,like '2013-09-11 03:11:05'
timeStr = StringView(buffer.data() + 1, 19);
lastLogTime.tv_sec = lastLogTime.tv_sec - mLogTimeZoneOffsetSecond;
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
return lastLogTime.tv_sec;
cachedTimeStr = StringView(buffer.data() + 1, 19);
cachedLogTime = logTime;
return logTime.tv_sec;
}
}

Expand All @@ -315,16 +325,8 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
* @param prefix - 要检查的前缀。
* @return 如果字符串以指定前缀开头,则返回true;否则返回false。
*/
bool ProcessorParseApsaraNative::IsPrefixString(const char* all, const StringView& prefix) {
if (prefix.size() == 0)
return false;
for (size_t i = 0; i < prefix.size(); ++i) {
if (all[i] == '\0')
return false;
if (all[i] != prefix[i])
return false;
}
return true;
bool ProcessorParseApsaraNative::IsPrefixString(const std::string& all, const StringView& prefix) {
return !prefix.empty() && std::equal(prefix.begin(), prefix.end(), all.begin());
}

/*
Expand All @@ -340,14 +342,14 @@ static int32_t FindBaseFields(const StringView& buffer, int32_t beginIndexArray[
if (buffer[i] == '[') {
beginIndexArray[baseFieldNum] = i + 1;
} else if (buffer[i] == ']') {
if (buffer[i + 1] == '\t' || buffer[i + 1] == '\n') {
if (i + 1 == buffer.size() || buffer[i + 1] == '\t' || buffer[i + 1] == '\n') {
endIndexArray[baseFieldNum] = i;
baseFieldNum++;
}
if (baseFieldNum >= MAX_BASE_FIELD_NUM) {
break;
}
if (buffer[i + 1] == '\t' && buffer[i + 2] != '[') {
if (buffer[i + 1] == '\t' && (i + 2 == buffer.size() || buffer[i + 2] != '[')) {
break;
}
}
Expand Down Expand Up @@ -447,7 +449,7 @@ int32_t ProcessorParseApsaraNative::ParseApsaraBaseFields(const StringView& buff
} else if ((findFieldBitMap & 0x100) == 0 && IsFieldFileLine(buffer, beginIndex, endIndex)) {
findFieldBitMap |= 0x100;
int32_t colonIndex = FindColonIndex(buffer, beginIndex, endIndex);
AddLog(SLS_KEY_FILE, StringView(buffer.data() + beginIndex, endIndex - beginIndex), sourceEvent);
AddLog(SLS_KEY_FILE, StringView(buffer.data() + beginIndex, colonIndex - beginIndex), sourceEvent);
if (colonIndex < endIndex) {
AddLog(
SLS_KEY_LINE, StringView(buffer.data() + colonIndex + 1, endIndex - colonIndex - 1), sourceEvent);
Expand Down
2 changes: 1 addition & 1 deletion core/processor/ProcessorParseApsaraNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ProcessorParseApsaraNative : public Processor {
void AddLog(const StringView& key, const StringView& value, LogEvent& targetEvent, bool overwritten = true);
time_t
ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime);
bool IsPrefixString(const char* all, const StringView& prefix);
bool IsPrefixString(const std::string& all, const StringView& prefix);
int32_t ParseApsaraBaseFields(const StringView& buffer, LogEvent& sourceEvent);

int32_t mLogTimeZoneOffsetSecond = 0;
Expand Down
Loading