Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- New Filebeat coredns module to ingest coredns logs. It supports both native coredns deployment and coredns deployment in kubernetes. {pull}11200[11200]
- New module for Cisco ASA logs. {issue}9200[9200] {pull}11171[11171]
- Added support for Cisco ASA fields to the netflow input. {pull}11201[11201]
- Configurable line terminator. {pull}11015[11015]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/common.reference.inputs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ filebeat.inputs:
# This is especially useful for multiline log messages which can get large.
#max_bytes: 10485760

# Characters which separate the lines. Valid values: auto, line_feed, vertical_tab, form_feed,
# carriage_return, carriage_return_line_feed, next_line, line_separator, paragraph_separator.
#line_terminator: auto

### Recursive glob configuration

# Expand "**" patterns into regular glob patterns.
Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,10 @@ filebeat.inputs:
# This is especially useful for multiline log messages which can get large.
#max_bytes: 10485760

# Characters which separate the lines. Valid values: auto, line_feed, vertical_tab, form_feed,
# carriage_return, carriage_return_line_feed, next_line, line_separator, paragraph_separator.
#line_terminator: auto

### Recursive glob configuration

# Expand "**" patterns into regular glob patterns.
Expand Down
17 changes: 10 additions & 7 deletions filebeat/input/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/reader/multiline"
"github.com/elastic/beats/libbeat/reader/readfile"
"github.com/elastic/beats/libbeat/reader/readjson"
)

