diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 23230265c179..c40b7352b5e7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,6 +39,8 @@ https://github.com/elastic/beats/compare/v6.8.0...6.8.1[Check the HEAD diff] *Filebeat* +- Skipping unparsable log entries from docker json reader {pull}12268[12268] + *Heartbeat* *Journalbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 52731f97ef35..c0422538642e 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -278,6 +278,10 @@ func (h *Harvester) Run() error { logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source) case ErrInactive: logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) + case reader.ErrLineUnparsable: + logp.Info("Skipping unparsable line in file: %v", h.state.Source) + //line unparsable, go to next line + continue default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) } diff --git a/filebeat/reader/docker_json/docker_json.go b/filebeat/reader/docker_json/docker_json.go index 0f08630c26aa..b6efa092b54e 100644 --- a/filebeat/reader/docker_json/docker_json.go +++ b/filebeat/reader/docker_json/docker_json.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/filebeat/reader" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" ) // Reader processor renames a given field @@ -185,7 +186,8 @@ func (p *Reader) Next() (reader.Message, error) { var logLine logLine err = p.parseLine(&message, &logLine) if err != nil { - return message, err + logp.Err("Parse line error: %v", err) + return message, reader.ErrLineUnparsable } // Handle multiline messages, join partial lines @@ -201,7 +203,8 @@ func (p *Reader) Next() (reader.Message, error) { } err = p.parseLine(&next, &logLine) if err != nil { - return message, err + logp.Err("Parse line error: %v", err) + return message, reader.ErrLineUnparsable } message.Content = append(message.Content, next.Content...) } diff --git a/filebeat/reader/docker_json/docker_json_test.go b/filebeat/reader/docker_json/docker_json_test.go index 12d85a619ea5..5645823e9b15 100644 --- a/filebeat/reader/docker_json/docker_json_test.go +++ b/filebeat/reader/docker_json/docker_json_test.go @@ -35,7 +35,7 @@ func TestDockerJSON(t *testing.T) { partial bool forceCRI bool criflags bool - expectedError bool + expectedError error expectedMessage reader.Message }{ { @@ -53,7 +53,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong JSON", input: [][]byte{[]byte(`this is not JSON`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 16, }, @@ -62,7 +62,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 37, }, @@ -71,7 +71,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`{this is not JSON nor CRI`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 25, }, @@ -80,7 +80,7 @@ func TestDockerJSON(t *testing.T) { name: "Missing time", input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 82, }, @@ -207,7 +207,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", forceCRI: true, - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 82, }, @@ -279,12 +279,21 @@ func TestDockerJSON(t *testing.T) { []byte(`{"log":"shutdown...\n","stream`), }, stream: "stdout", - expectedError: true, + expectedError: reader.ErrLineUnparsable, expectedMessage: reader.Message{ Bytes: 139, }, partial: true, }, + { + name: "Corrupted log message line", + input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)}, + stream: "all", + expectedError: reader.ErrLineUnparsable, + expectedMessage: reader.Message{ + Bytes: 97, + }, + }, } for _, test := range tests { @@ -293,8 +302,9 @@ func TestDockerJSON(t *testing.T) { json := New(r, test.stream, test.partial, test.forceCRI, test.criflags) message, err := json.Next() - if test.expectedError { + if test.expectedError != nil { assert.Error(t, err) + assert.Equal(t, test.expectedError, err) } else { assert.NoError(t, err) } diff --git a/filebeat/reader/reader.go b/filebeat/reader/reader.go index 653ceb8f1ea3..5aadcd611eb0 100644 --- a/filebeat/reader/reader.go +++ b/filebeat/reader/reader.go @@ -17,6 +17,10 @@ package reader +import ( + "errors" +) + // Reader is the interface that wraps the basic Next method for // getting a new message. // Next returns the message being read or and error. EOF is returned @@ -24,3 +28,8 @@ package reader type Reader interface { Next() (Message, error) } + +var ( + //ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed + ErrLineUnparsable = errors.New("line is unparsable") +)