From eabe0f8c233e8766f4177b04c6ad7cdb9e69c55d Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 21 Nov 2024 13:06:34 -0500 Subject: [PATCH 1/3] [Filebeat/Filestream] Fix `sourceStore.UpdateIdentifiers` The `sourceStore.UpdateIdentifiers` method has always been part of fileProspector.Init; its purpose is to update the identifiers in the registry if the file identity has changed. However, it was generating the wrong key and not updating the in-memory registry (store.ephemeralStore). This commit fixes it and also removes `sourceStore.FixUpIdentifiers`, because it was just a working version of `sourceStore.UpdateIdentifiers`. Now there is a single method to manipulate identifiers in the `sourceStore`. --- .../config/filebeat.global.reference.yml.tmpl | 2 + .../internal/input-logfile/prospector.go | 4 -- .../internal/input-logfile/store.go | 54 +++++-------------- filebeat/input/filestream/prospector.go | 7 ++- 4 files changed, 19 insertions(+), 48 deletions(-) diff --git a/filebeat/_meta/config/filebeat.global.reference.yml.tmpl b/filebeat/_meta/config/filebeat.global.reference.yml.tmpl index 0287fb3f9f5..9d0a3c23974 100644 --- a/filebeat/_meta/config/filebeat.global.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.global.reference.yml.tmpl @@ -15,6 +15,8 @@ # batch of events has been published successfully. The default value is 1s. #filebeat.registry.flush: 1s +# The interval at which to run the registry clean up +#filebeat.registry.cleanup_interval: 5m # Starting with Filebeat 7.0, the registry uses a new directory format to store # Filebeat state. 
After you upgrade, Filebeat will automatically migrate a 6.x diff --git a/filebeat/input/filestream/internal/input-logfile/prospector.go b/filebeat/input/filestream/internal/input-logfile/prospector.go index 733e55fe26e..4fc383f315d 100644 --- a/filebeat/input/filestream/internal/input-logfile/prospector.go +++ b/filebeat/input/filestream/internal/input-logfile/prospector.go @@ -56,10 +56,6 @@ type ProspectorCleaner interface { // The function passed to UpdateIdentifiers must return an empty string if the key // remains the same. UpdateIdentifiers(func(v Value) (string, interface{})) - - // FixUpIdentifiers migrates IDs in the registry from inputs - // that used the deprecated `.global` ID. - FixUpIdentifiers(func(v Value) (string, interface{})) } // Value contains the cursor metadata. diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index 024ca5c9bfd..726f4e0c593 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -212,13 +212,21 @@ func (s *sourceStore) CleanIf(pred func(v Value) bool) { } } -// FixUpIdentifiers copies an existing resource to a new ID and marks the previous one +// UpdateIdentifiers copies an existing resource to a new ID and marks the previous one // for removal. 
-func (s *sourceStore) FixUpIdentifiers(getNewID func(v Value) (string, interface{})) { +func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, interface{})) { s.store.ephemeralStore.mu.Lock() defer s.store.ephemeralStore.mu.Unlock() for key, res := range s.store.ephemeralStore.table { + // - res.internalState.TTL == 0 is a deleted entry + // - res.internalState.TTL > 0 is an entry that will be removed once the TTL + // is reached + // - res.internalState.TTL == -1 is an entry that will never be removed + if res.internalState.TTL == 0 { + continue + } + if !s.identifier.MatchesInput(key) { continue } @@ -229,7 +237,7 @@ func (s *sourceStore) FixUpIdentifiers(getNewID func(v Value) (string, interface } newKey, updatedMeta := getNewID(res) - if len(newKey) > 0 && res.internalState.TTL > 0 { + if len(newKey) > 0 { if _, ok := s.store.ephemeralStore.table[newKey]; ok { res.lock.Unlock() continue @@ -249,48 +257,10 @@ func (s *sourceStore) FixUpIdentifiers(getNewID func(v Value) (string, interface // Add the new resource to the ephemeralStore so the rest of the // codebase can have access to the new value s.store.ephemeralStore.table[newKey] = r - // Remove the old key from the store s.store.UpdateTTL(res, 0) // aka delete. See store.remove for details s.store.log.Infof("migrated entry in registry from '%s' to '%s'", key, newKey) - } - - res.lock.Unlock() - } -} - -// UpdateIdentifiers copies an existing resource to a new ID and marks the previous one -// for removal. 
-func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, interface{})) { - s.store.ephemeralStore.mu.Lock() - defer s.store.ephemeralStore.mu.Unlock() - - for key, res := range s.store.ephemeralStore.table { - if !s.identifier.MatchesInput(key) { - continue - } - - if !res.lock.TryLock() { - continue - } - - newKey, updatedMeta := getNewID(res) - if len(newKey) > 0 && res.internalState.TTL > 0 { - if _, ok := s.store.ephemeralStore.table[newKey]; ok { - res.lock.Unlock() - continue - } - - // Pending updates due to events that have not yet been ACKed - // are not included in the copy. Collection on - // the copy start from the last known ACKed position. - // This might lead to data duplication because the harvester - // will pickup from the last ACKed position using the new key - // and the pending updates will affect the entry with the oldKey. - r := res.copyWithNewKey(newKey) - r.cursorMeta = updatedMeta - r.stored = false - s.store.writeState(r) + s.store.log.Infof("migrated entry in registry from '%s' to '%s'. Cursor: %v", key, newKey, r.cursor) } res.lock.Unlock() diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 2bf737a86fd..5c7402f64dc 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -70,7 +70,7 @@ func (p *fileProspector) Init( // If this fileProspector belongs to an input that did not have an ID // this will find its files in the registry and update them to use the // new ID. - globalCleaner.FixUpIdentifiers(func(v loginp.Value) (id string, val interface{}) { + globalCleaner.UpdateIdentifiers(func(v loginp.Value) (id string, val interface{}) { var fm fileMeta err := v.UnpackCursorMeta(&fm) if err != nil { @@ -101,6 +101,9 @@ func (p *fileProspector) Init( } identifierName := p.identifier.Name() + + // If the file identity has changed, update the registry keys so we can keep + // the state. 
cleaner.UpdateIdentifiers(func(v loginp.Value) (string, interface{}) { var fm fileMeta err := v.UnpackCursorMeta(&fm) @@ -114,7 +117,7 @@ func (p *fileProspector) Init( } if fm.IdentifierName != identifierName { - newKey := p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd}).Name() + newKey := newID(p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd})) fm.IdentifierName = identifierName return newKey, fm } From a2798fe313db19d5dfc5052d29c59ec00d07f3ea Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 21 Nov 2024 13:59:45 -0500 Subject: [PATCH 2/3] Fix tests --- .../internal/input-logfile/store_test.go | 23 +++++++++++-------- filebeat/input/filestream/prospector_test.go | 6 ++++- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/filebeat/input/filestream/internal/input-logfile/store_test.go b/filebeat/input/filestream/internal/input-logfile/store_test.go index 6f19e1afad7..2d4f98b5d29 100644 --- a/filebeat/input/filestream/internal/input-logfile/store_test.go +++ b/filebeat/input/filestream/internal/input-logfile/store_test.go @@ -347,11 +347,11 @@ type testMeta struct { func TestSourceStore_UpdateIdentifiers(t *testing.T) { t.Run("update identifiers when TTL is bigger than zero", func(t *testing.T) { backend := createSampleStore(t, map[string]state{ - "test::key1": { + "test::key1": { // Active resource TTL: 60 * time.Second, Meta: testMeta{IdentifierName: "method"}, }, - "test::key2": { + "test::key2": { // Deleted resource TTL: 0 * time.Second, Meta: testMeta{IdentifierName: "method"}, }, @@ -372,22 +372,25 @@ func TestSourceStore_UpdateIdentifiers(t *testing.T) { return "", nil }) - var newState state - s.persistentStore.Get("test::key1::updated", &newState) + // The persistentStore is a mock that does not consider if a state has + // been removed before returning it, thus allowing us to get Updated + // timestamp from when the resource was deleted. 
+ var deletedState state + s.persistentStore.Get("test::key1", &deletedState) want := map[string]state{ - "test::key1": { - Updated: s.Get("test::key1").internalState.Updated, - TTL: 60 * time.Second, + "test::key1": { // old resource is deleted, TTL must be zero + Updated: deletedState.Updated, + TTL: 0 * time.Second, Meta: map[string]interface{}{"identifiername": "method"}, }, - "test::key2": { + "test::key2": { // Unchanged Updated: s.Get("test::key2").internalState.Updated, TTL: 0 * time.Second, Meta: map[string]interface{}{"identifiername": "method"}, }, - "test::key1::updated": { - Updated: newState.Updated, + "test::key1::updated": { // Updated resource + Updated: s.Get("test::key1::updated").internalState.Updated, TTL: 60 * time.Second, Meta: map[string]interface{}{"identifiername": "something"}, }, diff --git a/filebeat/input/filestream/prospector_test.go b/filebeat/input/filestream/prospector_test.go index 552b4218c78..36a6b2fb176 100644 --- a/filebeat/input/filestream/prospector_test.go +++ b/filebeat/input/filestream/prospector_test.go @@ -112,11 +112,13 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) { entries map[string]loginp.Value filesOnDisk map[string]loginp.FileDescriptor expectedUpdatedKeys map[string]string + newKey string }{ "prospector init does not update keys if there are no entries": { entries: nil, filesOnDisk: nil, expectedUpdatedKeys: map[string]string{}, + newKey: "foo", // it isn't used but it must not be empty }, "prospector init does not update keys of not existing files": { entries: map[string]loginp.Value{ @@ -129,6 +131,7 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) { }, filesOnDisk: nil, expectedUpdatedKeys: map[string]string{}, + newKey: "foo", // it isn't used but it must not be empty }, "prospector init updates keys of existing files": { entries: map[string]loginp.Value{ @@ -143,6 +146,7 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) { tmpFileName: {Info: file.ExtendFileInfo(fi)}, }, 
expectedUpdatedKeys: map[string]string{"not_path::key1": "path::" + tmpFileName}, + newKey: "path::" + tmpFileName, }, } @@ -155,7 +159,7 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) { identifier: mustPathIdentifier(false), filewatcher: newMockFileWatcherWithFiles(testCase.filesOnDisk), } - p.Init(testStore, newMockProspectorCleaner(nil), func(loginp.Source) string { return "" }) + p.Init(testStore, newMockProspectorCleaner(nil), func(loginp.Source) string { return testCase.newKey }) assert.EqualValues(t, testCase.expectedUpdatedKeys, testStore.updatedKeys) }) From a4ff07a5668feebd3cd687576b73afa7edeb45b7 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 22 Nov 2024 18:16:38 -0500 Subject: [PATCH 3/3] Check if source matches the real file This commit checks if 'source' matches the real file by calculating the registry key using the old identifier, if they match, then update the registry. --- .../internal/input-logfile/prospector.go | 3 + .../internal/input-logfile/store.go | 27 +++++--- filebeat/input/filestream/prospector.go | 69 +++++++++++++++++-- .../input/filestream/prospector_creator.go | 6 +- 4 files changed, 86 insertions(+), 19 deletions(-) diff --git a/filebeat/input/filestream/internal/input-logfile/prospector.go b/filebeat/input/filestream/internal/input-logfile/prospector.go index 4fc383f315d..2f90d440e36 100644 --- a/filebeat/input/filestream/internal/input-logfile/prospector.go +++ b/filebeat/input/filestream/internal/input-logfile/prospector.go @@ -62,4 +62,7 @@ type ProspectorCleaner interface { type Value interface { // UnpackCursorMeta returns the cursor metadata required by the prospector. 
UnpackCursorMeta(to interface{}) error + + // Key returns the registry key for this resource. + Key() string } diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index 726f4e0c593..c0031af35cb 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -219,10 +219,17 @@ func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, interfac defer s.store.ephemeralStore.mu.Unlock() for key, res := range s.store.ephemeralStore.table { - // - res.internalState.TTL == 0 is a deleted entry - // - res.internalState.TTL > 0 is an entry that will be removed once the TTL + // Entries in the registry are soft deleted; once the gcStore runs, + // they're actually removed from the in-memory registry (ephemeralStore) + // and marked as removed in the registry operations log. So we need + // to skip all entries that were soft deleted. + // + // - res.internalState.TTL == 0: entry has been deleted + // - res.internalState.TTL == -1: entry will never be removed by TTL + // - res.internalState.TTL > 0: entry will be removed once its TTL // is reached - // - res.internalState.TTL == -1 is an entry that will never be removed + // + // If the entry has been deleted, skip it if res.internalState.TTL == 0 { continue } @@ -243,23 +250,17 @@ func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, interfac continue } - // Pending updates due to events that have not yet been ACKed - // are not included in the copy. Collection on - // the copy start from the last known ACKed position. - // This might lead to data duplication because the harvester - // will pickup from the last ACKed position using the new key - // and the pending updates will affect the entry with the oldKey. 
r := res.copyWithNewKey(newKey) r.cursorMeta = updatedMeta r.stored = false - s.store.writeState(r) + s.store.writeState(r) // writeState only writes to the log file // Add the new resource to the ephemeralStore so the rest of the // codebase can have access to the new value s.store.ephemeralStore.table[newKey] = r + // Remove the old key from the store s.store.UpdateTTL(res, 0) // aka delete. See store.remove for details - s.store.log.Infof("migrated entry in registry from '%s' to '%s'", key, newKey) s.store.log.Infof("migrated entry in registry from '%s' to '%s'. Cursor: %v", key, newKey, r.cursor) } @@ -456,6 +457,10 @@ func (r *resource) UnpackCursorMeta(to interface{}) error { return typeconv.Convert(to, r.cursorMeta) } +func (r *resource) Key() string { + return r.key +} + // syncStateSnapshot returns the current insync state based on already ACKed update operations. func (r *resource) inSyncStateSnapshot() state { return state{ diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 5c7402f64dc..2ca1a415959 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -47,11 +47,29 @@ var ignoreInactiveSettings = map[string]ignoreInactiveType{ ignoreInactiveSinceFirstStartStr: IgnoreInactiveSinceFirstStart, } +var identifiersMap = map[string]fileIdentifier{} + +func init() { + // Initialise a default identifier + for name, factory := range identifierFactories { + if name == inodeMarkerName { + // inode marker requires a specific config we cannot infer. + continue + } + var err error + identifiersMap[name], err = factory(nil) + if err != nil { + panic(fmt.Errorf("cannot create identifier '%s': %w", name, err)) + } + } +} + // fileProspector implements the Prospector interface. // It contains a file scanner which returns file system events. // The FS events then trigger either new Harvester runs or updates // the statestore. 
type fileProspector struct { + logger *logp.Logger filewatcher loginp.FSWatcher identifier fileIdentifier ignoreOlder time.Duration @@ -116,12 +134,53 @@ func (p *fileProspector) Init( return "", fm } - if fm.IdentifierName != identifierName { - newKey := newID(p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd})) - fm.IdentifierName = identifierName - return newKey, fm + // Return early if: + // - The identifiers are the same + // - The old identifier is fingerprint + // (identifiers missing from identifiersMap are rejected further down) + oldIdentifierName := fm.IdentifierName + if oldIdentifierName == identifierName || oldIdentifierName == fingerprintName { + return "", nil } - return "", fm + + // Our current file (source) is in the registry, now we need to ensure + // this registry entry (resource) actually refers to our file. Sources + // are identified by path, however as log files rotate the same path + // can point to different files. + // + // So to ensure we're dealing with the resource from our current file, + // we use the old identifier to generate a registry key for the current + // file we're trying to migrate, if this key matches with the key in the + // registry, then we proceed to update the registry. + registryKey := v.Key() + oldIdentifier, ok := identifiersMap[oldIdentifierName] + if !ok { + // Some identifiers have no default instance in identifiersMap and + // cannot be migrated. If we cannot find the identifier, log it and + // move on to the next entry. + p.logger.Errorf("cannot migrate registry entry from '%s', if the file still exists, it will be re-ingested", oldIdentifierName) + return "", nil + } + // TODO: fix case when old identifier is fingerprint, or just do not handle it + previousIdentifierKey := newID(oldIdentifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd})) + + // If the registry key and the key generated by the old identifier + // do not match, log it at debug level and do nothing. 
+ if previousIdentifierKey != registryKey { + p.logger.Debugf("registry key: '%s' and previous file identity key: '%s', differ, will not migrate. Source: '%s'", + registryKey, previousIdentifierKey, fm.Source) + // fmt.Printf("registry key: '%s' and previous file identity key: '%s', differ, will not migrate\n", registryKey, previousIdentifierKey) + return "", fm + } + + // The resource matches the file we found in the file system, generate + // a new registry key and return it alongside the updated meta. + newKey := newID(p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd})) + fm.IdentifierName = identifierName + p.logger.Debugf("registry key: '%s' and previous file identity key: '%s', are the same migrating. Source: '%s'", + registryKey, previousIdentifierKey, fm.Source) + + return newKey, fm }) return nil diff --git a/filebeat/input/filestream/prospector_creator.go b/filebeat/input/filestream/prospector_creator.go index 5142704a614..91a5e0b30d3 100644 --- a/filebeat/input/filestream/prospector_creator.go +++ b/filebeat/input/filestream/prospector_creator.go @@ -53,9 +53,8 @@ func newProspector(config config) (loginp.Prospector, error) { return nil, fmt.Errorf("error while creating file identifier: %w", err) } - logp.L(). - With("filestream_id", config.ID). - Debugf("file identity is set to %s", identifier.Name()) + logger := logp.L().Named("input.filestream").With("filestream_id", config.ID) + logger.Debugf("file identity is set to %s", identifier.Name()) fileprospector := fileProspector{ filewatcher: filewatcher, @@ -64,6 +63,7 @@ func newProspector(config config) (loginp.Prospector, error) { ignoreInactiveSince: config.IgnoreInactive, cleanRemoved: config.CleanRemoved, stateChangeCloser: config.Close.OnStateChange, + logger: logger.Named("prospector"), } if config.Rotation == nil { return &fileprospector, nil