diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a3fba8f8ed10..04c0d1e855bc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -126,6 +126,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - New Filebeat coredns module to ingest coredns logs. It supports both native coredns deployment and coredns deployment in kubernetes. {pull}11200[11200] - New module for Cisco ASA logs. {issue}9200[9200] {pull}11171[11171] - Added support for Cisco ASA fields to the netflow input. {pull}11201[11201] +- Configurable line terminator. {pull}11015[11015] *Heartbeat* diff --git a/filebeat/_meta/common.reference.inputs.yml b/filebeat/_meta/common.reference.inputs.yml index e5e699790e30..4559bacda44a 100644 --- a/filebeat/_meta/common.reference.inputs.yml +++ b/filebeat/_meta/common.reference.inputs.yml @@ -80,6 +80,10 @@ filebeat.inputs: # This is especially useful for multiline log messages which can get large. #max_bytes: 10485760 + # Characters which separate the lines. Valid values: auto, line_feed, vertical_tab, form_feed, + # carriage_return, carriage_return_line_feed, next_line, line_separator, paragraph_separator. + #line_terminator: auto + ### Recursive glob configuration # Expand "**" patterns into regular glob patterns. diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 281c51b2b91c..a952a76ad4b5 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -485,6 +485,10 @@ filebeat.inputs: # This is especially useful for multiline log messages which can get large. #max_bytes: 10485760 + # Characters which separate the lines. Valid values: auto, line_feed, vertical_tab, form_feed, + # carriage_return, carriage_return_line_feed, next_line, line_separator, paragraph_separator. + #line_terminator: auto + ### Recursive glob configuration # Expand "**" patterns into regular glob patterns. 
diff --git a/filebeat/input/log/config.go b/filebeat/input/log/config.go index 234da58cc954..9a0f0d8dcb7c 100644 --- a/filebeat/input/log/config.go +++ b/filebeat/input/log/config.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/libbeat/common/match" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/reader/multiline" + "github.com/elastic/beats/libbeat/reader/readfile" "github.com/elastic/beats/libbeat/reader/readjson" ) @@ -55,8 +56,9 @@ var ( RecursiveGlob: true, // Harvester - BufferSize: 16 * humanize.KiByte, - MaxBytes: 10 * humanize.MiByte, + BufferSize: 16 * humanize.KiByte, + MaxBytes: 10 * humanize.MiByte, + LineTerminator: readfile.AutoLineTerminator, LogConfig: LogConfig{ Backoff: 1 * time.Second, BackoffFactor: 2, @@ -96,11 +98,12 @@ type config struct { ScanOrder string `config:"scan.order"` ScanSort string `config:"scan.sort"` - ExcludeLines []match.Matcher `config:"exclude_lines"` - IncludeLines []match.Matcher `config:"include_lines"` - MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` - Multiline *multiline.Config `config:"multiline"` - JSON *readjson.Config `config:"json"` + LineTerminator readfile.LineTerminator `config:"line_terminator"` + ExcludeLines []match.Matcher `config:"exclude_lines"` + IncludeLines []match.Matcher `config:"include_lines"` + MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` + Multiline *multiline.Config `config:"multiline"` + JSON *readjson.Config `config:"json"` // Hidden on purpose, used by the docker input: DockerJSON *struct { diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 9197793c7186..eb85420ec440 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -183,6 +183,8 @@ func (h *Harvester) Setup() error { return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err) } + logp.Debug("harvester", "Harvester setup successful. 
Line terminator: %d", h.config.LineTerminator) + return nil } @@ -564,7 +566,11 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { return nil, err } - r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize) + r, err = readfile.NewEncodeReader(reader, readfile.Config{ + Codec: h.encoding, + BufferSize: h.config.BufferSize, + Terminator: h.config.LineTerminator, + }) if err != nil { return nil, err } @@ -578,7 +584,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { r = readjson.NewJSONReader(r, h.config.JSON) } - r = readfile.NewStripNewline(r) + r = readfile.NewStripNewline(r, h.config.LineTerminator) if h.config.Multiline != nil { r, err = multiline.New(r, "\n", h.config.MaxBytes, h.config.Multiline) diff --git a/filebeat/input/log/harvester_test.go b/filebeat/input/log/harvester_test.go index fe3d7c4d1fca..a538f8f92bac 100644 --- a/filebeat/input/log/harvester_test.go +++ b/filebeat/input/log/harvester_test.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/reader" + "github.com/elastic/beats/libbeat/reader/readfile" "github.com/elastic/beats/libbeat/reader/readfile/encoding" ) @@ -82,8 +83,9 @@ func TestReadLine(t *testing.T) { MaxBackoff: 1 * time.Second, BackoffFactor: 2, }, - BufferSize: 100, - MaxBytes: 1000, + BufferSize: 100, + MaxBytes: 1000, + LineTerminator: readfile.LineFeed, }, source: source, } diff --git a/filebeat/scripts/tester/main.go b/filebeat/scripts/tester/main.go index ebac853d9c07..63ecd2a69aa2 100644 --- a/filebeat/scripts/tester/main.go +++ b/filebeat/scripts/tester/main.go @@ -133,12 +133,16 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) { } var r reader.Reader - r, err = readfile.NewEncodeReader(f, enc, 4096) + r, err = readfile.NewEncodeReader(f, readfile.Config{ + Codec: enc, + BufferSize: 4096, + Terminator: readfile.LineFeed, + }) if err != nil { return nil, err } - r = 
readfile.NewStripNewline(r) + r = readfile.NewStripNewline(r, readfile.LineFeed) if conf.multiPattern != "" { p, err := match.Compile(conf.multiPattern) diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 91042de561c7..938e9e6a9c63 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -19,6 +19,7 @@ filebeat.{{input_config | default("inputs")}}: harvester_buffer_size: {{harvester_buffer_size}} encoding: {{encoding | default("utf-8") }} tail_files: {{tail_files}} + line_terminator: {{ line_terminator }} backoff: 0.1s backoff_factor: 1 max_backoff: 0.1s diff --git a/filebeat/tests/system/input/test-newline.log b/filebeat/tests/system/input/test-newline.log new file mode 100644 index 000000000000..6de8137a9059 --- /dev/null +++ b/filebeat/tests/system/input/test-newline.log @@ -0,0 +1 @@ +hello world goodbye world diff --git a/libbeat/reader/multiline/multiline_test.go b/libbeat/reader/multiline/multiline_test.go index 95aba3c25565..dd9071804c19 100644 --- a/libbeat/reader/multiline/multiline_test.go +++ b/libbeat/reader/multiline/multiline_test.go @@ -277,12 +277,16 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade } var r reader.Reader - r, err = readfile.NewEncodeReader(in, enc, 4096) + r, err = readfile.NewEncodeReader(in, readfile.Config{ + Codec: enc, + BufferSize: 4096, + Terminator: readfile.LineFeed, + }) if err != nil { t.Fatalf("Failed to initialize line reader: %v", err) } - r, err = New(readfile.NewStripNewline(r), "\n", 1<<20, &cfg) + r, err = New(readfile.NewStripNewline(r, readfile.LineFeed), "\n", 1<<20, &cfg) if err != nil { t.Fatalf("failed to initialize reader: %v", err) } diff --git a/libbeat/reader/readfile/encode.go b/libbeat/reader/readfile/encode.go index 419a5de2c1e0..920d9a209209 100644 --- a/libbeat/reader/readfile/encode.go +++ b/libbeat/reader/readfile/encode.go @@ -31,14 +31,18 @@ type 
EncoderReader struct { reader *LineReader } +// Config stores the configuration for the readers required to read +// a file line by line +type Config struct { + Codec encoding.Encoding + BufferSize int + Terminator LineTerminator +} + // New creates a new Encode reader from input reader by applying // the given codec. -func NewEncodeReader( - r io.Reader, - codec encoding.Encoding, - bufferSize int, -) (EncoderReader, error) { - eReader, err := NewLineReader(r, codec, bufferSize) +func NewEncodeReader(r io.Reader, config Config) (EncoderReader, error) { + eReader, err := NewLineReader(r, config) return EncoderReader{eReader}, err } diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 07e8590b02e5..698aa3cff110 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -18,9 +18,10 @@ package readfile import ( + "bytes" + "fmt" "io" - "golang.org/x/text/encoding" "golang.org/x/text/transform" "github.com/elastic/beats/libbeat/common/streambuf" @@ -32,9 +33,9 @@ import ( // from raw input stream for every decoded line. 
type LineReader struct { reader io.Reader - codec encoding.Encoding bufferSize int nl []byte + decodedNl []byte inBuffer *streambuf.Buffer outBuffer *streambuf.Buffer inOffset int // input buffer read offset @@ -43,21 +44,26 @@ type LineReader struct { } // New creates a new reader object -func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) { - encoder := codec.NewEncoder() +func NewLineReader(input io.Reader, config Config) (*LineReader, error) { + encoder := config.Codec.NewEncoder() // Create newline char based on encoding - nl, _, err := transform.Bytes(encoder, []byte{'\n'}) + terminator, ok := lineTerminatorCharacters[config.Terminator] + if !ok { + return nil, fmt.Errorf("unknown line terminator: %+v", config.Terminator) + } + + nl, _, err := transform.Bytes(encoder, terminator) if err != nil { return nil, err } return &LineReader{ reader: input, - codec: codec, - bufferSize: bufferSize, + bufferSize: config.BufferSize, + decoder: config.Codec.NewDecoder(), nl: nl, - decoder: codec.NewDecoder(), + decodedNl: terminator, inBuffer: streambuf.New(nil), outBuffer: streambuf.New(nil), }, nil @@ -74,7 +80,7 @@ func (r *LineReader) Next() ([]byte, int, error) { return nil, 0, err } - // Check last decoded byte really being '\n' also unencoded + // Check last decoded byte really being newline also unencoded // if not, continue reading buf := r.outBuffer.Bytes() @@ -84,14 +90,15 @@ func (r *LineReader) Next() ([]byte, int, error) { continue } - if buf[len(buf)-1] == '\n' { + if bytes.HasSuffix(buf, r.decodedNl) { break } else { logp.Debug("line", "Line ending char found which wasn't one: %c", buf[len(buf)-1]) + logp.Debug("line", "In %s", string(buf)) } } - // output buffer contains complete line ending with '\n'. Extract + // output buffer contains complete line ending with newline. Extract // byte slice from buffer and reset output buffer. 
bytes, err := r.outBuffer.Collect(r.outBuffer.Len()) r.outBuffer.Reset() @@ -112,7 +119,7 @@ func (r *LineReader) advance() error { // Initial check if buffer has already a newLine character idx := r.inBuffer.IndexFrom(r.inOffset, r.nl) - // fill inBuffer until '\n' sequence has been found in input buffer + // fill inBuffer until newline sequence has been found in input buffer for idx == -1 { // increase search offset to reduce iterations on buffer when looping newOffset := r.inBuffer.Len() - len(r.nl) @@ -140,7 +147,7 @@ idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) } - // found encoded byte sequence for '\n' in buffer + // found encoded byte sequence for newline in buffer // -> decode input sequence into outBuffer sz, err := r.decode(idx + len(r.nl)) if err != nil { @@ -156,7 +163,7 @@ // continue scanning input buffer from last position + 1 r.inOffset = idx + 1 - sz if r.inOffset < 0 { - // fix inOffset if '\n' has encoding > 8bits + firl line has been decoded + // fix inOffset if newline has encoding > 8bits + first line has been decoded r.inOffset = 0 } diff --git a/libbeat/reader/readfile/line_terminator.go b/libbeat/reader/readfile/line_terminator.go new file mode 100644 index 000000000000..68ab24736c24 --- /dev/null +++ b/libbeat/reader/readfile/line_terminator.go @@ -0,0 +1,85 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package readfile + +import "fmt" + +// LineTerminator is the option storing the line terminator characters +// Supported newline reference: https://en.wikipedia.org/wiki/Newline#Unicode +type LineTerminator uint8 + +const ( + // InvalidTerminator is the invalid terminator + InvalidTerminator LineTerminator = iota + // AutoLineTerminator accepts both LF and CR+LF + AutoLineTerminator + // LineFeed is the unicode char LF + LineFeed + // VerticalTab is the unicode char VT + VerticalTab + // FormFeed is the unicode char FF + FormFeed + // CarriageReturn is the unicode char CR + CarriageReturn + // CarriageReturnLineFeed is the unicode chars CR+LF + CarriageReturnLineFeed + // NextLine is the unicode char NEL + NextLine + // LineSeparator is the unicode char LS + LineSeparator + // ParagraphSeparator is the unicode char PS + ParagraphSeparator +) + +var ( + lineTerminators = map[string]LineTerminator{ + "auto": AutoLineTerminator, + "line_feed": LineFeed, + "vertical_tab": VerticalTab, + "form_feed": FormFeed, + "carriage_return": CarriageReturn, + "carriage_return_line_feed": CarriageReturnLineFeed, + "next_line": NextLine, + "line_separator": LineSeparator, + "paragraph_separator": ParagraphSeparator, + } + + lineTerminatorCharacters = map[LineTerminator][]byte{ + AutoLineTerminator: []byte{'\u000A'}, + LineFeed: []byte{'\u000A'}, + VerticalTab: []byte{'\u000B'}, + FormFeed: []byte{'\u000C'}, + CarriageReturn: []byte{'\u000D'}, + CarriageReturnLineFeed: []byte("\u000D\u000A"), + NextLine: []byte{'\u0085'}, + LineSeparator: []byte("\u2028"), + 
ParagraphSeparator: []byte("\u2029"), + } +) + +// Unpack unpacks the configuration from the config file +func (l *LineTerminator) Unpack(option string) error { + terminator, ok := lineTerminators[option] + if !ok { + return fmt.Errorf("invalid line terminator: %s", option) + } + + *l = terminator + + return nil +} diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go index 634d04840d8a..7ac684163c9c 100644 --- a/libbeat/reader/readfile/line_test.go +++ b/libbeat/reader/readfile/line_test.go @@ -82,18 +82,19 @@ func TestReaderEncodings(t *testing.T) { buffer := bytes.NewBuffer(nil) codec, _ := codecFactory(buffer) + nl := lineTerminatorCharacters[LineFeed] // write with encoding to buffer writer := transform.NewWriter(buffer, codec.NewEncoder()) var expectedCount []int for _, line := range test.strings { writer.Write([]byte(line)) - writer.Write([]byte{'\n'}) + writer.Write(nl) expectedCount = append(expectedCount, buffer.Len()) } // create line reader - reader, err := NewLineReader(buffer, codec, 1024) + reader, err := NewLineReader(buffer, Config{codec, 1024, LineFeed}) if err != nil { t.Errorf("failed to initialize reader: %v", err) continue @@ -106,7 +107,7 @@ func TestReaderEncodings(t *testing.T) { for { bytes, sz, err := reader.Next() if sz > 0 { - readLines = append(readLines, string(bytes[:len(bytes)-1])) + readLines = append(readLines, string(bytes[:len(bytes)-len(nl)])) } if err != nil { @@ -132,6 +133,43 @@ func TestReaderEncodings(t *testing.T) { } } +func TestLineTerminators(t *testing.T) { + codecFactory, ok := encoding.FindEncoding("plain") + if !ok { + t.Errorf("can not find plain encoding") + } + + buffer := bytes.NewBuffer(nil) + codec, _ := codecFactory(buffer) + + for terminator, nl := range lineTerminatorCharacters { + buffer.Reset() + + buffer.Write([]byte("this is my first line")) + buffer.Write(nl) + buffer.Write([]byte("this is my second line")) + buffer.Write(nl) + + reader, err := NewLineReader(buffer, 
Config{codec, 1024, terminator}) + if err != nil { + t.Errorf("failed to initialize reader: %v", err) + continue + } + + nrLines := 0 + for { + line, _, err := reader.Next() + if err != nil { + break + } + + assert.True(t, bytes.HasSuffix(line, nl)) + nrLines++ + } + assert.Equal(t, nrLines, 2, "unexpected number of lines for terminator %+v", terminator) + } +} + func TestReadSingleLongLine(t *testing.T) { testReadLineLengths(t, []int{10 * 1024}) } @@ -184,7 +222,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) { // initialize reader buffer := bytes.NewBuffer(inputStream) codec, _ := encoding.Plain(buffer) - reader, err := NewLineReader(buffer, codec, buffer.Len()) + reader, err := NewLineReader(buffer, Config{codec, buffer.Len(), LineFeed}) if err != nil { t.Fatalf("Error initializing reader: %v", err) } diff --git a/libbeat/reader/readfile/strip_newline.go b/libbeat/reader/readfile/strip_newline.go index 8f0b0bc21e7e..e815020dc6b1 100644 --- a/libbeat/reader/readfile/strip_newline.go +++ b/libbeat/reader/readfile/strip_newline.go @@ -18,18 +18,31 @@ package readfile import ( + "bytes" + "github.com/elastic/beats/libbeat/reader" ) // StripNewline reader removes the last trailing newline characters from // read lines. type StripNewline struct { - reader reader.Reader + reader reader.Reader + nl []byte + lineEndingFunc func(*StripNewline, []byte) int } // New creates a new line reader stripping the last tailing newline. -func NewStripNewline(r reader.Reader) *StripNewline { - return &StripNewline{r} +func NewStripNewline(r reader.Reader, terminator LineTerminator) *StripNewline { + lineEndingFunc := (*StripNewline).lineEndingChars + if terminator == AutoLineTerminator { + lineEndingFunc = (*StripNewline).autoLineEndingChars + } + + return &StripNewline{ + reader: r, + nl: lineTerminatorCharacters[terminator], + lineEndingFunc: lineEndingFunc, + } } // Next returns the next line. 
@@ -40,20 +53,26 @@ func (p *StripNewline) Next() (reader.Message, error) { } L := message.Content - message.Content = L[:len(L)-lineEndingChars(L)] + message.Content = L[:len(L)-p.lineEndingFunc(p, L)] return message, err } // isLine checks if the given byte array is a line, means has a line ending \n -func isLine(l []byte) bool { - return l != nil && len(l) > 0 && l[len(l)-1] == '\n' +func (p *StripNewline) isLine(l []byte) bool { + return bytes.HasSuffix(l, p.nl) +} + +func (p *StripNewline) lineEndingChars(l []byte) int { + if !p.isLine(l) { + return 0 + } + + return len(p.nl) } -// lineEndingChars returns the number of line ending chars the given by array has -// In case of Unix/Linux files, it is -1, in case of Windows mostly -2 -func lineEndingChars(l []byte) int { - if !isLine(l) { +func (p *StripNewline) autoLineEndingChars(l []byte) int { + if !p.isLine(l) { return 0 } diff --git a/libbeat/reader/readfile/strip_newline_test.go b/libbeat/reader/readfile/strip_newline_test.go index 543056393e53..673fceb8c7f2 100644 --- a/libbeat/reader/readfile/strip_newline_test.go +++ b/libbeat/reader/readfile/strip_newline_test.go @@ -26,33 +26,29 @@ import ( ) func TestIsLine(t *testing.T) { - notLine := []byte("This is not a line") - assert.False(t, isLine(notLine)) + for terminator, nl := range lineTerminatorCharacters { + reader := NewStripNewline(nil, terminator) - notLine = []byte("This is not a line\n\r") - assert.False(t, isLine(notLine)) + notLine := []byte("This is not a line") + assert.False(t, reader.isLine(notLine)) - notLine = []byte("This is \n not a line") - assert.False(t, isLine(notLine)) - - line := []byte("This is a line \n") - assert.True(t, isLine(line)) - - line = []byte("This is a line\r\n") - assert.True(t, isLine(line)) + line := append([]byte("This is a line"), nl...) 
+ assert.True(t, reader.isLine(line)) + } } func TestLineEndingChars(t *testing.T) { - line := []byte("Not ending line") - assert.Equal(t, 0, lineEndingChars(line)) + for terminator, nl := range lineTerminatorCharacters { + reader := NewStripNewline(nil, terminator) - line = []byte("N ending \n") - assert.Equal(t, 1, lineEndingChars(line)) + line := append([]byte("This is a line"), nl...) + assert.Equal(t, reader.lineEndingFunc(reader, line), len(nl)) + } +} - line = []byte("RN ending \r\n") - assert.Equal(t, 2, lineEndingChars(line)) +func TestAutoLineEndingChars(t *testing.T) { + reader := NewStripNewline(nil, AutoLineTerminator) - // This is an invalid option - line = []byte("NR ending \n\r") - assert.Equal(t, 0, lineEndingChars(line)) + assert.Equal(t, reader.lineEndingFunc(reader, []byte("this is a windows line\r\n")), 2) + assert.Equal(t, reader.lineEndingFunc(reader, []byte("this is a not windows line\n")), 1) } diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 1f315b1251ad..e3859d355af7 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -562,6 +562,10 @@ filebeat.inputs: # This is especially useful for multiline log messages which can get large. #max_bytes: 10485760 + # Characters which separate the lines. Valid values: auto, line_feed, vertical_tab, form_feed, + # carriage_return, carriage_return_line_feed, next_line, line_separator, paragraph_separator. + #line_terminator: auto + ### Recursive glob configuration # Expand "**" patterns into regular glob patterns.