From 6c40f10ca3d11e0a778000c1e92ba1cca2341e83 Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Thu, 1 Apr 2021 10:21:25 +0300 Subject: [PATCH 1/2] Properly update offset in case of unparsable line (#22685) (cherry picked from commit 655984e962d1b1156a9d02ffc5f3ae5d59d4e86e) --- CHANGELOG.next.asciidoc | 17 ++++++++ filebeat/input/filestream/input.go | 3 -- filebeat/input/log/harvester.go | 4 -- .../tests/files/logs/docker_corrupted.log | 21 ++++++++++ filebeat/tests/system/test_container.py | 39 +++++++++++++++++++ libbeat/reader/reader.go | 6 --- libbeat/reader/readjson/docker_json.go | 4 +- libbeat/reader/readjson/docker_json_test.go | 37 ++++++++++++++---- 8 files changed, 108 insertions(+), 23 deletions(-) create mode 100644 filebeat/tests/files/logs/docker_corrupted.log diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 945cde1337bc..47d38410bb29 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -174,6 +174,23 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix event.kind for system/syslog pipeline {issue}20365[20365] {pull}20390[20390] - Fix event.type for zeek/ssl and duplicate event.category for zeek/connection {pull}20696[20696] - Add json body check for sqs message. {pull}21727[21727] +- Fix long registry migration times. {pull}20717[20717] {issue}20705[20705] +- Fix event types and categories in auditd module to comply with ECS {pull}20652[20652] +- Update documentation in the azure module filebeat. {pull}20815[20815] +- Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908] +- Remove wrongly mapped `tls.client.server_name` from `fortinet/firewall` fileset. {pull}20983[20983] +- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048] +- Fix error when processing AWS Cloudtrail Digest logs. {pull}21086[21086] {issue}20943[20943] +- Handle multiple upstreams in ingress-controller. 
{pull}21215[21215] +- Provide backwards compatibility for the `append` processor when Elasticsearch is less than 7.10.0. {pull}21159[21159] +- Fix checkpoint module when logs contain time field. {pull}20567[20567] +- Add field limit check for AWS Cloudtrail flattened fields. {pull}21388[21388] {issue}21382[21382] +- Fix syslog RFC 5424 parsing in the CheckPoint module. {pull}21854[21854] +- Fix incorrect connection state mapping in zeek connection pipeline. {pull}22151[22151] {issue}22149[22149] +- Fix handing missing eventtime and assignip field being set to N/A for fortinet module. {pull}22361[22361] +- Fix Zeek dashboard reference to `zeek.ssl.server.name` field. {pull}21696[21696] +- Fix for `field [source] not present as part of path [source.ip]` error in azure pipelines. {pull}22377[22377] +- Properly update offset in case of unparasable line. {pull}22685[22685] - Drop aws.vpcflow.pkt_srcaddr and aws.vpcflow.pkt_dstaddr when equal to "-". {pull}22721[22721] {issue}22716[22716] - Fix cisco umbrella module config by adding input variable. {pull}22892[22892] - Fix network.direction logic in zeek connection fileset. {pull}22967[22967] diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 8f21cb505cee..e436e602204e 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -309,9 +309,6 @@ func (inp *filestream) readFromSource( s.Offset = 0 case ErrClosed: log.Info("Reader was closed. Closing.") - case reader.ErrLineUnparsable: - log.Info("Skipping unparsable line in file.") - continue default: log.Errorf("Read line error: %v", err) } diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 6b16861f8ece..0d4e6d6b539b 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -331,10 +331,6 @@ func (h *Harvester) Run() error { logp.Info("End of file reached: %s. 
Closing because close_eof is enabled.", h.state.Source) case ErrInactive: logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) - case reader.ErrLineUnparsable: - logp.Info("Skipping unparsable line in file: %v", h.state.Source) - //line unparsable, go to next line - continue default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) } diff --git a/filebeat/tests/files/logs/docker_corrupted.log b/filebeat/tests/files/logs/docker_corrupted.log new file mode 100644 index 000000000000..b241a2691b98 --- /dev/null +++ b/filebeat/tests/files/logs/docker_corrupted.log @@ -0,0 +1,21 @@ +{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"} +{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"} +{"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"} +{"log":"cp etc/packetbeat.template.json /build/packetbeat.template.json\n","stream":"stdout","time":"2016-03-02T22:59:04.639782988Z"} +{"log":"# linux\n","stream":"stdout","time":"2016-03-02T22:59:04.646276053Z"} +"log":"cp packetbeat.yml /build/packetbeat-linux.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.647847045Z"} +{"log":"# binary\n","stream":"stdout","time":"2016-03-02T22:59:04.653740138Z"} +{"log":"cp packetbeat.yml /build/packetbeat-binary.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.655979016Z"} +{"log":"# darwin\n","stream":"stdout","time":"2016-03-02T22:59:04.661181197Z"} +{"log":"cp packetbeat.yml /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.662859769Z"} +{"log":"sed -i.bk 's/device: any/device: en0/' /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.66649744Z"} 
+{"log":"rm /build/packetbeat-darwin.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.701199002Z"} +{"log":"# win\n","stream":"stdout","time":"2016-03-02T22:59:04.705067809Z"} +{"log":"cp packetbeat.yml /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.706629907Z"} +{"log":"sed -i.bk 's/device: any/device: 0/' /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.711993313Z"} +{"log":"rm /build/packetbeat-win.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.757913979Z"} +{"log":"Compiling for windows/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:04.761895467Z"} +{"log":"Compiling for windows/386...\n","stream":"stdout","time":"2016-03-02T22:59:29.481736885Z"} +{"log":"Compiling for darwin/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:55.205334574Z"} +{"log":"Moving binaries to host...\n","stream":"stdout","time":"2016-03-02T23:00:15.140397826Z"} diff --git a/filebeat/tests/system/test_container.py b/filebeat/tests/system/test_container.py index ee0df7eb8e96..067eabd19772 100644 --- a/filebeat/tests/system/test_container.py +++ b/filebeat/tests/system/test_container.py @@ -66,3 +66,42 @@ def test_container_input_cri(self): output = self.read_output() assert len(output) == 1 assert output[0]["stream"] == "stdout" + + def test_container_input_registry_for_unparsable_lines(self): + """ + Test container input properly updates registry offset in case + of unparsable lines + """ + input_raw = """ +- type: container + paths: + - {}/logs/*.log +""" + self.render_config_template( + input_raw=input_raw.format(os.path.abspath(self.working_dir)), + inputs=False, + ) + + os.mkdir(self.working_dir + "/logs/") + self.copy_files(["logs/docker_corrupted.log"], + target_dir="logs") + + filebeat = self.start_beat() + + self.wait_until(lambda: self.output_has(lines=20)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + assert len(output) == 20 + assert output[19]["message"] == "Moving 
binaries to host..." + for o in output: + assert o["stream"] == "stdout" + + # Check that file exist + data = self.get_registry() + logs = self.log_access() + assert logs.contains("Parse line error") == True + # bytes of healthy file are 2244 so for the corrupted one should + # be 2244-1=2243 since we removed one character + assert data[0]["offset"] == 2243 diff --git a/libbeat/reader/reader.go b/libbeat/reader/reader.go index 81ae4ad82412..43c389ac7c6e 100644 --- a/libbeat/reader/reader.go +++ b/libbeat/reader/reader.go @@ -18,7 +18,6 @@ package reader import ( - "errors" "io" ) @@ -30,8 +29,3 @@ type Reader interface { io.Closer Next() (Message, error) } - -var ( - //ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed - ErrLineUnparsable = errors.New("line is unparsable") -) diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index 59dded97ec3a..d57c61c6a268 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -202,7 +202,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { err = p.parseLine(&message, &logLine) if err != nil { p.logger.Errorf("Parse line error: %v", err) - return message, reader.ErrLineUnparsable + continue } // Handle multiline messages, join partial lines @@ -219,7 +219,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { err = p.parseLine(&next, &logLine) if err != nil { p.logger.Errorf("Parse line error: %v", err) - return message, reader.ErrLineUnparsable + continue } message.Content = append(message.Content, next.Content...) 
} diff --git a/libbeat/reader/readjson/docker_json_test.go b/libbeat/reader/readjson/docker_json_test.go index 2c9e2e711047..de03b87da81e 100644 --- a/libbeat/reader/readjson/docker_json_test.go +++ b/libbeat/reader/readjson/docker_json_test.go @@ -18,6 +18,7 @@ package readjson import ( + "io" "testing" "time" @@ -53,7 +54,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong JSON", input: [][]byte{[]byte(`this is not JSON`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 16, }, @@ -73,7 +74,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 37, }, @@ -82,7 +83,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`{this is not JSON nor CRI`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 25, }, @@ -91,7 +92,7 @@ func TestDockerJSON(t *testing.T) { name: "Missing time", input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 82, }, @@ -218,7 +219,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", format: "cri", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 82, }, @@ -228,7 +229,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)}, stream: "all", format: "docker", - expectedError: reader.ErrLineUnparsable, + 
expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 115, }, @@ -300,7 +301,7 @@ func TestDockerJSON(t *testing.T) { []byte(`{"log":"shutdown...\n","stream`), }, stream: "stdout", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 139, }, @@ -324,11 +325,25 @@ func TestDockerJSON(t *testing.T) { name: "Corrupted log message line", input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 97, }, }, + { + name: "Corrupted log message line is skipped, keep correct bytes count", + input: [][]byte{ + []byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + }, + stream: "all", + expectedMessage: reader.Message{ + Content: []byte("1:M 09 Nov 13:27:36.276 # User requested"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 205, + }, + }, } for _, test := range tests { @@ -358,6 +373,12 @@ type mockReader struct { } func (m *mockReader) Next() (reader.Message, error) { + if len(m.messages) < 1 { + return reader.Message{ + Content: []byte{}, + Bytes: 0, + }, io.EOF + } message := m.messages[0] m.messages = m.messages[1:] return reader.Message{ From 96287338fea4227dd90c4ce06ecedf4f9b6c405a Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Thu, 1 Apr 2021 10:33:26 +0300 Subject: [PATCH 2/2] Update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 47d38410bb29..10f24ddf1ac0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -174,22 +174,6 @@ 
https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix event.kind for system/syslog pipeline {issue}20365[20365] {pull}20390[20390] - Fix event.type for zeek/ssl and duplicate event.category for zeek/connection {pull}20696[20696] - Add json body check for sqs message. {pull}21727[21727] -- Fix long registry migration times. {pull}20717[20717] {issue}20705[20705] -- Fix event types and categories in auditd module to comply with ECS {pull}20652[20652] -- Update documentation in the azure module filebeat. {pull}20815[20815] -- Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908] -- Remove wrongly mapped `tls.client.server_name` from `fortinet/firewall` fileset. {pull}20983[20983] -- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048] -- Fix error when processing AWS Cloudtrail Digest logs. {pull}21086[21086] {issue}20943[20943] -- Handle multiple upstreams in ingress-controller. {pull}21215[21215] -- Provide backwards compatibility for the `append` processor when Elasticsearch is less than 7.10.0. {pull}21159[21159] -- Fix checkpoint module when logs contain time field. {pull}20567[20567] -- Add field limit check for AWS Cloudtrail flattened fields. {pull}21388[21388] {issue}21382[21382] -- Fix syslog RFC 5424 parsing in the CheckPoint module. {pull}21854[21854] -- Fix incorrect connection state mapping in zeek connection pipeline. {pull}22151[22151] {issue}22149[22149] -- Fix handing missing eventtime and assignip field being set to N/A for fortinet module. {pull}22361[22361] -- Fix Zeek dashboard reference to `zeek.ssl.server.name` field. {pull}21696[21696] -- Fix for `field [source] not present as part of path [source.ip]` error in azure pipelines. {pull}22377[22377] - Properly update offset in case of unparasable line. {pull}22685[22685] - Drop aws.vpcflow.pkt_srcaddr and aws.vpcflow.pkt_dstaddr when equal to "-". 
{pull}22721[22721] {issue}22716[22716] - Fix cisco umbrella module config by adding input variable. {pull}22892[22892]