Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

40197 filestream migrate file identity #41762

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions filebeat/_meta/config/filebeat.global.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# batch of events has been published successfully. The default value is 1s.
#filebeat.registry.flush: 1s

# The interval at which to run the registry cleanup
#filebeat.registry.cleanup_interval: 5m

# Starting with Filebeat 7.0, the registry uses a new directory format to store
# Filebeat state. After you upgrade, Filebeat will automatically migrate a 6.x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,13 @@ type ProspectorCleaner interface {
// The function passed to UpdateIdentifiers must return an empty string if the key
// remains the same.
UpdateIdentifiers(func(v Value) (string, interface{}))

// FixUpIdentifiers migrates IDs in the registry from inputs
// that used the deprecated `.global` ID.
FixUpIdentifiers(func(v Value) (string, interface{}))
}

// Value contains the cursor metadata.
type Value interface {
	// UnpackCursorMeta decodes the cursor metadata required by the
	// prospector into 'to'. It returns an error if the stored metadata
	// cannot be converted to the target type.
	UnpackCursorMeta(to interface{}) error

	// Key returns the registry key identifying this resource.
	Key() string
}
73 changes: 24 additions & 49 deletions filebeat/input/filestream/internal/input-logfile/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,28 @@ func (s *sourceStore) CleanIf(pred func(v Value) bool) {
}
}

