diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index f1a69f03994b..bd2719c9ba97 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -22,6 +22,7 @@ package filestream import ( "bytes" "context" + "os" "runtime" "testing" @@ -67,7 +68,6 @@ func TestFilestreamCloseRenamed(t *testing.T) { newerTestlines := []byte("new first log line\nnew second log line\n") env.mustWriteLinesToFile(testlogName, newerTestlines) - // new two events arrived env.waitUntilEventCount(3) cancelInput() @@ -116,6 +116,45 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) { env.waitUntilInputStops() } +// test_close_removed from test_harvester.py +func TestFilestreamCloseRemoved(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "1ms", + "close.on_state_change.removed": "true", + }) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + testlines := []byte("first log line\n") + env.mustWriteLinesToFile(testlogName, testlines) + + // first event has made it successfully + env.waitUntilEventCount(1) + + env.requireOffsetInRegistry(testlogName, len(testlines)) + + fi, err := os.Stat(env.abspath(testlogName)) + if err != nil { + t.Fatalf("cannot stat file: %+v", err) + } + + env.mustRemoveFile(testlogName) + + env.waitUntilHarvesterIsDone() + + cancelInput() + env.waitUntilInputStops() + + id := getIDFromPath(env.abspath(testlogName), fi) + env.requireOffsetInRegistryByID(id, len(testlines)) +} + // test_close_eof from test_harvester.py func TestFilestreamCloseEOF(t *testing.T) { env := newInputTestingEnvironment(t) @@ -127,13 +166,13 @@ func TestFilestreamCloseEOF(t *testing.T) { "close.reader.on_eof": "true", }) - ctx, cancelInput := context.WithCancel(context.Background()) - env.startInput(ctx, inp) - testlines := []byte("first log line\n") expectedOffset := len(testlines) env.mustWriteLinesToFile(testlogName, testlines) + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + // first event has made it successfully env.waitUntilEventCount(1) env.requireOffsetInRegistry(testlogName, expectedOffset) diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go index d91544162c5a..df28f3345b9a 100644 --- a/libbeat/reader/readfile/line_test.go +++ b/libbeat/reader/readfile/line_test.go @@ -30,6 +30,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/text/transform" "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" @@ -390,3 +391,37 @@ func TestMaxBytesLimit(t *testing.T) { } } } + +// test_exceed_buffer from test_harvester.py +func TestBufferSize(t *testing.T) { + lines := []string{ + "first line is too long\n", + "second line is too long\n", + "third line too long\n", + "OK\n", + } + + codecFactory, _ := encoding.FindEncoding("") + codec, _ := codecFactory(bytes.NewBuffer(nil)) + bufferSize := 10 + + in := ioutil.NopCloser(strings.NewReader(strings.Join(lines, ""))) + reader, err := NewLineReader(in, Config{codec, bufferSize, AutoLineTerminator, 1024}) + if err != nil { + t.Fatal("failed to initialize reader:", err) + } + + for i := 0; i < len(lines); i++ { + b, n, err := reader.Next() + if err != nil { + if err == io.EOF { + break + } else { + t.Fatal("unexpected error:", err) + } + } + + require.Equal(t, n, len(lines[i])) + require.Equal(t, string(b[:n]), lines[i]) + } +}