Expand All @@ -55,8 +56,9 @@ var (
RecursiveGlob: true,

// Harvester
BufferSize: 16 * humanize.KiByte,
MaxBytes: 10 * humanize.MiByte,
BufferSize: 16 * humanize.KiByte,
MaxBytes: 10 * humanize.MiByte,
LineTerminator: readfile.AutoLineTerminator,
LogConfig: LogConfig{
Backoff: 1 * time.Second,
BackoffFactor: 2,
Expand Down Expand Up @@ -96,11 +98,12 @@ type config struct {
ScanOrder string `config:"scan.order"`
ScanSort string `config:"scan.sort"`

ExcludeLines []match.Matcher `config:"exclude_lines"`
IncludeLines []match.Matcher `config:"include_lines"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
JSON *readjson.Config `config:"json"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
ExcludeLines []match.Matcher `config:"exclude_lines"`
IncludeLines []match.Matcher `config:"include_lines"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
JSON *readjson.Config `config:"json"`

// Hidden on purpose, used by the docker input:
DockerJSON *struct {
Expand Down
10 changes: 8 additions & 2 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ func (h *Harvester) Setup() error {
return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
}

logp.Debug("harvester", "Harvester setup successful. Line terminator: %d", h.config.LineTerminator)

return nil
}

Expand Down Expand Up @@ -564,7 +566,11 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize)
r, err = readfile.NewEncodeReader(reader, readfile.Config{
Codec: h.encoding,
BufferSize: h.config.BufferSize,
Terminator: h.config.LineTerminator,
})
if err != nil {
return nil, err
}
Expand All @@ -578,7 +584,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
r = readjson.NewJSONReader(r, h.config.JSON)
}

r = readfile.NewStripNewline(r)
r = readfile.NewStripNewline(r, h.config.LineTerminator)

if h.config.Multiline != nil {
r, err = multiline.New(r, "\n", h.config.MaxBytes, h.config.Multiline)
Expand Down
6 changes: 4 additions & 2 deletions filebeat/input/log/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/reader"
"github.com/elastic/beats/libbeat/reader/readfile"
"github.com/elastic/beats/libbeat/reader/readfile/encoding"
)

Expand Down Expand Up @@ -82,8 +83,9 @@ func TestReadLine(t *testing.T) {
MaxBackoff: 1 * time.Second,
BackoffFactor: 2,
},
BufferSize: 100,
MaxBytes: 1000,
BufferSize: 100,
MaxBytes: 1000,
LineTerminator: readfile.LineFeed,
},
source: source,
}
Expand Down
8 changes: 6 additions & 2 deletions filebeat/scripts/tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,16 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) {
}

var r reader.Reader
r, err = readfile.NewEncodeReader(f, enc, 4096)
r, err = readfile.NewEncodeReader(f, readfile.Config{
Codec: enc,
BufferSize: 4096,
Terminator: readfile.LineFeed,
})
if err != nil {
return nil, err
}

r = readfile.NewStripNewline(r)
r = readfile.NewStripNewline(r, readfile.LineFeed)

if conf.multiPattern != "" {
p, err := match.Compile(conf.multiPattern)
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ filebeat.{{input_config | default("inputs")}}:
harvester_buffer_size: {{harvester_buffer_size}}
encoding: {{encoding | default("utf-8") }}
tail_files: {{tail_files}}
line_terminator: {{ line_terminator }}
backoff: 0.1s
backoff_factor: 1
max_backoff: 0.1s
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/input/test-newline.log
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hello world goodbye world
8 changes: 6 additions & 2 deletions libbeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,16 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade
}

var r reader.Reader
r, err = readfile.NewEncodeReader(in, enc, 4096)
r, err = readfile.NewEncodeReader(in, readfile.Config{
Codec: enc,
BufferSize: 4096,
Terminator: readfile.LineFeed,
})
if err != nil {
t.Fatalf("Failed to initialize line reader: %v", err)
}

r, err = New(readfile.NewStripNewline(r), "\n", 1<<20, &cfg)
r, err = New(readfile.NewStripNewline(r, readfile.LineFeed), "\n", 1<<20, &cfg)
if err != nil {
t.Fatalf("failed to initialize reader: %v", err)
}
Expand Down
16 changes: 10 additions & 6 deletions libbeat/reader/readfile/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ type EncoderReader struct {
reader *LineReader
}

// Config stores the configuration for the readers required to read
// a file line by line
type Config struct {
Comment thread
kvch marked this conversation as resolved.
Outdated
Codec encoding.Encoding
BufferSize int
Terminator LineTerminator
}

// New creates a new Encode reader from input reader by applying
// the given codec.
func NewEncodeReader(
r io.Reader,
codec encoding.Encoding,
bufferSize int,
) (EncoderReader, error) {
eReader, err := NewLineReader(r, codec, bufferSize)
func NewEncodeReader(r io.Reader, config Config) (EncoderReader, error) {
eReader, err := NewLineReader(r, config)
return EncoderReader{eReader}, err
}

Expand Down
35 changes: 21 additions & 14 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package readfile

import (
"bytes"
"fmt"
"io"

"golang.org/x/text/encoding"
"golang.org/x/text/transform"

"github.com/elastic/beats/libbeat/common/streambuf"
Expand All @@ -32,9 +33,9 @@ import (
// from raw input stream for every decoded line.
type LineReader struct {
reader io.Reader
codec encoding.Encoding
bufferSize int
nl []byte
decodedNl []byte
inBuffer *streambuf.Buffer
outBuffer *streambuf.Buffer
inOffset int // input buffer read offset
Expand All @@ -43,21 +44,26 @@ type LineReader struct {
}

// New creates a new reader object
func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) {
encoder := codec.NewEncoder()
func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
encoder := config.Codec.NewEncoder()

// Create newline char based on encoding
nl, _, err := transform.Bytes(encoder, []byte{'\n'})
terminator, ok := lineTerminatorCharacters[config.Terminator]
if !ok {
return nil, fmt.Errorf("unknown line terminator: %+v", config.Terminator)
}

nl, _, err := transform.Bytes(encoder, terminator)
if err != nil {
return nil, err
}

return &LineReader{
reader: input,
codec: codec,
bufferSize: bufferSize,
bufferSize: config.BufferSize,
decoder: config.Codec.NewDecoder(),
nl: nl,
decoder: codec.NewDecoder(),
decodedNl: terminator,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
}, nil
Expand All @@ -74,7 +80,7 @@ func (r *LineReader) Next() ([]byte, int, error) {
return nil, 0, err
}

// Check last decoded byte really being '\n' also unencoded
// Check last decoded byte really being newline also unencoded
// if not, continue reading
buf := r.outBuffer.Bytes()

Expand All @@ -84,14 +90,15 @@ func (r *LineReader) Next() ([]byte, int, error) {
continue
}

if buf[len(buf)-1] == '\n' {
if bytes.HasSuffix(buf, r.decodedNl) {
break
} else {
logp.Debug("line", "Line ending char found which wasn't one: %c", buf[len(buf)-1])
logp.Debug("line", "In %s", string(buf))
}
}

// output buffer contains complete line ending with '\n'. Extract
// output buffer contains complete line ending with newline. Extract
// byte slice from buffer and reset output buffer.
bytes, err := r.outBuffer.Collect(r.outBuffer.Len())
r.outBuffer.Reset()
Expand All @@ -112,7 +119,7 @@ func (r *LineReader) advance() error {
// Initial check whether the buffer already contains a newline sequence
idx := r.inBuffer.IndexFrom(r.inOffset, r.nl)

// fill inBuffer until '\n' sequence has been found in input buffer
// fill inBuffer until newline sequence has been found in input buffer
for idx == -1 {
// increase search offset to reduce iterations on buffer when looping
newOffset := r.inBuffer.Len() - len(r.nl)
Expand Down Expand Up @@ -140,7 +147,7 @@ func (r *LineReader) advance() error {
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}

// found encoded byte sequence for '\n' in buffer
// found encoded byte sequence for newline in buffer
// -> decode input sequence into outBuffer
sz, err := r.decode(idx + len(r.nl))
if err != nil {
Expand All @@ -156,7 +163,7 @@ func (r *LineReader) advance() error {
// continue scanning input buffer from last position + 1
r.inOffset = idx + 1 - sz
if r.inOffset < 0 {
// fix inOffset if '\n' has encoding > 8bits + firl line has been decoded
// fix inOffset if newline has encoding > 8bits + firl line has been decoded
r.inOffset = 0
}

Expand Down
85 changes: 85 additions & 0 deletions libbeat/reader/readfile/line_terminator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package readfile

import "fmt"

// LineTerminator is the option storing the line terminator characters
// Supported newline reference: https://en.wikipedia.org/wiki/Newline#Unicode
type LineTerminator uint8
Comment thread
kvch marked this conversation as resolved.
Outdated

const (
// InvalidTerminator is the invalid terminator
InvalidTerminator LineTerminator = iota
Comment thread
kvch marked this conversation as resolved.
Outdated
// AutoLineTerminator accepts both LF and CR+LF
AutoLineTerminator
// LineFeed is the unicode char LF
LineFeed
// VerticalTab is the unicode char VT
VerticalTab
// FormFeed is the unicode char FF
FormFeed
// CarriageReturn is the unicode char CR
CarriageReturn
// CarriageReturnLineFeed is the unicode chars CR+LF
CarriageReturnLineFeed
// NextLine is the unicode char NEL
NextLine
// LineSeparator is the unicode char LS
LineSeparator
// ParagraphSeparator is the unicode char PS
ParagraphSeparator
)

var (
lineTerminators = map[string]LineTerminator{
"auto": AutoLineTerminator,
"line_feed": LineFeed,
"vertical_tab": VerticalTab,
"form_feed": FormFeed,
"carriage_return": CarriageReturn,
"carriage_return_line_feed": CarriageReturnLineFeed,
"next_line": NextLine,
"line_separator": LineSeparator,
"paragraph_separator": ParagraphSeparator,
}

lineTerminatorCharacters = map[LineTerminator][]byte{
AutoLineTerminator: []byte{'\u000A'},
LineFeed: []byte{'\u000A'},
VerticalTab: []byte{'\u000B'},
FormFeed: []byte{'\u000C'},
CarriageReturn: []byte{'\u000D'},
CarriageReturnLineFeed: []byte("\u000D\u000A"),
NextLine: []byte{'\u0085'},
LineSeparator: []byte("\u2028"),
ParagraphSeparator: []byte("\u2029"),
}
)

// Unpack unpacks the configuration from the config file
func (l *LineTerminator) Unpack(option string) error {
Comment thread
kvch marked this conversation as resolved.
Outdated
terminator, ok := lineTerminators[option]
if !ok {
return fmt.Errorf("invalid line terminator: %s", option)
}

*l = terminator

return nil
}
Loading