// FixUpIdentifiers copies an existing resource to a new ID and marks the previous one
// UpdateIdentifiers copies an existing resource to a new ID and marks the previous one
// for removal.
func (s *sourceStore) FixUpIdentifiers(getNewID func(v Value) (string, interface{})) {
func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, interface{})) {
s.store.ephemeralStore.mu.Lock()
defer s.store.ephemeralStore.mu.Unlock()

for key, res := range s.store.ephemeralStore.table {
// Entries in the registry are soft deleted, once the gcStore runs,
// they're actually removed from the in-memory registry (ephemeralStore)
// and marked as removed in the registry operations log. So we need
// to skip all entries that were soft deleted.
//
// - res.internalState.TTL == 0: entry has been deleted
// - res.internalState.TTL == -1: entry will never be removed by TTL
// - res.internalState.TTL > 0: entry will be removed once its TTL
// is reached
//
// If the entry has been deleted, skip it
if res.internalState.TTL == 0 {
continue
}

if !s.identifier.MatchesInput(key) {
continue
}
Expand All @@ -229,68 +244,24 @@ func (s *sourceStore) FixUpIdentifiers(getNewID func(v Value) (string, interface
}

newKey, updatedMeta := getNewID(res)
if len(newKey) > 0 && res.internalState.TTL > 0 {
if len(newKey) > 0 {
if _, ok := s.store.ephemeralStore.table[newKey]; ok {
res.lock.Unlock()
continue
}

// Pending updates due to events that have not yet been ACKed
// are not included in the copy. Collection on
// the copy start from the last known ACKed position.
// This might lead to data duplication because the harvester
// will pickup from the last ACKed position using the new key
// and the pending updates will affect the entry with the oldKey.
r := res.copyWithNewKey(newKey)
r.cursorMeta = updatedMeta
r.stored = false
s.store.writeState(r)
s.store.writeState(r) // writeState only writes to the log file

// Add the new resource to the ephemeralStore so the rest of the
// codebase can have access to the new value
s.store.ephemeralStore.table[newKey] = r

// Remove the old key from the store
s.store.UpdateTTL(res, 0) // aka delete. See store.remove for details
s.store.log.Infof("migrated entry in registry from '%s' to '%s'", key, newKey)
}

res.lock.Unlock()
}
}

// UpdateIdentifiers copies an existing resource to a new ID and marks the previous one
// for removal.
func (s *sourceStore) UpdateIdentifiers(getNewID func(v Value) (string, interface{})) {
s.store.ephemeralStore.mu.Lock()
defer s.store.ephemeralStore.mu.Unlock()

for key, res := range s.store.ephemeralStore.table {
if !s.identifier.MatchesInput(key) {
continue
}

if !res.lock.TryLock() {
continue
}

newKey, updatedMeta := getNewID(res)
if len(newKey) > 0 && res.internalState.TTL > 0 {
if _, ok := s.store.ephemeralStore.table[newKey]; ok {
res.lock.Unlock()
continue
}

// Pending updates due to events that have not yet been ACKed
// are not included in the copy. Collection on
// the copy start from the last known ACKed position.
// This might lead to data duplication because the harvester
// will pickup from the last ACKed position using the new key
// and the pending updates will affect the entry with the oldKey.
r := res.copyWithNewKey(newKey)
r.cursorMeta = updatedMeta
r.stored = false
s.store.writeState(r)
s.store.log.Infof("migrated entry in registry from '%s' to '%s'. Cursor: %v", key, newKey, r.cursor)
}

res.lock.Unlock()
Expand Down Expand Up @@ -486,6 +457,10 @@ func (r *resource) UnpackCursorMeta(to interface{}) error {
return typeconv.Convert(to, r.cursorMeta)
}

// Key returns the registry key under which this resource is stored.
func (r *resource) Key() string {
	return r.key
}

// inSyncStateSnapshot returns the current in-sync state based on already ACKed update operations.
func (r *resource) inSyncStateSnapshot() state {
return state{
Expand Down
23 changes: 13 additions & 10 deletions filebeat/input/filestream/internal/input-logfile/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,11 @@ type testMeta struct {
func TestSourceStore_UpdateIdentifiers(t *testing.T) {
t.Run("update identifiers when TTL is bigger than zero", func(t *testing.T) {
backend := createSampleStore(t, map[string]state{
"test::key1": {
"test::key1": { // Active resource
TTL: 60 * time.Second,
Meta: testMeta{IdentifierName: "method"},
},
"test::key2": {
"test::key2": { // Deleted resource
TTL: 0 * time.Second,
Meta: testMeta{IdentifierName: "method"},
},
Expand All @@ -372,22 +372,25 @@ func TestSourceStore_UpdateIdentifiers(t *testing.T) {
return "", nil
})

var newState state
s.persistentStore.Get("test::key1::updated", &newState)
// The persistentStore is a mock that does not consider if a state has
// been removed before returning it, thus allowing us to get Updated
// timestamp from when the resource was deleted.
var deletedState state
s.persistentStore.Get("test::key1", &deletedState)

want := map[string]state{
"test::key1": {
Updated: s.Get("test::key1").internalState.Updated,
TTL: 60 * time.Second,
"test::key1": { // old resource is deleted, TTL must be zero
Updated: deletedState.Updated,
TTL: 0 * time.Second,
Meta: map[string]interface{}{"identifiername": "method"},
},
"test::key2": {
"test::key2": { // Unchanged
Updated: s.Get("test::key2").internalState.Updated,
TTL: 0 * time.Second,
Meta: map[string]interface{}{"identifiername": "method"},
},
"test::key1::updated": {
Updated: newState.Updated,
"test::key1::updated": { // Updated resource
Updated: s.Get("test::key1::updated").internalState.Updated,
TTL: 60 * time.Second,
Meta: map[string]interface{}{"identifiername": "something"},
},
Expand Down
74 changes: 68 additions & 6 deletions filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,29 @@ var ignoreInactiveSettings = map[string]ignoreInactiveType{
ignoreInactiveSinceFirstStartStr: IgnoreInactiveSinceFirstStart,
}

var identifiersMap = map[string]fileIdentifier{}

// init pre-builds one default instance of every file identifier that can
// be constructed without configuration, so registry migration can map an
// identifier name (as stored in the registry) back to a working identifier.
func init() {
	for name, newIdentifier := range identifierFactories {
		// The inode marker identifier requires an explicit, user-provided
		// configuration (the marker file path) that cannot be inferred,
		// so no default instance is created for it.
		if name == inodeMarkerName {
			continue
		}

		identifier, err := newIdentifier(nil)
		if err != nil {
			panic(fmt.Errorf("cannot create identifier '%s': %w", name, err))
		}
		identifiersMap[name] = identifier
	}
}

// fileProspector implements the Prospector interface.
// It contains a file scanner which returns file system events.
// The FS events then trigger either new Harvester runs or updates
// the statestore.
type fileProspector struct {
logger *logp.Logger
filewatcher loginp.FSWatcher
identifier fileIdentifier
ignoreOlder time.Duration
Expand All @@ -70,7 +88,7 @@ func (p *fileProspector) Init(
// If this fileProspector belongs to an input that did not have an ID
// this will find its files in the registry and update them to use the
// new ID.
globalCleaner.FixUpIdentifiers(func(v loginp.Value) (id string, val interface{}) {
globalCleaner.UpdateIdentifiers(func(v loginp.Value) (id string, val interface{}) {
var fm fileMeta
err := v.UnpackCursorMeta(&fm)
if err != nil {
Expand Down Expand Up @@ -101,6 +119,9 @@ func (p *fileProspector) Init(
}

identifierName := p.identifier.Name()

// If the file identity has changed, update the registry keys so we can keep
// the state.
cleaner.UpdateIdentifiers(func(v loginp.Value) (string, interface{}) {
var fm fileMeta
err := v.UnpackCursorMeta(&fm)
Expand All @@ -113,12 +134,53 @@ func (p *fileProspector) Init(
return "", fm
}

if fm.IdentifierName != identifierName {
newKey := p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd}).Name()
fm.IdentifierName = identifierName
return newKey, fm
// Return early if:
// - The identifiers are the same
// - The old identifier is fingerprint
// - The old identifier is inode marker
oldIdentifierName := fm.IdentifierName
if oldIdentifierName == identifierName || oldIdentifierName == fingerprintName {
return "", nil
}

// Our current file (source) is in the registry, now we need to ensure
// this registry entry (resource) actually refers to our file. Sources
// are identified by path, however as log files rotate the same path
// can point to different files.
//
// So to ensure we're dealing with the resource from our current file,
// we use the old identifier to generate a registry key for the current
// file we're trying to migrate, if this key matches with the key in the
// registry, then we proceed to update the registry.
registryKey := v.Key()
oldIdentifier, ok := identifiersMap[oldIdentifierName]
if !ok {
// This should never happen, but just in case we properly handle it
// If we cannot find the identifier, move on to the next entry
// some identifiers cannot be migrated
p.logger.Errorf("cannot migrate registry entry from '%s', if the file still exists, it will be re-ingested", oldIdentifierName)
return "", nil
}
// TODO: fix case when old identifier is fingerprint, or just do not handle it
previousIdentifierKey := newID(oldIdentifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd}))

// If the registry key and the key generated by the old identifier
// do not match, log it at debug level and do nothing.
if previousIdentifierKey != registryKey {
p.logger.Debugf("registry key: '%s' and previous file identity key: '%s', differ, will not migrate. Source: '%s'",
registryKey, previousIdentifierKey, fm.Source)
// fmt.Printf("registry key: '%s' and previous file identity key: '%s', differ, will not migrate\n", registryKey, previousIdentifierKey)
return "", fm
}
return "", fm

// The resource matches the file we found in the file system, generate
// a new registry key and return it alongside the updated meta.
newKey := newID(p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd}))
fm.IdentifierName = identifierName
p.logger.Debugf("registry key: '%s' and previous file identity key: '%s', are the same migrating. Source: '%s'",
registryKey, previousIdentifierKey, fm.Source)

return newKey, fm
})

return nil
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/filestream/prospector_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ func newProspector(config config) (loginp.Prospector, error) {
return nil, fmt.Errorf("error while creating file identifier: %w", err)
}

logp.L().
With("filestream_id", config.ID).
Debugf("file identity is set to %s", identifier.Name())
logger := logp.L().Named("input.filestream").With("filestream_id", config.ID)
logger.Debugf("file identity is set to %s", identifier.Name())

fileprospector := fileProspector{
filewatcher: filewatcher,
Expand All @@ -64,6 +63,7 @@ func newProspector(config config) (loginp.Prospector, error) {
ignoreInactiveSince: config.IgnoreInactive,
cleanRemoved: config.CleanRemoved,
stateChangeCloser: config.Close.OnStateChange,
logger: logger.Named("prospector"),
}
if config.Rotation == nil {
return &fileprospector, nil
Expand Down
6 changes: 5 additions & 1 deletion filebeat/input/filestream/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,13 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) {
entries map[string]loginp.Value
filesOnDisk map[string]loginp.FileDescriptor
expectedUpdatedKeys map[string]string
newKey string
}{
"prospector init does not update keys if there are no entries": {
entries: nil,
filesOnDisk: nil,
expectedUpdatedKeys: map[string]string{},
newKey: "foo", // it isn't used but it must not be empty
},
"prospector init does not update keys of not existing files": {
entries: map[string]loginp.Value{
Expand All @@ -129,6 +131,7 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) {
},
filesOnDisk: nil,
expectedUpdatedKeys: map[string]string{},
newKey: "foo", // it isn't used but it must not be empty
},
"prospector init updates keys of existing files": {
entries: map[string]loginp.Value{
Expand All @@ -143,6 +146,7 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) {
tmpFileName: {Info: file.ExtendFileInfo(fi)},
},
expectedUpdatedKeys: map[string]string{"not_path::key1": "path::" + tmpFileName},
newKey: "path::" + tmpFileName,
},
}

Expand All @@ -155,7 +159,7 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) {
identifier: mustPathIdentifier(false),
filewatcher: newMockFileWatcherWithFiles(testCase.filesOnDisk),
}
p.Init(testStore, newMockProspectorCleaner(nil), func(loginp.Source) string { return "" })
p.Init(testStore, newMockProspectorCleaner(nil), func(loginp.Source) string { return testCase.newKey })

assert.EqualValues(t, testCase.expectedUpdatedKeys, testStore.updatedKeys)
})
Expand Down
Loading