Skip to content

Commit 864643d

Browse files
ruflin authored and tsg committed
Fix input buffer on encoding problem (elastic#2661) (elastic#2669)
Based on elastic#2416
1 parent 565a14b commit 864643d

File tree

3 files changed

+61
-10
lines changed

3 files changed

+61
-10
lines changed

CHANGELOG.asciidoc

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v5.0.0-beta1...master[Check the HEAD di
3838
*Topbeat*
3939

4040
*Filebeat*
41+
- Fix input buffer on encoding problem
4142

4243
*Winlogbeat*
4344

filebeat/harvester/reader/line.go

+13-10
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ func (l *Line) advance() error {
131131
sz, err := l.decode(idx + len(l.nl))
132132
if err != nil {
133133
logp.Err("Error decoding line: %s", err)
134+
// In case of error increase size by unencoded length
135+
sz = idx + len(l.nl)
134136
}
135137

136138
// consume transformed bytes from input buffer
@@ -157,19 +159,20 @@ func (l *Line) decode(end int) (int, error) {
157159
var nDst, nSrc int
158160

159161
nDst, nSrc, err = l.decoder.Transform(buffer, inBytes[start:end], false)
160-
161-
start += nSrc
162-
163-
l.outBuffer.Write(buffer[:nDst])
164-
165162
if err != nil {
166-
if err == transform.ErrShortDst { // continue transforming
167-
// Reset error as decoding continues
168-
err = nil
169-
continue
163+
// Check if error is different from destination buffer too short
164+
if err != transform.ErrShortDst {
165+
l.outBuffer.Write(inBytes[0:end])
166+
start = end
167+
break
170168
}
171-
break
169+
170+
// Reset error as decoding continues
171+
err = nil
172172
}
173+
174+
start += nSrc
175+
l.outBuffer.Write(buffer[:nDst])
173176
}
174177

175178
l.byteCount += start

filebeat/tests/system/test_harvester.py

+47
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# coding=utf-8
2+
13
from filebeat import BaseTest
24
import os
35
import codecs
@@ -756,3 +758,48 @@ def test_truncate(self):
756758
# Check that only 1 registry entry as original was only truncated
757759
data = self.get_registry()
758760
assert len(data) == 1
761+
762+
763+
def test_decode_error(self):
764+
"""
765+
Tests that a decoding error is handled gracefully
766+
"""
767+
self.render_config_template(
768+
path=os.path.abspath(self.working_dir) + "/log/*",
769+
encoding="GBK", # Set invalid encoding for entry below which is actually utf-8
770+
)
771+
772+
os.mkdir(self.working_dir + "/log/")
773+
774+
logfile = self.working_dir + "/log/test.log"
775+
776+
with open(logfile, 'w') as file:
777+
file.write("hello world1" + "\n")
778+
779+
file.write('<meta content="瞭解「Google 商業解決方案」提供的各類服務軟件如何助您分析資料、刊登廣告、提升網站成效等。" name="description">' + '\n')
780+
file.write("hello world2" + "\n")
781+
782+
filebeat = self.start_beat()
783+
784+
# Make sure all three lines were read
785+
self.wait_until(
786+
lambda: self.output_has(lines=3),
787+
max_timeout=10)
788+
789+
# Wait until error shows up
790+
self.wait_until(
791+
lambda: self.log_contains("Error decoding line: simplifiedchinese: invalid GBK encoding"),
792+
max_timeout=5)
793+
794+
filebeat.check_kill_and_wait()
795+
796+
# Check that there is only 1 registry entry, as only one file was harvested
797+
data = self.get_registry()
798+
assert len(data) == 1
799+
800+
output = self.read_output_json()
801+
assert output[2]["message"] == "hello world2"
802+
803+
804+
805+

0 commit comments

Comments
 (0)