From 526f6184767e7aee71f7a8783d3d2c8d5ffce10d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 31 Mar 2021 11:21:00 +0200 Subject: [PATCH 1/3] Add check to see if metadata changes are detected and saved by the prospector in filestream --- filebeat/input/filestream/environment_test.go | 30 ++++++++++++++++ .../filestream/input_integration_test.go | 36 +++++++++++++++++++ filebeat/input/filestream/prospector.go | 5 ++- 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index a0d424a1bba9..933dd2e3a517 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -34,6 +34,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" @@ -55,6 +56,7 @@ type registryEntry struct { Cursor struct { Offset int `json:"offset"` } `json:"cursor"` + Meta interface{} `json:"meta"` } func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment { @@ -185,6 +187,34 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expec require.Equal(e.t, expectedOffset, entry.Cursor.Offset) } +// requireMetaInRegistry checks if the expected metadata is saved to the registry. +func (e *inputTestingEnvironment) requireMetaInRegistry(filename, expectedSource, expectedIdentifier string) { + filepath := e.abspath(filename) + fi, err := os.Stat(filepath) + if err != nil { + e.t.Fatalf("cannot stat file when cheking for offset: %+v", err) + } + + id := getIDFromPath(filepath, fi) + entry, err := e.getRegistryState(id) + if err != nil { + e.t.Fatalf(err.Error()) + } + + if entry.Meta == nil { + e.t.Fatalf("empty metadata") + } + + var meta fileMeta + err = typeconv.Convert(&meta, entry.Meta) + if err != nil { + e.t.Fatalf("cannot convert: %+v", err) + } + + require.Equal(e.t, expectedSource, meta.Source) + require.Equal(e.t, expectedIdentifier, meta.IdentifierName) +} + func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) { filepath := e.abspath(filename) fi, err := os.Stat(filepath) diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index a8dffda1905c..4ef15d55f98f 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -77,6 +77,42 @@ func TestFilestreamCloseRenamed(t *testing.T) { env.requireOffsetInRegistry(testlogName, len(newerTestlines)) } +func TestFilestreamMetadataUpdatedOnRename(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("renaming files while Filebeat is running is not supported on Windows") + } + + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.check_interval": "1ms", + }) + + testline := []byte("log line\n") + env.mustWriteLinesToFile(testlogName, testline) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + env.requireOffsetInRegistry(testlogName, len(testline)) + + testlogNameRenamed := "test.log.renamed" + env.mustRenameFile(testlogName, testlogNameRenamed) + + env.mustAppendLinesToFile(testlogNameRenamed, testline) + + env.waitUntilEventCount(2) + env.requireOffsetInRegistry(testlogNameRenamed, len(testline)*2) + + cancelInput() + env.waitUntilInputStops() + + env.requireMetaInRegistry(testlogNameRenamed, env.abspath(testlogNameRenamed), "native") +} + // test_close_removed from test_harvester.py func TestFilestreamCloseRemoved(t *testing.T) { env := newInputTestingEnvironment(t) diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 89932773648c..edcd7f7d24d2 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -169,7 +169,10 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h meta.IdentifierName = p.identifier.Name() } - s.UpdateMetadata(src, fileMeta{Source: src.newPath, IdentifierName: meta.IdentifierName}) + err = s.UpdateMetadata(src, fileMeta{Source: src.newPath, IdentifierName: meta.IdentifierName}) + if err != nil { + log.Errorf("Failed to update cursor meta data of entry %s: %v", src.Name(), err) + } if p.stateChangeCloser.Renamed { log.Debugf("Stopping harvester as file %s has been renamed and close.on_state_change.renamed is enabled.", src.Name()) From 514c37a1d1a470862b2a02231e93afe189101064 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 31 Mar 2021 15:27:12 +0200 Subject: [PATCH 2/3] check immediately after rename if source is modified and cursor stays the same --- filebeat/input/filestream/environment_test.go | 48 +++++++++++-------- .../filestream/input_integration_test.go | 6 ++- 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 933dd2e3a517..ad15c5fefc79 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -188,31 +188,39 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expec } // requireMetaInRegistry checks if the expected metadata is saved to the registry. -func (e *inputTestingEnvironment) requireMetaInRegistry(filename, expectedSource, expectedIdentifier string) { - filepath := e.abspath(filename) - fi, err := os.Stat(filepath) - if err != nil { - e.t.Fatalf("cannot stat file when cheking for offset: %+v", err) - } +func (e *inputTestingEnvironment) waitUntilMetaInRegistry(filename string, expectedMeta fileMeta) { + for { + filepath := e.abspath(filename) + fi, err := os.Stat(filepath) + if err != nil { + continue + } - id := getIDFromPath(filepath, fi) - entry, err := e.getRegistryState(id) - if err != nil { - e.t.Fatalf(err.Error()) - } + id := getIDFromPath(filepath, fi) + entry, err := e.getRegistryState(id) + if err != nil { + continue + } - if entry.Meta == nil { - e.t.Fatalf("empty metadata") - } + if entry.Meta == nil { + continue + } - var meta fileMeta - err = typeconv.Convert(&meta, entry.Meta) - if err != nil { - e.t.Fatalf("cannot convert: %+v", err) + var meta fileMeta + err = typeconv.Convert(&meta, entry.Meta) + if err != nil { + e.t.Fatalf("cannot convert: %+v", err) + } + + if requireMetadataEquals(expectedMeta, meta) { + break + } + time.Sleep(10 * time.Millisecond) } +} - require.Equal(e.t, expectedSource, meta.Source) - require.Equal(e.t, expectedIdentifier, meta.IdentifierName) +func requireMetadataEquals(one, other fileMeta) bool { + return one == other } func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) { diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 4ef15d55f98f..e82c1496a43b 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -102,6 +102,10 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) { testlogNameRenamed := "test.log.renamed" env.mustRenameFile(testlogName, testlogNameRenamed) + // check if the metadata is updated and cursor data stays the same + env.waitUntilMetaInRegistry(testlogNameRenamed, fileMeta{Source: env.abspath(testlogNameRenamed), IdentifierName: "native"}) + env.requireOffsetInRegistry(testlogNameRenamed, len(testline)) + env.mustAppendLinesToFile(testlogNameRenamed, testline) env.waitUntilEventCount(2) @@ -109,8 +113,6 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) { cancelInput() env.waitUntilInputStops() - - env.requireMetaInRegistry(testlogNameRenamed, env.abspath(testlogNameRenamed), "native") } // test_close_removed from test_harvester.py From 7c2c795c9384b8dd22858cc50c5387966c5fb914 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 31 Mar 2021 15:42:12 +0200 Subject: [PATCH 3/3] add metadata to registry when first encountered --- filebeat/input/filestream/input_integration_test.go | 1 + filebeat/input/filestream/internal/input-logfile/store.go | 2 +- filebeat/input/filestream/prospector.go | 6 ++++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index e82c1496a43b..376970cacf94 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -97,6 +97,7 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) { env.startInput(ctx, inp) env.waitUntilEventCount(1) + env.waitUntilMetaInRegistry(testlogName, fileMeta{Source: env.abspath(testlogName), IdentifierName: "native"}) env.requireOffsetInRegistry(testlogName, len(testline)) testlogNameRenamed := "test.log.renamed" diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index 3bd3fcdb081e..6cd0028c61cc 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -270,7 +270,7 @@ func (s *store) findCursorMeta(key string, to interface{}) error { // updateMetadata updates the cursor metadata in the persistent store. func (s *store) updateMetadata(key string, meta interface{}) error { - resource := s.ephemeralStore.Find(key, false) + resource := s.ephemeralStore.Find(key, true) if resource == nil { return fmt.Errorf("resource '%s' not found", key) } diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index edcd7f7d24d2..f08e4346c74d 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -113,6 +113,12 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h case loginp.OpCreate, loginp.OpWrite: if fe.Op == loginp.OpCreate { log.Debugf("A new file %s has been found", fe.NewPath) + + err := s.UpdateMetadata(src, fileMeta{Source: fe.NewPath, IdentifierName: p.identifier.Name()}) + if err != nil { + log.Errorf("Failed to set cursor meta data of entry %s: %v", src.Name(), err) + } + } else if fe.Op == loginp.OpWrite { log.Debugf("File %s has been updated", fe.NewPath) }