Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add UsingOldContentTag. When UsingOldContentTag is set to false, the tag is now put into the context during cgo. #1169

Merged
merged 66 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
c72aa11
fix
quzard Aug 29, 2023
a8ddeb3
Merge branch 'alibaba:main' into main
quzard Aug 30, 2023
45bfd5b
Merge branch 'alibaba:main' into main
quzard Aug 31, 2023
7297fd3
Merge branch 'alibaba:main' into main
quzard Sep 8, 2023
671282f
Merge branch 'alibaba:main' into main
quzard Sep 13, 2023
9f4cee7
Merge branch 'alibaba:main' into main
quzard Sep 18, 2023
ca15089
Merge branch 'alibaba:main' into main
quzard Sep 19, 2023
b376cb8
Merge branch 'alibaba:main' into main
quzard Sep 20, 2023
30768cb
Merge branch 'alibaba:main' into main
quzard Sep 20, 2023
b9b9bcc
Merge remote-tracking branch 'alibaba/main' into main
quzard Sep 22, 2023
b2c6ef7
Merge branch 'alibaba:main' into main
quzard Sep 25, 2023
01b64ed
Merge branch 'alibaba:main' into main
quzard Sep 26, 2023
2fbde8d
Merge branch 'alibaba:main' into main
quzard Sep 26, 2023
a0f0363
add mUsingOldContentTag
quzard Oct 3, 2023
e7580fc
Merge branch 'alibaba:main' into main
quzard Oct 3, 2023
481dc0f
fix
quzard Oct 4, 2023
43b3cb5
fix
quzard Oct 4, 2023
3a3c964
fix
quzard Oct 4, 2023
122dcaa
fix
quzard Oct 8, 2023
72cd2dd
fix
quzard Oct 9, 2023
0e2d12a
fix
quzard Oct 9, 2023
db26b81
fix
quzard Oct 9, 2023
ddaaf90
fix
quzard Oct 9, 2023
f5bf8a1
fix
quzard Oct 10, 2023
3f0739a
addchangelog
quzard Oct 10, 2023
621d44e
Merge branch 'alibaba:main' into main
quzard Oct 12, 2023
c215179
fix
quzard Oct 12, 2023
3b7e1c8
Merge branch 'main' into tag
quzard Oct 12, 2023
177d93c
add changelog
quzard Oct 12, 2023
348fe3e
add extractTagsToLogTags
quzard Oct 12, 2023
017b348
add extractTagsToLogTags
quzard Oct 12, 2023
be102d2
fix
quzard Oct 12, 2023
0eee447
add logGroupPoolSize
quzard Oct 16, 2023
37bdf6c
fix
quzard Oct 16, 2023
ffdc5ed
fix
quzard Oct 16, 2023
e5f5c22
fix
quzard Oct 16, 2023
93e2cf1
fix
quzard Oct 16, 2023
e27ac25
fix
quzard Oct 16, 2023
987c66f
Merge branch 'alibaba:main' into tag
quzard Oct 17, 2023
c43db8f
fix
quzard Oct 17, 2023
97725a8
fox
quzard Oct 18, 2023
5a3c7dc
fix
quzard Oct 18, 2023
b784b11
set aggregator_context as default aggregator
quzard Oct 23, 2023
d3379c2
Add protoc description
quzard Oct 23, 2023
cae09f3
modify
quzard Oct 23, 2023
900a0f4
add AddToQueueWithRetry
quzard Oct 23, 2023
059a5dd
add LogGroupWithSize
quzard Oct 23, 2023
9a659b1
add Test_extractTagsToLogTags
quzard Oct 23, 2023
315adea
add comment
quzard Oct 23, 2023
faeb1f3
add comment
quzard Oct 23, 2023
d6753a8
add comment
quzard Oct 23, 2023
399fee0
add comment
quzard Oct 23, 2023
00c4ad8
delete nowLogGroupSizeMap
quzard Oct 23, 2023
447506a
make lint
quzard Oct 23, 2023
8d7f34b
modify changelog
quzard Oct 24, 2023
794d2c2
fix genernate.sh
quzard Oct 24, 2023
8b4c0dd
set aggregator_context as aggregator_default
quzard Oct 24, 2023
b185934
add WITHOUTGDB into e2e
quzard Oct 24, 2023
a4dd353
fix doc
quzard Oct 24, 2023
3e4b02e
logGroupPoolSize = 0 when reset
quzard Oct 24, 2023
167b95f
change LogGroupWithSize to *LogGroupWithSize
quzard Oct 24, 2023
cccf5e0
add tag_kv_regex
quzard Oct 24, 2023
03e0293
addToQueueWithRetry
quzard Oct 24, 2023
1f4b004
add using_old_content_tag
quzard Oct 24, 2023
96c3324
fix
quzard Oct 24, 2023
8031731
modify changelog
quzard Oct 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
BUILD_LOGTAIL_UT: OFF
ENABLE_COMPATIBLE_MODE: ON
ENABLE_STATIC_LINK_CRT: ON
WITHOUTGDB: ON
run: make dist && scripts/check_glibc.sh

