Skip to content

add UsingOldContentTag. When UsingOldContentTag is set to false, the tag is now put into the context during cgo. #1169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 66 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
c72aa11
fix
quzard Aug 29, 2023
a8ddeb3
Merge branch 'alibaba:main' into main
quzard Aug 30, 2023
45bfd5b
Merge branch 'alibaba:main' into main
quzard Aug 31, 2023
7297fd3
Merge branch 'alibaba:main' into main
quzard Sep 8, 2023
671282f
Merge branch 'alibaba:main' into main
quzard Sep 13, 2023
9f4cee7
Merge branch 'alibaba:main' into main
quzard Sep 18, 2023
ca15089
Merge branch 'alibaba:main' into main
quzard Sep 19, 2023
b376cb8
Merge branch 'alibaba:main' into main
quzard Sep 20, 2023
30768cb
Merge branch 'alibaba:main' into main
quzard Sep 20, 2023
b9b9bcc
Merge remote-tracking branch 'alibaba/main' into main
quzard Sep 22, 2023
b2c6ef7
Merge branch 'alibaba:main' into main
quzard Sep 25, 2023
01b64ed
Merge branch 'alibaba:main' into main
quzard Sep 26, 2023
2fbde8d
Merge branch 'alibaba:main' into main
quzard Sep 26, 2023
a0f0363
add mUsingOldContentTag
quzard Oct 3, 2023
e7580fc
Merge branch 'alibaba:main' into main
quzard Oct 3, 2023
481dc0f
fix
quzard Oct 4, 2023
43b3cb5
fix
quzard Oct 4, 2023
3a3c964
fix
quzard Oct 4, 2023
122dcaa
fix
quzard Oct 8, 2023
72cd2dd
fix
quzard Oct 9, 2023
0e2d12a
fix
quzard Oct 9, 2023
db26b81
fix
quzard Oct 9, 2023
ddaaf90
fix
quzard Oct 9, 2023
f5bf8a1
fix
quzard Oct 10, 2023
3f0739a
addchangelog
quzard Oct 10, 2023
621d44e
Merge branch 'alibaba:main' into main
quzard Oct 12, 2023
c215179
fix
quzard Oct 12, 2023
3b7e1c8
Merge branch 'main' into tag
quzard Oct 12, 2023
177d93c
add changelog
quzard Oct 12, 2023
348fe3e
add extractTagsToLogTags
quzard Oct 12, 2023
017b348
add extractTagsToLogTags
quzard Oct 12, 2023
be102d2
fix
quzard Oct 12, 2023
0eee447
add logGroupPoolSize
quzard Oct 16, 2023
37bdf6c
fix
quzard Oct 16, 2023
ffdc5ed
fix
quzard Oct 16, 2023
e5f5c22
fix
quzard Oct 16, 2023
93e2cf1
fix
quzard Oct 16, 2023
e27ac25
fix
quzard Oct 16, 2023
987c66f
Merge branch 'alibaba:main' into tag
quzard Oct 17, 2023
c43db8f
fix
quzard Oct 17, 2023
97725a8
fox
quzard Oct 18, 2023
5a3c7dc
fix
quzard Oct 18, 2023
b784b11
set aggregator_context as default aggregator
quzard Oct 23, 2023
d3379c2
Add protoc description
quzard Oct 23, 2023
cae09f3
modify
quzard Oct 23, 2023
900a0f4
add AddToQueueWithRetry
quzard Oct 23, 2023
059a5dd
add LogGroupWithSize
quzard Oct 23, 2023
9a659b1
add Test_extractTagsToLogTags
quzard Oct 23, 2023
315adea
add comment
quzard Oct 23, 2023
faeb1f3
add comment
quzard Oct 23, 2023
d6753a8
add comment
quzard Oct 23, 2023
399fee0
add comment
quzard Oct 23, 2023
00c4ad8
delete nowLogGroupSizeMap
quzard Oct 23, 2023
447506a
make lint
quzard Oct 23, 2023
8d7f34b
modify changelog
quzard Oct 24, 2023
794d2c2
fix genernate.sh
quzard Oct 24, 2023
8b4c0dd
set aggregator_context as aggregator_default
quzard Oct 24, 2023
b185934
add WITHOUTGDB into e2e
quzard Oct 24, 2023
a4dd353
fix doc
quzard Oct 24, 2023
3e4b02e
logGroupPoolSize = 0 when reset
quzard Oct 24, 2023
167b95f
change LogGroupWithSize to *LogGroupWithSize
quzard Oct 24, 2023
cccf5e0
add tag_kv_regex
quzard Oct 24, 2023
03e0293
addToQueueWithRetry
quzard Oct 24, 2023
1f4b004
add using_old_content_tag
quzard Oct 24, 2023
96c3324
fix
quzard Oct 24, 2023
8031731
modify changelog
quzard Oct 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/config/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class Config {
uint16_t mSearchCheckpointDirDepth = 0; // Max directory depth when search checkpoint.

bool mEnableTimestampNanosecond = false;
bool mUsingOldContentTag = false;
// Deprecated
bool mEnablePreciseTimestamp = false;
std::string mPreciseTimestampKey;
Expand Down
6 changes: 6 additions & 0 deletions core/config/UserLogConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ void UserLogConfigParser::ParseAdvancedConfig(const Json::Value& originalVal, Co
cfg.mAdvancedConfig.mSpecifiedYear = static_cast<int32_t>(val.asUInt());
}
}
// using_old_content_tag
{
if (advancedVal.isMember("using_old_content_tag") && advancedVal["using_old_content_tag"].isBool()) {
cfg.mAdvancedConfig.mUsingOldContentTag = GetBoolValue(advancedVal, "using_old_content_tag");
}
}
// precise_timestamp
{
if (advancedVal.isMember("enable_timestamp_nanosecond") && advancedVal["enable_timestamp_nanosecond"].isBool()) {
Expand Down
6 changes: 6 additions & 0 deletions core/config_manager/ConfigManagerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -867,11 +867,17 @@ void ConfigManagerBase::LoadSingleUserConfig(const std::string& logName, const J
SetNotFoundJsonMember(pluginConfigJson["global"],
"EnableTimestampNanosecond",
config->mAdvancedConfig.mEnableTimestampNanosecond);
SetNotFoundJsonMember(pluginConfigJson["global"],
"UsingOldContentTag",
config->mAdvancedConfig.mUsingOldContentTag);
} else {
Json::Value pluginGlobalConfigJson;
SetNotFoundJsonMember(pluginGlobalConfigJson,
"EnableTimestampNanosecond",
config->mAdvancedConfig.mEnableTimestampNanosecond);
SetNotFoundJsonMember(pluginGlobalConfigJson,
"UsingOldContentTag",
config->mAdvancedConfig.mUsingOldContentTag);
pluginConfigJson["global"] = pluginGlobalConfigJson;
}
config->mPluginConfig = pluginConfigJson.toStyledString();
Expand Down
1 change: 1 addition & 0 deletions core/config_manager/ConfigYamlToJson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ ConfigYamlToJson::ConfigYamlToJson() {
mFileAdvancedConfigMap["DirBlackList"] = "dir_blacklist";
mFileAdvancedConfigMap["FilepathBlackList"] = "filepath_blacklist";
mFileAdvancedConfigMap["EnableTimestampNanosecond"] = "enable_timestamp_nanosecond";
mFileAdvancedConfigMap["UsingOldContentTag"] = "using_old_content_tag";
mFileAdvancedConfigMap["EnablePreciseTimestamp"] = "enable_precise_timestamp";
mFileAdvancedConfigMap["PreciseTimestampKey"] = "precise_timestamp_key";
mFileAdvancedConfigMap["PreciseTimestampUnit"] = "precise_timestamp_unit";
Expand Down
1 change: 1 addition & 0 deletions pkg/config/global_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type GlobalConfig struct {
DelayStopSec int

EnableTimestampNanosecond bool
UsingOldContentTag bool
EnableContainerdUpperDirDetect bool
EnableSlsMetricsFormat bool
}
Expand Down
16 changes: 10 additions & 6 deletions pluginmanager/logstore_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,17 @@ func (lc *LogstoreConfig) ProcessLogGroup(logByte []byte, packID string) int {
if len(topic) > 0 {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "__log_topic__", Value: topic})
}
for _, tag := range logGroup.LogTags {
log.Contents = append(log.Contents, &protocol.Log_Content{
Key: tagPrefix + tag.GetKey(),
Value: tag.GetValue(),
})
if lc.GlobalConfig.UsingOldContentTag {
for _, tag := range logGroup.LogTags {
log.Contents = append(log.Contents, &protocol.Log_Content{
Key: tagPrefix + tag.GetKey(),
Value: tag.GetValue(),
})
}
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
} else {
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic, "tags": logGroup.LogTags}})
}
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
}
return 0
}
Expand Down
13 changes: 10 additions & 3 deletions pluginmanager/plugin_runner_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,16 @@ func (p *pluginv1Runner) Init(inputQueueSize int, flushQueueSize int) error {

func (p *pluginv1Runner) Initialized() error {
if len(p.AggregatorPlugins) == 0 {
logger.Debug(p.LogstoreConfig.Context.GetRuntimeContext(), "add default aggregator")
if err := loadAggregator("aggregator_default", p.LogstoreConfig, nil); err != nil {
return err
if p.LogstoreConfig.GlobalConfig.UsingOldContentTag {
logger.Debug(p.LogstoreConfig.Context.GetRuntimeContext(), "add default aggregator")
if err := loadAggregator("aggregator_default", p.LogstoreConfig, nil); err != nil {
return err
}
} else {
logger.Debug(p.LogstoreConfig.Context.GetRuntimeContext(), "add aggregator_context")
if err := loadAggregator("aggregator_context", p.LogstoreConfig, nil); err != nil {
return err
}
}
}
if len(p.FlusherPlugins) == 0 {
Expand Down
38 changes: 34 additions & 4 deletions plugins/aggregator/context/aggregator_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,36 @@ func (p *AggregatorContext) Add(log *protocol.Log, ctx map[string]interface{}) e
logGroupList = make([]*protocol.LogGroup, 0, p.MaxLogGroupCount)
}
if len(logGroupList) == 0 {
logGroupList = append(logGroupList, p.newLogGroup(source, topic))
if _, ok := ctx["tags"]; ok {
newLogGroup := p.newLogGroup(source, topic)
fillTags(ctx["tags"].([]*protocol.LogTag), newLogGroup)
logGroupList = append(logGroupList, newLogGroup)
} else {
logGroupList = append(logGroupList, p.newLogGroup(source, topic))
}
}
nowLogGroup := logGroupList[len(logGroupList)-1]
tagChanged := false
// Determine whether the tag of the current log is consistent with that of the previous logs. If it is inconsistent, need to create a new logGroup.
if _, ok := ctx["tags"]; ok && len(nowLogGroup.Logs) > 0 {
tags := map[string]string{}
for _, tag := range nowLogGroup.LogTags {
tags[tag.GetKey()] = tag.GetValue()
}
for _, tag := range ctx["tags"].([]*protocol.LogTag) {
if v, ok := tags[tag.GetKey()]; !ok || v != tag.GetValue() {
tagChanged = true
break
}
}
}

logSize := p.evaluateLogSize(log)
// When current log group is full (log count or no more capacity for current log),
// allocate a new log group.
if len(nowLogGroup.Logs) >= p.MaxLogCount || p.nowLogGroupSizeMap[source]+logSize > MaxLogGroupSize {
if len(nowLogGroup.Logs) >= p.MaxLogCount || p.nowLogGroupSizeMap[source]+logSize > MaxLogGroupSize || tagChanged {
// The number of log group exceeds limit, make a quick flush.
if len(logGroupList) == p.MaxLogGroupCount {
if len(logGroupList) == p.MaxLogGroupCount || (tagChanged && len(logGroupList) > 0) {
// Quick flush to avoid becoming bottleneck when large logs come.
if err := p.queue.Add(logGroupList[0]); err == nil {
// add success, remove head log group
Expand All @@ -115,7 +135,13 @@ func (p *AggregatorContext) Add(log *protocol.Log, ctx map[string]interface{}) e
}
// New log group, reset size.
p.nowLogGroupSizeMap[source] = 0
logGroupList = append(logGroupList, p.newLogGroup(source, topic))
if _, ok := ctx["tags"]; ok {
newLogGroup := p.newLogGroup(source, topic)
fillTags(ctx["tags"].([]*protocol.LogTag), newLogGroup)
logGroupList = append(logGroupList, newLogGroup)
} else {
logGroupList = append(logGroupList, p.newLogGroup(source, topic))
}
nowLogGroup = logGroupList[len(logGroupList)-1]
}

Expand All @@ -126,6 +152,10 @@ func (p *AggregatorContext) Add(log *protocol.Log, ctx map[string]interface{}) e
return nil
}

func fillTags(logTags []*protocol.LogTag, logGroup *protocol.LogGroup) {
logGroup.LogTags = append(logGroup.LogTags, logTags...)
}

// Flush ...
func (p *AggregatorContext) Flush() []*protocol.LogGroup {
p.lock.Lock()
Expand Down