diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f9c5ad05c0c0..73d358f2badd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v6.8.0...6.8.1[Check the HEAD diff] *Filebeat* +- Properly update offset in case of unparasable line. {pull}22685[22685] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 12a26172e2f8..7e3163cf4b03 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -278,10 +278,6 @@ func (h *Harvester) Run() error { logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source) case ErrInactive: logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) - case reader.ErrLineUnparsable: - logp.Info("Skipping unparsable line in file: %v", h.state.Source) - //line unparsable, go to next line - continue default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) } diff --git a/filebeat/reader/docker_json/docker_json.go b/filebeat/reader/docker_json/docker_json.go index b6efa092b54e..c5e8182e2fe7 100644 --- a/filebeat/reader/docker_json/docker_json.go +++ b/filebeat/reader/docker_json/docker_json.go @@ -187,7 +187,7 @@ func (p *Reader) Next() (reader.Message, error) { err = p.parseLine(&message, &logLine) if err != nil { logp.Err("Parse line error: %v", err) - return message, reader.ErrLineUnparsable + continue } // Handle multiline messages, join partial lines @@ -204,7 +204,7 @@ func (p *Reader) Next() (reader.Message, error) { err = p.parseLine(&next, &logLine) if err != nil { logp.Err("Parse line error: %v", err) - return message, reader.ErrLineUnparsable + continue } message.Content = append(message.Content, next.Content...) } diff --git a/filebeat/reader/docker_json/docker_json_test.go b/filebeat/reader/docker_json/docker_json_test.go index 5645823e9b15..f14bbb7ed30a 100644 --- a/filebeat/reader/docker_json/docker_json_test.go +++ b/filebeat/reader/docker_json/docker_json_test.go @@ -18,6 +18,7 @@ package docker_json import ( + "io" "testing" "time" @@ -53,7 +54,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong JSON", input: [][]byte{[]byte(`this is not JSON`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 16, }, @@ -62,7 +63,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 37, }, @@ -71,7 +72,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`{this is not JSON nor CRI`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 25, }, @@ -80,7 +81,7 @@ func TestDockerJSON(t *testing.T) { name: "Missing time", input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 82, }, @@ -207,7 +208,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", forceCRI: true, - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 82, }, @@ -279,7 +280,7 @@ func TestDockerJSON(t *testing.T) { []byte(`{"log":"shutdown...\n","stream`), }, stream: "stdout", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 139, }, @@ -289,11 +290,25 @@ func TestDockerJSON(t *testing.T) { name: "Corrupted log message line", input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 97, }, }, + { + name: "Corrupted log message line is skipped, keep correct bytes count", + input: [][]byte{ + []byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + }, + stream: "all", + expectedMessage: reader.Message{ + Content: []byte("1:M 09 Nov 13:27:36.276 # User requested"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 205, + }, + }, } for _, test := range tests { @@ -323,6 +338,12 @@ type mockReader struct { } func (m *mockReader) Next() (reader.Message, error) { + if len(m.messages) < 1 { + return reader.Message{ + Content: []byte{}, + Bytes: 0, + }, io.EOF + } message := m.messages[0] m.messages = m.messages[1:] return reader.Message{ diff --git a/filebeat/reader/reader.go b/filebeat/reader/reader.go index 5aadcd611eb0..653ceb8f1ea3 100644 --- a/filebeat/reader/reader.go +++ b/filebeat/reader/reader.go @@ -17,10 +17,6 @@ package reader -import ( - "errors" -) - // Reader is the interface that wraps the basic Next method for // getting a new message. // Next returns the message being read or and error. EOF is returned @@ -28,8 +24,3 @@ import ( type Reader interface { Next() (Message, error) } - -var ( - //ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed - ErrLineUnparsable = errors.New("line is unparsable") -)