- name: Build Docker
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/e2e-core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ jobs:
sudo chmod +x /usr/local/bin/docker-compose

- name: E2E Core Structure Test
env:
BUILD_LOGTAIL_UT: OFF
WITHOUTGDB: ON
run: make e2e-core

result:
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ jobs:
run: docker --version

- name: E2E Test
env:
BUILD_LOGTAIL_UT: OFF
WITHOUTGDB: ON
run: make e2e

- name: UnitTest E2e Engine
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ your changes, such as:
- [public] [both] [updated] add a new feature

## [Unreleased]

- [public] [both] [added] add UsingOldContentTag. When UsingOldContentTag is set to false, the Tag is now placed in the Meta instead of Logs during cgo.
1 change: 1 addition & 0 deletions core/config/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class Config {
uint16_t mSearchCheckpointDirDepth = 0; // Max directory depth when search checkpoint.
quzard marked this conversation as resolved.
Show resolved Hide resolved

bool mEnableTimestampNanosecond = false;
bool mUsingOldContentTag = false;
quzard marked this conversation as resolved.
Show resolved Hide resolved
// Deprecated
bool mEnablePreciseTimestamp = false;
quzard marked this conversation as resolved.
Show resolved Hide resolved
std::string mPreciseTimestampKey;
Expand Down
6 changes: 6 additions & 0 deletions core/config/UserLogConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ void UserLogConfigParser::ParseAdvancedConfig(const Json::Value& originalVal, Co
cfg.mAdvancedConfig.mSpecifiedYear = static_cast<int32_t>(val.asUInt());
}
}
// using_old_content_tag
{
if (advancedVal.isMember("using_old_content_tag") && advancedVal["using_old_content_tag"].isBool()) {
cfg.mAdvancedConfig.mUsingOldContentTag = GetBoolValue(advancedVal, "using_old_content_tag");
}
}
// precise_timestamp
{
if (advancedVal.isMember("enable_timestamp_nanosecond") && advancedVal["enable_timestamp_nanosecond"].isBool()) {
Expand Down
6 changes: 6 additions & 0 deletions core/config_manager/ConfigManagerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,11 +872,17 @@ void ConfigManagerBase::LoadSingleUserConfig(const std::string& logName, const J
SetNotFoundJsonMember(pluginConfigJson["global"],
"EnableTimestampNanosecond",
config->mAdvancedConfig.mEnableTimestampNanosecond);
SetNotFoundJsonMember(pluginConfigJson["global"],
"UsingOldContentTag",
config->mAdvancedConfig.mUsingOldContentTag);
} else {
Json::Value pluginGlobalConfigJson;
SetNotFoundJsonMember(pluginGlobalConfigJson,
"EnableTimestampNanosecond",
config->mAdvancedConfig.mEnableTimestampNanosecond);
SetNotFoundJsonMember(pluginGlobalConfigJson,
"UsingOldContentTag",
config->mAdvancedConfig.mUsingOldContentTag);
pluginConfigJson["global"] = pluginGlobalConfigJson;
}
config->mPluginConfig = pluginConfigJson.toStyledString();
Expand Down
1 change: 1 addition & 0 deletions core/config_manager/ConfigYamlToJson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ ConfigYamlToJson::ConfigYamlToJson() {
mFileAdvancedConfigMap["DirBlackList"] = "dir_blacklist";
mFileAdvancedConfigMap["FilepathBlackList"] = "filepath_blacklist";
mFileAdvancedConfigMap["EnableTimestampNanosecond"] = "enable_timestamp_nanosecond";
mFileAdvancedConfigMap["UsingOldContentTag"] = "using_old_content_tag";
mFileAdvancedConfigMap["EnablePreciseTimestamp"] = "enable_precise_timestamp";
mFileAdvancedConfigMap["PreciseTimestampKey"] = "precise_timestamp_key";
mFileAdvancedConfigMap["PreciseTimestampUnit"] = "precise_timestamp_unit";
Expand Down
2 changes: 1 addition & 1 deletion docs/cn/getting-started/how-to-use-prometheus-fetcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ iLogtail Prometheus 采集的Metrics 数据与日志同样遵循[iLogtail 的传

## E2E 快速上手
目前iLogtail 已经集成了prometheus 的E2E测试,可以在iLogtail 的根路径快速进行上手验证。
测试命令:_TEST_SCOPE=input_prometheus TEST_DEBUG=true make e2e(开启DEBUG 选项可以查看传输数据明细)_
测试命令:TEST_SCOPE=input_prometheus TEST_DEBUG=true make e2e(开启DEBUG 选项可以查看传输数据明细)_
```
TEST_DEBUG=true TEST_PROFILE=false ./scripts/e2e.sh behavior input_prometheus
=========================================
Expand Down
1 change: 1 addition & 0 deletions pkg/config/global_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type GlobalConfig struct {
DelayStopSec int

EnableTimestampNanosecond bool
UsingOldContentTag bool
EnableContainerdUpperDirDetect bool
EnableSlsMetricsFormat bool
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/protocol/proto/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# How to generate pb

cd ~
wget https://ghproxy.com/https://github.com/gogo/protobuf/archive/refs/tags/v1.3.2.tar.gz
tar xzf v1.3.2.tar.gz
mkdir -p ${GOPATH}/src/github.com/gogo
mv protobuf-1.3.2 ${GOPATH}/src/github.com/gogo/protobuf

cd ~
wget https://ghproxy.com/https://github.com/protocolbuffers/protobuf/releases/download/v3.14.0/protoc-3.14.0-linux-x86_64.zip
unzip protoc-3.14.0-linux-x86_64.zip
mv bin/protoc /usr/local/bin/
mv include/google/ /usr/local/include/

go install github.com/gogo/protobuf/protoc-gen-gogofaster@latest

bash pkg/protocol/proto/genernate.sh
1 change: 1 addition & 0 deletions pkg/protocol/proto/genernate.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ if [ -d "${PROTO_GEN_HOME}" ]; then
rm -rf "${PROTO_GEN_HOME}"
fi
mkdir "${PROTO_GEN_HOME}"
export PATH=$PATH:$GOPATH/bin

protoc -I="${PROTO_HOME}" \
-I="${GOPATH}/src" \
Expand Down
72 changes: 62 additions & 10 deletions pluginmanager/logstore_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,41 @@ func extractTags(rawTags []byte, log *protocol.Log) {
}
}

// extractTagsToLogTags extracts tags from rawTags and append them into []*protocol.LogTag.
// Rule: k1~=~v1^^^k2~=~v2
// rawTags
func extractTagsToLogTags(rawTags []byte) []*protocol.LogTag {
logTags := []*protocol.LogTag{}
defaultPrefixIndex := 0
for len(rawTags) != 0 {
idx := bytes.Index(rawTags, tagDelimiter)
var part []byte
if idx < 0 {
part = rawTags
rawTags = rawTags[len(rawTags):]
} else {
part = rawTags[:idx]
rawTags = rawTags[idx+len(tagDelimiter):]
}
if len(part) > 0 {
pos := bytes.Index(part, tagSeparator)
if pos > 0 {
logTags = append(logTags, &protocol.LogTag{
Key: string(part[:pos]),
Value: string(part[pos+len(tagSeparator):]),
})
} else {
logTags = append(logTags, &protocol.LogTag{
Key: defaultTagPrefix + strconv.Itoa(defaultPrefixIndex),
Value: string(part),
})
}
defaultPrefixIndex++
}
}
return logTags
}

// ProcessRawLogV2 ...
// V1 -> V2: enable topic field, and use tags field to pass more tags.
// unsafe parameter: rawLog,packID and tags
Expand All @@ -263,8 +298,14 @@ func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic st
if len(topic) > 0 {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "__log_topic__", Value: topic})
}
extractTags(tags, log)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
// When UsingOldContentTag is set to false, the tag is now put into the context during cgo.
if !lc.GlobalConfig.UsingOldContentTag {
logTags := extractTagsToLogTags(tags)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic, "tags": logTags}})
} else {
extractTags(tags, log)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
}
return 0
}

Expand All @@ -279,8 +320,14 @@ func (lc *LogstoreConfig) ProcessLog(logByte []byte, packID string, topic string
if len(topic) > 0 {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "__log_topic__", Value: topic})
}
extractTags(tags, log)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
// When UsingOldContentTag is set to false, the tag is now put into the context during cgo.
if !lc.GlobalConfig.UsingOldContentTag {
logTags := extractTagsToLogTags(tags)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic, "tags": logTags}})
} else {
extractTags(tags, log)
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
}
return 0
}

Expand All @@ -298,13 +345,18 @@ func (lc *LogstoreConfig) ProcessLogGroup(logByte []byte, packID string) int {
if len(topic) > 0 {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: "__log_topic__", Value: topic})
}
for _, tag := range logGroup.LogTags {
log.Contents = append(log.Contents, &protocol.Log_Content{
Key: tagPrefix + tag.GetKey(),
Value: tag.GetValue(),
})
// When UsingOldContentTag is set to false, the tag is now put into the context during cgo.
if !lc.GlobalConfig.UsingOldContentTag {
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic, "tags": logGroup.LogTags}})
} else {
for _, tag := range logGroup.LogTags {
log.Contents = append(log.Contents, &protocol.Log_Content{
Key: tagPrefix + tag.GetKey(),
Value: tag.GetValue(),
})
}
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
}
lc.PluginRunner.ReceiveRawLog(&pipeline.LogWithContext{Log: log, Context: map[string]interface{}{"source": packID, "topic": topic}})
}
return 0
}
Expand Down
20 changes: 19 additions & 1 deletion pluginmanager/logstore_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/alibaba/ilogtail/pkg/config"
global_config "github.com/alibaba/ilogtail/pkg/config"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/pipeline"
Expand Down Expand Up @@ -414,6 +415,22 @@ func Test_extractTags(t *testing.T) {
assert.Equal(t, l.Contents[0].Value, "k2")
}

func Test_extractTagsToLogTags(t *testing.T) {
logTag := extractTagsToLogTags([]byte("k1~=~v1^^^k2~=~v2"))
assert.Equal(t, logTag[0].Key, "k1")
assert.Equal(t, logTag[0].Value, "v1")
assert.Equal(t, logTag[1].Key, "k2")
assert.Equal(t, logTag[1].Value, "v2")

logTag = extractTagsToLogTags([]byte("^^^k2~=~v2"))
assert.Equal(t, logTag[0].Key, "k2")
assert.Equal(t, logTag[0].Value, "v2")

logTag = extractTagsToLogTags([]byte("^^^k2"))
assert.Equal(t, logTag[0].Key, "__tag__:__prefix__0")
assert.Equal(t, logTag[0].Value, "k2")
}

func TestLogstoreConfig_ProcessRawLogV2(t *testing.T) {
rawLogs := []byte("12345")
topic := "topic"
Expand All @@ -423,7 +440,8 @@ func TestLogstoreConfig_ProcessRawLogV2(t *testing.T) {
l.PluginRunner = &pluginv1Runner{
LogsChan: make(chan *pipeline.LogWithContext, 10),
}

l.GlobalConfig = &config.LogtailGlobalConfig
l.GlobalConfig.UsingOldContentTag = true
{
assert.Equal(t, 0, l.ProcessRawLogV2(rawLogs, "", topic, tags))
assert.Equal(t, 1, len(l.PluginRunner.(*pluginv1Runner).LogsChan))
Expand Down
1 change: 1 addition & 0 deletions pluginmanager/plugin_runner_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ func (p *pluginv2Runner) Stop(exit bool) error {
return nil
}

// TODO: Design the ReceiveRawLogV2, which is passed in a PipelineGroupEvents not pipeline.LogWithContext, and tags should be added in the PipelineGroupEvents.
func (p *pluginv2Runner) ReceiveRawLog(in *pipeline.LogWithContext) {
md := models.NewMetadata()
if in.Context != nil {
Expand Down
4 changes: 2 additions & 2 deletions plugins/aggregator/aggregator_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package aggregator

import (
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/plugins/aggregator/baseagg"
"github.com/alibaba/ilogtail/plugins/aggregator/context"
)

func init() {
pipeline.Aggregators["aggregator_default"] = func() pipeline.Aggregator {
return baseagg.NewAggregatorBase()
return context.NewAggregatorContext()
}
}
Loading
Loading