Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// Config stores the options of a file stream.
type config struct {
readerConfig
Reader readerConfig `config:",inline"`

Paths []string `config:"paths"`
Close closerConfig `config:"close"`
Expand Down Expand Up @@ -79,7 +79,7 @@ type backoffConfig struct {

func defaultConfig() config {
return config{
readerConfig: defaultReaderConfig(),
Reader: defaultReaderConfig(),
Paths: []string{},
Close: defaultCloserConfig(),
CleanInactive: 0,
Expand Down
37 changes: 13 additions & 24 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,8 @@ type fileMeta struct {
// are actively written by other applications.
type filestream struct {
readerConfig readerConfig
bufferSize int
tailFile bool // TODO
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
lineTerminator readfile.LineTerminator
excludeLines []match.Matcher
includeLines []match.Matcher
maxBytes int
closerConfig closerConfig
}

Expand Down Expand Up @@ -97,9 +91,9 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
return nil, nil, fmt.Errorf("error while creating file identifier: %v", err)
}

encodingFactory, ok := encoding.FindEncoding(config.Encoding)
encodingFactory, ok := encoding.FindEncoding(config.Reader.Encoding)
if !ok || encodingFactory == nil {
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Encoding)
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding)
}

prospector := &fileProspector{
Expand All @@ -111,13 +105,8 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
}

filestream := &filestream{
readerConfig: config.readerConfig,
bufferSize: config.BufferSize,
readerConfig: config.Reader,
encodingFactory: encodingFactory,
lineTerminator: config.LineTerminator,
excludeLines: config.ExcludeLines,
includeLines: config.IncludeLines,
maxBytes: config.MaxBytes,
closerConfig: config.Close,
}

Expand Down Expand Up @@ -191,7 +180,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
return nil, err
}

log.Debug("newLogFileReader with config.MaxBytes:", inp.maxBytes)
log.Debug("newLogFileReader with config.MaxBytes:", inp.readerConfig.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
Expand All @@ -211,22 +200,22 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
// for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := inp.maxBytes * 4
encReaderMaxBytes := inp.readerConfig.MaxBytes * 4

var r reader.Reader
r, err = readfile.NewEncodeReader(dbgReader, readfile.Config{
Codec: inp.encoding,
BufferSize: inp.bufferSize,
Terminator: inp.lineTerminator,
BufferSize: inp.readerConfig.BufferSize,
Terminator: inp.readerConfig.LineTerminator,
MaxBytes: encReaderMaxBytes,
})
if err != nil {
f.Close()
return nil, err
}

r = readfile.NewStripNewline(r, inp.lineTerminator)
r = readfile.NewLimitReader(r, inp.maxBytes)
r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)
r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

return r, nil
}
Expand Down Expand Up @@ -335,14 +324,14 @@ func (inp *filestream) readFromSource(
// isDroppedLine decides if the line is exported or not based on
// the include_lines and exclude_lines options.
func (inp *filestream) isDroppedLine(log *logp.Logger, line string) bool {
if len(inp.includeLines) > 0 {
if !matchAny(inp.includeLines, line) {
if len(inp.readerConfig.IncludeLines) > 0 {
if !matchAny(inp.readerConfig.IncludeLines, line) {
log.Debug("Drop line as it does not match any of the include patterns %s", line)
return true
}
}
if len(inp.excludeLines) > 0 {
if matchAny(inp.excludeLines, line) {
if len(inp.readerConfig.ExcludeLines) > 0 {
if matchAny(inp.readerConfig.ExcludeLines, line) {
log.Debug("Drop line as it does match one of the exclude patterns%s", line)
return true
}
Expand Down
137 changes: 137 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@
package filestream

import (
"bytes"
"context"
"runtime"
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"
)

// test_close_renamed from test_harvester.py
Expand Down Expand Up @@ -100,3 +106,134 @@ func TestFilestreamCloseEOF(t *testing.T) {

env.requireOffsetInRegistry(testlogName, expectedOffset)
}

// test_empty_lines from test_harvester.py
func TestFilestreamEmptyLine(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("first log line\nnext is an empty line\n")
env.mustWriteLinesToFile(testlogName, testlines)

env.waitUntilEventCount(2)
env.requireOffsetInRegistry(testlogName, len(testlines))

moreTestlines := []byte("\nafter an empty line\n")
env.mustAppendLinesToFile(testlogName, moreTestlines)

env.waitUntilEventCount(3)
env.requireEventsReceived([]string{
"first log line",
"next is an empty line",
"after an empty line",
})

cancelInput()
env.waitUntilInputStops()

env.requireOffsetInRegistry(testlogName, len(testlines)+len(moreTestlines))
}

// test_empty_lines_only from test_harvester.py
// This test differs from the original because in filestream
// input offset is no longer persisted when the line is empty.
func TestFilestreamEmptyLinesOnly(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("\n\n\n")
env.mustWriteLinesToFile(testlogName, testlines)

cancelInput()
env.waitUntilInputStops()

env.requireNoEntryInRegistry(testlogName)
}

// test_bom_utf8 from test_harvester.py
func TestFilestreamBOMUTF8(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

// BOM: 0xEF,0xBB,0xBF
lines := append([]byte{0xEF, 0xBB, 0xBF}, []byte(`#Software: Microsoft Exchange Server
#Version: 14.0.0.0
#Log-type: Message Tracking Log
#Date: 2016-04-05T00:00:02.052Z
#Fields: date-time,client-ip,client-hostname,server-ip,server-hostname,source-context,connector-id,source,event-id,internal-message-id,message-id,recipient-address,recipient-status,total-bytes,recipient-count,related-recipient-address,reference,message-subject,sender-address,return-path,message-info,directionality,tenant-id,original-client-ip,original-server-ip,custom-data
2016-04-05T00:00:02.052Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:279f077c-216f-4323-a9ee-48e50ffd3cad, Event:269492708, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.022Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-37-DB-F9-F9-B5-F2-42-4F-86-62-E6-5D-FC-0C-A1-41-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-00-1F-D4-B5-0E-00-00-2E-EF-F2-59-0E-E8-2D-46-BC-31-02-85-0D-67-98-43-00-00-37-4A-A3-B3-00-00
2016-04-05T00:00:02.145Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:49cb09c6-5b76-415d-a085-da0ad9079682, Event:269492711, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.038Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-97-8F-07-43-51-44-61-4A-AD-BD-29-D4-97-4E-20-A0-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-8E-8F-BD-EB-57-00-00-3D-FB-CE-26-A4-8D-46-4C-A4-35-0F-A7-9B-FA-D7-B9-00-00-37-44-2F-CA-00-00
`)...)
env.mustWriteLinesToFile(testlogName, lines)

env.waitUntilEventCount(7)

cancelInput()
env.waitUntilInputStops()

messages := env.getOutputMessages()
require.Equal(t, messages[0], "#Software: Microsoft Exchange Server")
}

// test_boms from test_harvester.py
func TestFilestreamUTF16BOMs(t *testing.T) {
encodings := map[string]encoding.Encoding{
"utf-16be-bom": unicode.UTF16(unicode.BigEndian, unicode.UseBOM),
"utf-16le-bom": unicode.UTF16(unicode.LittleEndian, unicode.UseBOM),
}

for name, enc := range encodings {
name := name
encoder := enc.NewEncoder()
t.Run(name, func(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"encoding": name,
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

line := []byte("first line\n")
buf := bytes.NewBuffer(nil)
writer := transform.NewWriter(buf, encoder)
writer.Write(line)
writer.Close()

env.mustWriteLinesToFile(testlogName, buf.Bytes())

env.waitUntilEventCount(1)

env.requireEventsReceived([]string{"first line"})

cancelInput()
env.waitUntilInputStops()
})
}
}