diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 04aefc9734bc..1035df05ee81 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -34,6 +34,7 @@ https://github.com/elastic/beats/compare/v7.2.0...7.2[Check the HEAD diff] *Filebeat* +- Skipping unparsable log entries from docker json reader {pull}12268[12268] - Add support for client addresses with port in Apache error logs {pull}12695[12695] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 469ae109da9d..d56ed0f67965 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -276,6 +276,10 @@ func (h *Harvester) Run() error { logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source) case ErrInactive: logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) + case reader.ErrLineUnparsable: + logp.Info("Skipping unparsable line in file: %v", h.state.Source) + //line unparsable, go to next line + continue default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) } diff --git a/libbeat/reader/reader.go b/libbeat/reader/reader.go index 653ceb8f1ea3..5aadcd611eb0 100644 --- a/libbeat/reader/reader.go +++ b/libbeat/reader/reader.go @@ -17,6 +17,10 @@ package reader +import ( + "errors" +) + // Reader is the interface that wraps the basic Next method for // getting a new message. // Next returns the message being read or and error. EOF is returned @@ -24,3 +28,8 @@ package reader type Reader interface { Next() (Message, error) } + +var ( + //ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed + ErrLineUnparsable = errors.New("line is unparsable") +) diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index 83301b98b5d8..76124b059726 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -27,6 +27,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/reader" ) @@ -187,7 +188,8 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { var logLine logLine err = p.parseLine(&message, &logLine) if err != nil { - return message, err + logp.Err("Parse line error: %v", err) + return message, reader.ErrLineUnparsable } // Handle multiline messages, join partial lines @@ -203,7 +205,8 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { } err = p.parseLine(&next, &logLine) if err != nil { - return message, err + logp.Err("Parse line error: %v", err) + return message, reader.ErrLineUnparsable } message.Content = append(message.Content, next.Content...) } diff --git a/libbeat/reader/readjson/docker_json_test.go b/libbeat/reader/readjson/docker_json_test.go index 4a9ca5f6c73e..a288a7300916 100644 --- a/libbeat/reader/readjson/docker_json_test.go +++ b/libbeat/reader/readjson/docker_json_test.go @@ -35,7 +35,7 @@ func TestDockerJSON(t *testing.T) { partial bool format string criflags bool - expectedError bool + expectedError error expectedMessage reader.Message }{ { @@ -53,7 +53,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong JSON", input: [][]byte{[]byte(`this is not JSON`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 16, }, @@ -73,7 +73,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 37, }, @@ -82,7 +82,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`{this is not JSON nor CRI`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 25, }, @@ -91,7 +91,7 @@ func TestDockerJSON(t *testing.T) { name: "Missing time", input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 82, }, @@ -218,7 +218,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", format: "cri", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 82, }, @@ -228,7 +228,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)}, stream: "all", format: "docker", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 115, }, @@ -300,12 +300,21 @@ func TestDockerJSON(t *testing.T) { []byte(`{"log":"shutdown...\n","stream`), }, stream: "stdout", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 139, }, partial: true, }, + { + name: "Corrupted log message line", + input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)}, + stream: "all", + expectedError: reader.ErrLineUnparsable, + expectedMessage: reader.Message{ + Bytes: 97, + }, + }, } for _, test := range tests { @@ -314,8 +323,9 @@ func TestDockerJSON(t *testing.T) { json := New(r, test.stream, test.partial, test.format, test.criflags) message, err := json.Next() - if test.expectedError { + if test.expectedError != nil { assert.Error(t, err) + assert.Equal(t, test.expectedError, err) } else { assert.NoError(t, err) }