From dfe43f5d6971a178775688bf66da3fea14ab7542 Mon Sep 17 00:00:00 2001 From: u5surf Date: Sun, 2 Jul 2023 21:12:35 +0900 Subject: [PATCH 1/4] Add LTSV tailer --- src/core/tailer/tailer.go | 57 ++++++++++++++++++++++++++++++++++ src/core/tailer/tailer_test.go | 54 ++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/src/core/tailer/tailer.go b/src/core/tailer/tailer.go index e6d798cb5..e3d86096a 100644 --- a/src/core/tailer/tailer.go +++ b/src/core/tailer/tailer.go @@ -10,6 +10,7 @@ package tailer import ( "context" "io" + "strings" "github.com/mitchellh/mapstructure" "github.com/nxadm/tail" @@ -66,6 +67,11 @@ type PatternTailer struct { gc *grok.CompiledGrok } +type LTSVTailer struct { + handle *tail.Tail + ltsvParseString func(line string) map[string]string +} + func NewTailer(file string) (*Tailer, error) { t, err := tail.TailFile(file, tailConfig) if err != nil { @@ -95,6 +101,28 @@ func NewPatternTailer(file string, patterns map[string]string) (*PatternTailer, return &PatternTailer{t, gc}, nil } +func NewLTSVTailer(file string) (*LTSVTailer, error) { + ltsvParseString := func(line string) map[string]string { + columns := strings.Split(line, "\t") + lvs := make(map[string]string) + for _, column := range columns { + lv := strings.SplitN(column, ":", 2) + if len(lv) < 2 { + continue + } + label, value := strings.TrimSpace(lv[0]), strings.TrimSpace(lv[1]) + lvs[label] = value + } + return lvs + } + t, err := tail.TailFile(file, tailConfig) + if err != nil { + return nil, err + } + + return <SVTailer{t, ltsvParseString}, nil +} + func (t *Tailer) Tail(ctx context.Context, data chan<- string) { for { select { @@ -150,3 +178,32 @@ func (t *PatternTailer) Tail(ctx context.Context, data chan<- map[string]string) } } } + +func (t *LTSVTailer) Tail(ctx context.Context, data chan<- map[string]string) { + for { + select { + case line := <-t.handle.Lines: + if line == nil { + return + } + if line.Err != nil { + continue + } + + l := t.ltsvParseString(line.Text) + if l != nil { + data <- l + } + case <-ctx.Done(): + ctxErr := ctx.Err() + switch ctxErr { + case context.DeadlineExceeded: + log.Tracef("Tailer cancelled because deadline was exceeded, %v", ctxErr) + case context.Canceled: + log.Tracef("Tailer forcibly cancelled, %v", ctxErr) + } + log.Tracef("Tailer is done") + return + } + } +} diff --git a/src/core/tailer/tailer_test.go b/src/core/tailer/tailer_test.go index 6ddb5aecc..ae7acbf1a 100644 --- a/src/core/tailer/tailer_test.go +++ b/src/core/tailer/tailer_test.go @@ -137,3 +137,57 @@ T: os.Remove(accessLogFile.Name()) assert.Equal(t, 1, count) } + +func TestLTSVTailer(t *testing.T) { + accessLogFile, _ := os.CreateTemp(os.TempDir(), "access.log") + logLine := "remote_addr:127.0.0.1\t remote_user:-\t time_local:04/Nov/2020:19:40:38 +0000\t request:GET /500 HTTP/1.1\t status:500\t body_bytes_sent:4\t http_referer:-\t http_user_agent:curl/7.64.1\n" + + tailer, err := NewLTSVTailer(accessLogFile.Name()) + require.Nil(t, err) + + timeoutDuration := time.Millisecond * 300 + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + defer cancel() + + data := make(chan map[string]string, 100) + go tailer.Tail(ctx, data) + + time.Sleep(time.Millisecond * 100) + _, err = accessLogFile.WriteString(logLine) + if err != nil { + t.Fatalf("Error writing data to access log") + } + accessLogFile.Close() + + var count int + var res map[string]string +T: + for { + select { + case r := <-data: + res = r + count++ + case <-time.After(timeoutDuration): + break T + case <-ctx.Done(): + break T + } + } + + os.Remove(accessLogFile.Name()) + assert.Equal(t, 1, count) + assert.Equal( + t, + map[string]string{ + "body_bytes_sent": "4", + "http_referer": "-", + "http_user_agent": "curl/7.64.1", + "remote_addr": "127.0.0.1", + "remote_user": "-", + "request": "GET /500 HTTP/1.1", + "status": "500", + "time_local": "04/Nov/2020:19:40:38 +0000", + }, + res, + ) +} From 5f86a8ebb072583bc1a30770ab3a3d3d65b43809 Mon Sep 17 00:00:00 2001 From: u5surf Date: Sun, 2 Jul 2023 23:13:48 +0900 Subject: [PATCH 2/4] Support LTSV log format * The many nginx's users choice the log format into Labeled Tab Separated Values to be able to analyze and extend logs more easily. * So, it should be support this format that the user can be more easily to switch from conventional log collector to nginx-agent. --- sdk/config_helpers.go | 8 ++- sdk/config_helpers_test.go | 27 +++++++-- src/core/metrics/sources/nginx_access_log.go | 22 ++++--- .../nginx/agent/sdk/v2/config_helpers.go | 8 ++- .../nginx/agent/sdk/v2/config_helpers.go | 8 ++- .../core/metrics/sources/nginx_access_log.go | 22 ++++--- .../nginx/agent/v2/src/core/tailer/tailer.go | 57 +++++++++++++++++++ .../nginx/agent/sdk/v2/config_helpers.go | 8 ++- 8 files changed, 137 insertions(+), 23 deletions(-) diff --git a/sdk/config_helpers.go b/sdk/config_helpers.go index 620c58b2e..70e057465 100644 --- a/sdk/config_helpers.go +++ b/sdk/config_helpers.go @@ -249,7 +249,11 @@ func updateNginxConfigFileConfig( switch directive.Directive { case "log_format": if len(directive.Args) >= 2 { - formatMap[directive.Args[0]] = strings.Join(directive.Args[1:], "") + if directive.Args[0] == "ltsv" { + formatMap[directive.Args[0]] = "ltsv" + } else { + formatMap[directive.Args[0]] = strings.Join(directive.Args[1:], "") + } } case "root": if err := updateNginxConfigFileWithRoot(aux, directive.Args[0], seen, allowedDirectories, directoryMap); err != nil { @@ -405,6 +409,8 @@ func updateNginxConfigWithAccessLog(file string, format string, nginxConfig *pro al.Format = formatMap[format] } else if format == "" || format == "combined" { al.Format = predefinedAccessLogFormat + } else if format == "ltsv" { + al.Format = format } else { al.Format = "" } diff --git a/sdk/config_helpers_test.go b/sdk/config_helpers_test.go index a82140514..f8a953d02 100644 --- a/sdk/config_helpers_test.go +++ b/sdk/config_helpers_test.go @@ -57,6 +57,7 @@ var files = []string{ "/tmp/testdata/nginx/other/mime.types", "/tmp/testdata/logs/access1.log", "/tmp/testdata/logs/access2.log", + "/tmp/testdata/logs/access3.log", "/tmp/testdata/logs/error.log", "/tmp/testdata/root/test.html", "/tmp/testdata/foo/test.html", @@ -89,6 +90,12 @@ var accessLogs = &proto.AccessLogs{ Permissions: "0644", Readable: true, }, + { + Name: "/tmp/testdata/logs/access3.log", + Format: "ltsv", + Permissions: "0644", + Readable: true, + }, }, } @@ -122,6 +129,10 @@ var tests = []struct { '"$request" $status $body_bytes_sent ' '"$http_referer" "$http_user_agent" ' 'rt=$request_time uct="$upstream_connect_time" uht="$upstream_header_time" urt="$upstream_response_time"'; + log_format ltsv 'remote_addr: $remote_addr\t remote_user: $remote_user\t time_local: $time_local\t ' + 'request: $request\t status:$status\t body_bytes_sent: $body_bytes_sent\t ' + 'referer: $http_referer\t user_agent: $http_user_agent\t' + 'rt: $request_time\t uct: $upstream_connect_time\t uht: $upstream_header_time\t urt: $upstream_response_time'; server_tokens off; charset utf-8; @@ -157,6 +168,7 @@ var tests = []struct { } access_log /tmp/testdata/logs/access2.log combined; + access_log /tmp/testdata/logs/access3.log ltsv; }`, plusApi: "http://127.0.0.1:80/privateapi", @@ -171,7 +183,7 @@ var tests = []struct { { Name: "nginx.conf", Permissions: "0644", - Lines: int32(52), + Lines: int32(58), }, { Name: "ca.crt", @@ -286,7 +298,7 @@ var tests = []struct { }, Zconfig: &proto.ZippedFile{ Contents: []uint8{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}, - Checksum: "b12f45dee53b801dffe5091354a985b16552d5453fdf832df6f34b3b2ef9d2ee", + Checksum: "44370346a326f3eec4d5a7e9554722e4b00a83e9c6f998d03f912a708c24bb39", RootDirectory: "/tmp/testdata/nginx", }, }, @@ -316,6 +328,10 @@ var tests = []struct { '"$request" $status $body_bytes_sent ' '"$http_referer" "$http_user_agent" ' 'rt=$request_time uct="$upstream_connect_time" uht="$upstream_header_time" urt="$upstream_response_time"'; + log_format ltsv 'remote_addr: $remote_addr\t remote_user: $remote_user\t time_local: $time_local\t ' + 'request: $request\t status:$status\t body_bytes_sent: $body_bytes_sent\t ' + 'referer: $http_referer\t user_agent: $http_user_agent\t' + 'rt: $request_time\t uct: $upstream_connect_time\t uht: $upstream_header_time\t urt: $upstream_response_time'; server_tokens off; charset utf-8; @@ -340,6 +356,7 @@ var tests = []struct { } access_log /tmp/testdata/logs/access2.log combined; + access_log /tmp/testdata/logs/access3.log ltsv; }`, plusApi: "http://127.0.0.1:80/stub_status", @@ -364,7 +381,7 @@ var tests = []struct { { Name: "nginx2.conf", Permissions: "0644", - Lines: int32(41), + Lines: int32(46), }, { Name: "ca.crt", @@ -426,7 +443,7 @@ var tests = []struct { }, Zconfig: &proto.ZippedFile{ Contents: []uint8{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}, - Checksum: "29fb1bed60766983ba835c80b3f4faf8aae145094e4d0b8b9cf5cb6b2bc3a9c3", + Checksum: "1b6422f8a17527b2e9f255b7362ab7c320cd4a2efea7bff3e402438e5877f00e", RootDirectory: "/tmp/testdata/nginx", }, }, @@ -1112,7 +1129,7 @@ func TestGetErrorAndAccessLogs(t *testing.T) { func TestGetAccessLogs(t *testing.T) { result := GetAccessLogs(accessLogs) - assert.Equal(t, []string{"/tmp/testdata/logs/access1.log", "/tmp/testdata/logs/access2.log"}, result) + assert.Equal(t, []string{"/tmp/testdata/logs/access1.log", "/tmp/testdata/logs/access2.log", "/tmp/testdata/logs/access3.log"}, result) } func TestGetErrorLogs(t *testing.T) { diff --git a/src/core/metrics/sources/nginx_access_log.go b/src/core/metrics/sources/nginx_access_log.go index ffd494c29..854fb4b58 100644 --- a/src/core/metrics/sources/nginx_access_log.go +++ b/src/core/metrics/sources/nginx_access_log.go @@ -212,14 +212,22 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string gzipRatios, requestLengths, requestTimes, upstreamResponseLength, upstreamResponseTimes, upstreamConnectTimes, upstreamHeaderTimes := []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{} mu := sync.Mutex{} - - t, err := tailer.NewPatternTailer(logFile, map[string]string{"DEFAULT": logPattern}) - if err != nil { - log.Errorf("unable to tail %q: %v", logFile, err) - return - } data := make(chan map[string]string, 1024) - go t.Tail(ctx, data) + if logPattern == "ltsv" { + t, err := tailer.NewLTSVTailer(logFile) + if err != nil { + log.Errorf("unable to tail %q: %v", logFile, err) + return + } + go t.Tail(ctx, data) + } else { + t, err := tailer.NewPatternTailer(logFile, map[string]string{"DEFAULT": logPattern}) + if err != nil { + log.Errorf("unable to tail %q: %v", logFile, err) + return + } + go t.Tail(ctx, data) + } tick := time.NewTicker(c.collectionInterval) defer tick.Stop() diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 620c58b2e..70e057465 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -249,7 +249,11 @@ func updateNginxConfigFileConfig( switch directive.Directive { case "log_format": if len(directive.Args) >= 2 { - formatMap[directive.Args[0]] = strings.Join(directive.Args[1:], "") + if directive.Args[0] == "ltsv" { + formatMap[directive.Args[0]] = "ltsv" + } else { + formatMap[directive.Args[0]] = strings.Join(directive.Args[1:], "") + } } case "root": if err := updateNginxConfigFileWithRoot(aux, directive.Args[0], seen, allowedDirectories, directoryMap); err != nil { @@ -405,6 +409,8 @@ func updateNginxConfigWithAccessLog(file string, format string, nginxConfig *pro al.Format = formatMap[format] } else if format == "" || format == "combined" { al.Format = predefinedAccessLogFormat + } else if format == "ltsv" { + al.Format = format } else { al.Format = "" } diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 620c58b2e..70e057465 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -249,7 +249,11 @@ func updateNginxConfigFileConfig( switch directive.Directive { case "log_format": if len(directive.Args) >= 2 { - formatMap[directive.Args[0]] = strings.Join(directive.Args[1:], "") + if directive.Args[0] == "ltsv" { + formatMap[directive.Args[0]] = "ltsv" + } else { + formatMap[directive.Args[0]] = strings.Join(directive.Args[1:], "") + } } case "root": if err := updateNginxConfigFileWithRoot(aux, directive.Args[0], seen, allowedDirectories, directoryMap); err != nil { @@ -405,6 +409,8 @@ func updateNginxConfigWithAccessLog(file string, format string, nginxConfig *pro al.Format = formatMap[format] } else if format == "" || format == "combined" { al.Format = predefinedAccessLogFormat + } else if format == "ltsv" { + al.Format = format } else { al.Format = "" } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go index ffd494c29..854fb4b58 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/nginx_access_log.go @@ -212,14 +212,22 @@ func (c *NginxAccessLog) logStats(ctx context.Context, logFile, logFormat string gzipRatios, requestLengths, requestTimes, upstreamResponseLength, upstreamResponseTimes, upstreamConnectTimes, upstreamHeaderTimes := []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{}, []float64{} mu := sync.Mutex{} - - t, err := tailer.NewPatternTailer(logFile, map[string]string{"DEFAULT": logPattern}) - if err != nil { - log.Errorf("unable to tail %q: %v", logFile, err) - return - } data := make(chan map[string]string, 1024) - go t.Tail(ctx, data) + if logPattern == "ltsv" { + t, err := tailer.NewLTSVTailer(logFile) + if err != nil { + log.Errorf("unable to tail %q: %v", logFile, err) + return + } + go t.Tail(ctx, data) + } else { + t, err := tailer.NewPatternTailer(logFile, map[string]string{"DEFAULT": logPattern}) + if err != nil { + log.Errorf("unable to tail %q: %v", logFile, err) + return + } + go t.Tail(ctx, data) + } tick := time.NewTicker(c.collectionInterval) defer tick.Stop() diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/tailer/tailer.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/tailer/tailer.go index e6d798cb5..e3d86096a 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/tailer/tailer.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/tailer/tailer.go @@ -10,6 +10,7 @@ package tailer import ( "context" "io" + "strings" "github.com/mitchellh/mapstructure" "github.com/nxadm/tail" @@ -66,6 +67,11 @@ type PatternTailer struct { gc *grok.CompiledGrok } +type LTSVTailer struct { + handle *tail.Tail + ltsvParseString func(line string) map[string]string +} + func NewTailer(file string) (*Tailer, error) { t, err := tail.TailFile(file, tailConfig) if err != nil { @@ -95,6 +101,28 @@ func NewPatternTailer(file string, patterns map[string]string) (*PatternTailer, return &PatternTailer{t, gc}, nil } +func NewLTSVTailer(file string) (*LTSVTailer, error) { + ltsvParseString := func(line string) map[string]string { + columns := strings.Split(line, "\t") + lvs := make(map[string]string) + for _, column := range columns { + lv := strings.SplitN(column, ":", 2) + if len(lv) < 2 { + continue + } + label, value := strings.TrimSpace(lv[0]), strings.TrimSpace(lv[1]) + lvs[label] = value + } + return lvs + } + t, err := tail.TailFile(file, tailConfig) + if err != nil { + return nil, err + } + + return <SVTailer{t, ltsvParseString}, nil +} + func (t *Tailer) Tail(ctx context.Context, data chan<- string) { for { select { @@ -150,3 +178,32 @@ func (t *PatternTailer) Tail(ctx context.Context, data chan<- map[string]string) } } } + +func (t *LTSVTailer) Tail(ctx context.Context, data chan<- map[string]string) { + for { + select { + case line := <-t.handle.Lines: + if line == nil { + return + } + if line.Err != nil { + continue + } + + l := t.ltsvParseString(line.Text) + if l != nil { + data <- l + } + case <-ctx.Done(): + ctxErr := ctx.Err() + switch ctxErr { + case context.DeadlineExceeded: + log.Tracef("Tailer cancelled because deadline was exceeded, %v", ctxErr) + case context.Canceled: + log.Tracef("Tailer forcibly cancelled, %v", ctxErr) + } + log.Tracef("Tailer is done") + return + } + } +} diff --git a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go index 620c58b2e..70e057465 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go +++ b/vendor/github.com/nginx/agent/sdk/v2/config_helpers.go @@ -249,7 +249,11 @@ func updateNginxConfigFileConfig( switch directive.Directive { case "log_format": if len(directive.Args) >= 2 { - formatMap[directive.Args[0]] = strings.Join(directive.Args[1:], "") + if directive.Args[0] == "ltsv" { + formatMap[directive.Args[0]] = "ltsv" + } else { + formatMap[directive.Args[0]] = strings.Join(directive.Args[1:], "") + } } case "root": if err := updateNginxConfigFileWithRoot(aux, directive.Args[0], seen, allowedDirectories, directoryMap); err != nil { @@ -405,6 +409,8 @@ func updateNginxConfigWithAccessLog(file string, format string, nginxConfig *pro al.Format = formatMap[format] } else if format == "" || format == "combined" { al.Format = predefinedAccessLogFormat + } else if format == "ltsv" { + al.Format = format } else { al.Format = "" } From 4115923ed113ded62fa367dd5614c9ef13a00f0a Mon Sep 17 00:00:00 2001 From: u5surf Date: Mon, 3 Jul 2023 20:10:51 +0900 Subject: [PATCH 3/4] Fix test --- sdk/config_helpers_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/config_helpers_test.go b/sdk/config_helpers_test.go index f8a953d02..2d8c454d9 100644 --- a/sdk/config_helpers_test.go +++ b/sdk/config_helpers_test.go @@ -183,7 +183,7 @@ var tests = []struct { { Name: "nginx.conf", Permissions: "0644", - Lines: int32(58), + Lines: int32(57), }, { Name: "ca.crt", @@ -298,7 +298,7 @@ var tests = []struct { }, Zconfig: &proto.ZippedFile{ Contents: []uint8{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}, - Checksum: "44370346a326f3eec4d5a7e9554722e4b00a83e9c6f998d03f912a708c24bb39", + Checksum: "5da60539dbedfe08011646f96b964af9be68dcd3bdb7b6cc2d64c06723bba659", RootDirectory: "/tmp/testdata/nginx", }, }, From 5e6df6d6db8544a3d27d71cff7f4705acc659647 Mon Sep 17 00:00:00 2001 From: u5surf Date: Mon, 3 Jul 2023 20:35:54 +0900 Subject: [PATCH 4/4] Move out ltsv parse func --- src/core/tailer/tailer.go | 36 +++++++++---------- src/core/tailer/tailer_test.go | 4 +-- .../nginx/agent/v2/src/core/tailer/tailer.go | 36 +++++++++---------- 3 files changed, 36 insertions(+), 40 deletions(-) diff --git a/src/core/tailer/tailer.go b/src/core/tailer/tailer.go index e3d86096a..de287f839 100644 --- a/src/core/tailer/tailer.go +++ b/src/core/tailer/tailer.go @@ -68,8 +68,7 @@ type PatternTailer struct { } type LTSVTailer struct { - handle *tail.Tail - ltsvParseString func(line string) map[string]string + handle *tail.Tail } func NewTailer(file string) (*Tailer, error) { @@ -102,25 +101,11 @@ func NewPatternTailer(file string, patterns map[string]string) (*PatternTailer, } func NewLTSVTailer(file string) (*LTSVTailer, error) { - ltsvParseString := func(line string) map[string]string { - columns := strings.Split(line, "\t") - lvs := make(map[string]string) - for _, column := range columns { - lv := strings.SplitN(column, ":", 2) - if len(lv) < 2 { - continue - } - label, value := strings.TrimSpace(lv[0]), strings.TrimSpace(lv[1]) - lvs[label] = value - } - return lvs - } t, err := tail.TailFile(file, tailConfig) if err != nil { return nil, err } - - return <SVTailer{t, ltsvParseString}, nil + return <SVTailer{t}, nil } func (t *Tailer) Tail(ctx context.Context, data chan<- string) { @@ -189,8 +174,7 @@ func (t *LTSVTailer) Tail(ctx context.Context, data chan<- map[string]string) { if line.Err != nil { continue } - - l := t.ltsvParseString(line.Text) + l := t.parse(line.Text) if l != nil { data <- l } @@ -207,3 +191,17 @@ func (t *LTSVTailer) Tail(ctx context.Context, data chan<- map[string]string) { } } } + +func (t *LTSVTailer) parse(line string) map[string]string { + columns := strings.Split(line, "\t") + lineMap := make(map[string]string) + for _, column := range columns { + labelValue := strings.SplitN(column, ":", 2) + if len(labelValue) < 2 { + continue + } + label, value := strings.TrimSpace(labelValue[0]), strings.TrimSpace(labelValue[1]) + lineMap[label] = value + } + return lineMap +} diff --git a/src/core/tailer/tailer_test.go b/src/core/tailer/tailer_test.go index ae7acbf1a..251838ecf 100644 --- a/src/core/tailer/tailer_test.go +++ b/src/core/tailer/tailer_test.go @@ -160,12 +160,12 @@ func TestLTSVTailer(t *testing.T) { accessLogFile.Close() var count int - var res map[string]string + var res map[string]string T: for { select { case r := <-data: - res = r + res = r count++ case <-time.After(timeoutDuration): break T diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/tailer/tailer.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/tailer/tailer.go index e3d86096a..de287f839 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/tailer/tailer.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/tailer/tailer.go @@ -68,8 +68,7 @@ type PatternTailer struct { } type LTSVTailer struct { - handle *tail.Tail - ltsvParseString func(line string) map[string]string + handle *tail.Tail } func NewTailer(file string) (*Tailer, error) { @@ -102,25 +101,11 @@ func NewPatternTailer(file string, patterns map[string]string) (*PatternTailer, } func NewLTSVTailer(file string) (*LTSVTailer, error) { - ltsvParseString := func(line string) map[string]string { - columns := strings.Split(line, "\t") - lvs := make(map[string]string) - for _, column := range columns { - lv := strings.SplitN(column, ":", 2) - if len(lv) < 2 { - continue - } - label, value := strings.TrimSpace(lv[0]), strings.TrimSpace(lv[1]) - lvs[label] = value - } - return lvs - } t, err := tail.TailFile(file, tailConfig) if err != nil { return nil, err } - - return <SVTailer{t, ltsvParseString}, nil + return <SVTailer{t}, nil } func (t *Tailer) Tail(ctx context.Context, data chan<- string) { @@ -189,8 +174,7 @@ func (t *LTSVTailer) Tail(ctx context.Context, data chan<- map[string]string) { if line.Err != nil { continue } - - l := t.ltsvParseString(line.Text) + l := t.parse(line.Text) if l != nil { data <- l } @@ -207,3 +191,17 @@ func (t *LTSVTailer) Tail(ctx context.Context, data chan<- map[string]string) { } } } + +func (t *LTSVTailer) parse(line string) map[string]string { + columns := strings.Split(line, "\t") + lineMap := make(map[string]string) + for _, column := range columns { + labelValue := strings.SplitN(column, ":", 2) + if len(labelValue) < 2 { + continue + } + label, value := strings.TrimSpace(labelValue[0]), strings.TrimSpace(labelValue[1]) + lineMap[label] = value + } + return lineMap +}