From 101a6411618551f2e1182e1455809a350cbef13d Mon Sep 17 00:00:00 2001 From: marqc Date: Mon, 27 May 2019 22:18:23 +0200 Subject: [PATCH 1/3] Skipping unparsable lines in docker input (#12268) (cherry picked from commit 3451d06cc192ca1077149f2e709ec248b6b6613f) --- CHANGELOG.next.asciidoc | 12 +++++++++ filebeat/input/log/harvester.go | 4 +++ libbeat/reader/reader.go | 9 +++++++ libbeat/reader/readjson/docker_json.go | 7 ++++-- libbeat/reader/readjson/docker_json_test.go | 28 ++++++++++++++------- 5 files changed, 49 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9e7ff05b1eff..7a7092645ef9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,6 +39,18 @@ https://github.com/elastic/beats/compare/v7.2.0...7.2[Check the HEAD diff] *Filebeat* - When TLS is configured for the TCP input and a `certificate_authorities` is configured we now default to `required` for the `client_authentication`. {pull}12584[12584] +- Add support for Cisco syslog format used by their switch. {pull}10760[10760] +- Cover empty request data, url and version in Apache2 module{pull}10730[10730] +- Fix registry entries not being cleaned due to race conditions. {pull}10747[10747] +- Improve detection of file deletion on Windows. {pull}10747[10747] +- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263] +- Fix `add_docker_metadata` source matching, using `log.file.path` field now. {pull}11577[11577] +- Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591] +- Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524] +- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] +- Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125] +- Fix goroutine leak on non-explicit finalization of log input. {pull}12164[12164] +- Skipping unparsable log entries from docker json reader {pull}12268[12268] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 469ae109da9d..d56ed0f67965 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -276,6 +276,10 @@ func (h *Harvester) Run() error { logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source) case ErrInactive: logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) + case reader.ErrLineUnparsable: + logp.Info("Skipping unparsable line in file: %v", h.state.Source) + //line unparsable, go to next line + continue default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) } diff --git a/libbeat/reader/reader.go b/libbeat/reader/reader.go index 653ceb8f1ea3..5aadcd611eb0 100644 --- a/libbeat/reader/reader.go +++ b/libbeat/reader/reader.go @@ -17,6 +17,10 @@ package reader +import ( + "errors" +) + // Reader is the interface that wraps the basic Next method for // getting a new message. // Next returns the message being read or and error. EOF is returned @@ -24,3 +28,8 @@ package reader type Reader interface { Next() (Message, error) } + +var ( + //ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed + ErrLineUnparsable = errors.New("line is unparsable") +) diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index 83301b98b5d8..76124b059726 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -27,6 +27,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/reader" ) @@ -187,7 +188,8 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { var logLine logLine err = p.parseLine(&message, &logLine) if err != nil { - return message, err + logp.Err("Parse line error: %v", err) + return message, reader.ErrLineUnparsable } // Handle multiline messages, join partial lines @@ -203,7 +205,8 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { } err = p.parseLine(&next, &logLine) if err != nil { - return message, err + logp.Err("Parse line error: %v", err) + return message, reader.ErrLineUnparsable } message.Content = append(message.Content, next.Content...) } diff --git a/libbeat/reader/readjson/docker_json_test.go b/libbeat/reader/readjson/docker_json_test.go index 4a9ca5f6c73e..a288a7300916 100644 --- a/libbeat/reader/readjson/docker_json_test.go +++ b/libbeat/reader/readjson/docker_json_test.go @@ -35,7 +35,7 @@ func TestDockerJSON(t *testing.T) { partial bool format string criflags bool - expectedError bool + expectedError error expectedMessage reader.Message }{ { @@ -53,7 +53,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong JSON", input: [][]byte{[]byte(`this is not JSON`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 16, }, @@ -73,7 +73,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 37, }, @@ -82,7 +82,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`{this is not JSON nor CRI`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 25, }, @@ -91,7 +91,7 @@ func TestDockerJSON(t *testing.T) { name: "Missing time", input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 82, }, @@ -218,7 +218,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", format: "cri", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 82, }, @@ -228,7 +228,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)}, stream: "all", format: "docker", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 115, }, @@ -300,12 +300,21 @@ func TestDockerJSON(t *testing.T) { []byte(`{"log":"shutdown...\n","stream`), }, stream: "stdout", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 139, }, partial: true, }, + { + name: "Corrupted log message line", + input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)}, + stream: "all", + expectedError: reader.ErrLineUnparsable, + expectedMessage: reader.Message{ + Bytes: 97, + }, + }, } for _, test := range tests { @@ -314,8 +323,9 @@ func TestDockerJSON(t *testing.T) { json := New(r, test.stream, test.partial, test.format, test.criflags) message, err := json.Next() - if test.expectedError { + if test.expectedError != nil { assert.Error(t, err) + assert.Equal(t, test.expectedError, err) } else { assert.NoError(t, err) } From 88a512b956a31bb84947492c90759098dca1320a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Wed, 19 Jun 2019 09:58:16 +0200 Subject: [PATCH 2/3] Update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7a7092645ef9..2367dd256e3c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,17 +39,6 @@ https://github.com/elastic/beats/compare/v7.2.0...7.2[Check the HEAD diff] *Filebeat* - When TLS is configured for the TCP input and a `certificate_authorities` is configured we now default to `required` for the `client_authentication`. {pull}12584[12584] -- Add support for Cisco syslog format used by their switch. {pull}10760[10760] -- Cover empty request data, url and version in Apache2 module{pull}10730[10730] -- Fix registry entries not being cleaned due to race conditions. {pull}10747[10747] -- Improve detection of file deletion on Windows. {pull}10747[10747] -- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263] -- Fix `add_docker_metadata` source matching, using `log.file.path` field now. {pull}11577[11577] -- Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591] -- Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524] -- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] -- Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125] -- Fix goroutine leak on non-explicit finalization of log input. {pull}12164[12164] - Skipping unparsable log entries from docker json reader {pull}12268[12268] *Heartbeat* From c7e9ca09a91a973a1772f6fb59c5d3334f6c2cb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Wed, 3 Jul 2019 12:44:12 +0200 Subject: [PATCH 3/3] Update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bb32a52f4d88..1035df05ee81 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -34,7 +34,6 @@ https://github.com/elastic/beats/compare/v7.2.0...7.2[Check the HEAD diff] *Filebeat* -- When TLS is configured for the TCP input and a `certificate_authorities` is configured we now default to `required` for the `client_authentication`. {pull}12584[12584] - Skipping unparsable log entries from docker json reader {pull}12268[12268] - Add support for client addresses with port in Apache error logs {pull}12695[12695]