diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 3f651834b6c7..a0d424a1bba9 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -176,9 +176,11 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expec e.t.Fatalf("cannot stat file when cheking for offset: %+v", err) } - identifier, _ := newINodeDeviceIdentifier(nil) - src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: filepath}) - entry := e.getRegistryState(src.Name()) + id := getIDFromPath(filepath, fi) + entry, err := e.getRegistryState(id) + if err != nil { + e.t.Fatalf(err.Error()) + } require.Equal(e.t, expectedOffset, entry.Cursor.Offset) } @@ -204,21 +206,30 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) { // requireOffsetInRegistry checks if the expected offset is set for a file. func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expectedOffset int) { - entry := e.getRegistryState(key) + entry, err := e.getRegistryState(key) + if err != nil { + e.t.Fatalf(err.Error()) + } require.Equal(e.t, expectedOffset, entry.Cursor.Offset) } -func (e *inputTestingEnvironment) getRegistryState(key string) registryEntry { +func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) { inputStore, _ := e.stateStore.Access() var entry registryEntry err := inputStore.Get(key, &entry) if err != nil { - e.t.Fatalf("error when getting expected key '%s' from store: %+v", key, err) + return registryEntry{}, fmt.Errorf("error when getting expected key '%s' from store: %+v", key, err) } - return entry + return entry, nil +} + +func getIDFromPath(filepath string, fi os.FileInfo) string { + identifier, _ := newINodeDeviceIdentifier(nil) + src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: filepath}) + return "filestream::.global::" + src.Name() } // waitUntilEventCount waits until total count events arrive to the client. diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 7471ed189047..a8dffda1905c 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -30,8 +30,6 @@ import ( "golang.org/x/text/encoding" "golang.org/x/text/encoding/unicode" "golang.org/x/text/transform" - - loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" ) // test_close_renamed from test_harvester.py @@ -114,9 +112,8 @@ func TestFilestreamCloseRemoved(t *testing.T) { cancelInput() env.waitUntilInputStops() - identifier, _ := newINodeDeviceIdentifier(nil) - src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: env.abspath(testlogName)}) - env.requireOffsetInRegistryByID(src.Name(), len(testlines)) + id := getIDFromPath(env.abspath(testlogName), fi) + env.requireOffsetInRegistryByID(id, len(testlines)) } // test_close_eof from test_harvester.py diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index ae86f8acb2ea..72d0c27e4c8f 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -129,11 +129,12 @@ type defaultHarvesterGroup struct { harvester Harvester cleanTimeout time.Duration store *store + identifier *sourceIdentifier tg unison.TaskGroup } func (hg *defaultHarvesterGroup) Start(ctx input.Context, s Source) { - sourceName := s.Name() + sourceName := hg.identifier.ID(s) ctx.Logger = ctx.Logger.With("source", sourceName) ctx.Logger.Debug("Starting harvester for file") @@ -185,7 +186,7 @@ func (hg *defaultHarvesterGroup) Start(ctx input.Context, s Source) { // Stop stops the running Harvester for a given Source. func (hg *defaultHarvesterGroup) Stop(s Source) { hg.tg.Go(func(_ unison.Canceler) error { - hg.readers.remove(s.Name()) + hg.readers.remove(hg.identifier.ID(s)) return nil }) } diff --git a/filebeat/input/filestream/internal/input-logfile/harvester_test.go b/filebeat/input/filestream/internal/input-logfile/harvester_test.go index dd5e9d78c4c3..9425c30be4b0 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester_test.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester_test.go @@ -101,11 +101,11 @@ func TestDefaultHarvesterGroup(t *testing.T) { source := &testSource{"/path/to/test"} requireSourceAddedToBookkeeper := func(t *testing.T, hg *defaultHarvesterGroup, s Source) { - require.True(t, hg.readers.hasID(s.Name())) + require.True(t, hg.readers.hasID(hg.identifier.ID(s))) } requireSourceRemovedFromBookkeeper := func(t *testing.T, hg *defaultHarvesterGroup, s Source) { - require.False(t, hg.readers.hasID(s.Name())) + require.False(t, hg.readers.hasID(hg.identifier.ID(s))) } t.Run("assert a harvester is started in a goroutine", func(t *testing.T) { @@ -164,7 +164,7 @@ func TestDefaultHarvesterGroup(t *testing.T) { gorountineChecker.WaitUntilIncreased(2) // error is expected as a harvester group was expected to start twice for the same source - for !hg.readers.hasID(source.Name()) { + for !hg.readers.hasID(hg.identifier.ID(source)) { } time.Sleep(3 * time.Millisecond) @@ -222,7 +222,7 @@ func TestDefaultHarvesterGroup(t *testing.T) { hg := testDefaultHarvesterGroup(t, mockHarvester) inputCtx := input.Context{Logger: logp.L(), Cancelation: context.Background()} - r, err := lock(inputCtx, hg.store, source.Name()) + r, err := lock(inputCtx, hg.store, hg.identifier.ID(source)) if err != nil { t.Fatalf("cannot lock source") } @@ -236,7 +236,7 @@ func TestDefaultHarvesterGroup(t *testing.T) { ok := false for !ok { // wait until harvester is added to the bookeeper - ok = hg.readers.hasID(source.Name()) + ok = hg.readers.hasID(hg.identifier.ID(source)) if ok { releaseResource(r) } @@ -258,7 +258,7 @@ func TestDefaultHarvesterGroup(t *testing.T) { gorountineChecker := resources.NewGoroutinesChecker() defer gorountineChecker.WaitUntilOriginalCount() - r, err := lock(inputCtx, hg.store, source.Name()) + r, err := lock(inputCtx, hg.store, hg.identifier.ID(source)) if err != nil { t.Fatalf("cannot lock source") } @@ -275,11 +275,12 @@ func TestDefaultHarvesterGroup(t *testing.T) { func testDefaultHarvesterGroup(t *testing.T, mockHarvester Harvester) *defaultHarvesterGroup { return &defaultHarvesterGroup{ - readers: newReaderGroup(), - pipeline: &pipelinemock.MockPipelineConnector{}, - harvester: mockHarvester, - store: testOpenStore(t, "test", nil), - tg: unison.TaskGroup{}, + readers: newReaderGroup(), + pipeline: &pipelinemock.MockPipelineConnector{}, + harvester: mockHarvester, + store: testOpenStore(t, "test", nil), + identifier: &sourceIdentifier{"filestream::.global::", false}, + tg: unison.TaskGroup{}, } } diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go index 4939ce26c4e2..dfc9a9817a02 100644 --- a/filebeat/input/filestream/internal/input-logfile/input.go +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -67,6 +67,7 @@ func (inp *managedInput) Run( cleanTimeout: inp.cleanTimeout, harvester: inp.harvester, store: groupStore, + identifier: inp.sourceIdentifier, tg: unison.TaskGroup{}, }