diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index a0d424a1bba9..ad15c5fefc79 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -34,6 +34,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" @@ -55,6 +56,7 @@ type registryEntry struct { Cursor struct { Offset int `json:"offset"` } `json:"cursor"` + Meta interface{} `json:"meta"` } func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment { @@ -185,6 +187,42 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expec require.Equal(e.t, expectedOffset, entry.Cursor.Offset) } +// requireMetaInRegistry checks if the expected metadata is saved to the registry. +func (e *inputTestingEnvironment) waitUntilMetaInRegistry(filename string, expectedMeta fileMeta) { + for { + filepath := e.abspath(filename) + fi, err := os.Stat(filepath) + if err != nil { + continue + } + + id := getIDFromPath(filepath, fi) + entry, err := e.getRegistryState(id) + if err != nil { + continue + } + + if entry.Meta == nil { + continue + } + + var meta fileMeta + err = typeconv.Convert(&meta, entry.Meta) + if err != nil { + e.t.Fatalf("cannot convert: %+v", err) + } + + if requireMetadataEquals(expectedMeta, meta) { + break + } + time.Sleep(10 * time.Millisecond) + } +} + +func requireMetadataEquals(one, other fileMeta) bool { + return one == other +} + func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) { filepath := e.abspath(filename) fi, err := os.Stat(filepath) diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index a8dffda1905c..376970cacf94 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -77,6 +77,45 @@ func TestFilestreamCloseRenamed(t *testing.T) { env.requireOffsetInRegistry(testlogName, len(newerTestlines)) } +func TestFilestreamMetadataUpdatedOnRename(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("renaming files while Filebeat is running is not supported on Windows") + } + + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.check_interval": "1ms", + }) + + testline := []byte("log line\n") + env.mustWriteLinesToFile(testlogName, testline) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + env.waitUntilMetaInRegistry(testlogName, fileMeta{Source: env.abspath(testlogName), IdentifierName: "native"}) + env.requireOffsetInRegistry(testlogName, len(testline)) + + testlogNameRenamed := "test.log.renamed" + env.mustRenameFile(testlogName, testlogNameRenamed) + + // check if the metadata is updated and cursor data stays the same + env.waitUntilMetaInRegistry(testlogNameRenamed, fileMeta{Source: env.abspath(testlogNameRenamed), IdentifierName: "native"}) + env.requireOffsetInRegistry(testlogNameRenamed, len(testline)) + + env.mustAppendLinesToFile(testlogNameRenamed, testline) + + env.waitUntilEventCount(2) + env.requireOffsetInRegistry(testlogNameRenamed, len(testline)*2) + + cancelInput() + env.waitUntilInputStops() +} + // test_close_removed from test_harvester.py func TestFilestreamCloseRemoved(t *testing.T) { env := newInputTestingEnvironment(t) diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index 3bd3fcdb081e..6cd0028c61cc 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -270,7 +270,7 @@ func (s *store) findCursorMeta(key string, to interface{}) error { // updateMetadata updates the cursor metadata in the persistent store. func (s *store) updateMetadata(key string, meta interface{}) error { - resource := s.ephemeralStore.Find(key, false) + resource := s.ephemeralStore.Find(key, true) if resource == nil { return fmt.Errorf("resource '%s' not found", key) } diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 89932773648c..f08e4346c74d 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -113,6 +113,12 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h case loginp.OpCreate, loginp.OpWrite: if fe.Op == loginp.OpCreate { log.Debugf("A new file %s has been found", fe.NewPath) + + err := s.UpdateMetadata(src, fileMeta{Source: fe.NewPath, IdentifierName: p.identifier.Name()}) + if err != nil { + log.Errorf("Failed to set cursor meta data of entry %s: %v", src.Name(), err) + } + } else if fe.Op == loginp.OpWrite { log.Debugf("File %s has been updated", fe.NewPath) } @@ -169,7 +175,10 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h meta.IdentifierName = p.identifier.Name() } - s.UpdateMetadata(src, fileMeta{Source: src.newPath, IdentifierName: meta.IdentifierName}) + err = s.UpdateMetadata(src, fileMeta{Source: src.newPath, IdentifierName: meta.IdentifierName}) + if err != nil { + log.Errorf("Failed to update cursor meta data of entry %s: %v", src.Name(), err) + } if p.stateChangeCloser.Renamed { log.Debugf("Stopping harvester as file %s has been renamed and close.on_state_change.renamed is enabled.", src.Name())