Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
Expand All @@ -55,6 +56,7 @@ type registryEntry struct {
Cursor struct {
Offset int `json:"offset"`
} `json:"cursor"`
Meta interface{} `json:"meta"`
}

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
Expand Down Expand Up @@ -185,6 +187,42 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expec
require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
}

// requireMetaInRegistry checks if the expected metadata is saved to the registry.
func (e *inputTestingEnvironment) waitUntilMetaInRegistry(filename string, expectedMeta fileMeta) {
for {
filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
if err != nil {
continue
}

id := getIDFromPath(filepath, fi)
entry, err := e.getRegistryState(id)
if err != nil {
continue
}

if entry.Meta == nil {
continue
}

var meta fileMeta
err = typeconv.Convert(&meta, entry.Meta)
if err != nil {
e.t.Fatalf("cannot convert: %+v", err)
}

if requireMetadataEquals(expectedMeta, meta) {
break
}
time.Sleep(10 * time.Millisecond)
}
}

func requireMetadataEquals(one, other fileMeta) bool {
return one == other
}

func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) {
filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
Expand Down
39 changes: 39 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,45 @@ func TestFilestreamCloseRenamed(t *testing.T) {
env.requireOffsetInRegistry(testlogName, len(newerTestlines))
}

func TestFilestreamMetadataUpdatedOnRename(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("renaming files while Filebeat is running is not supported on Windows")
}

env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.check_interval": "1ms",
})

testline := []byte("log line\n")
env.mustWriteLinesToFile(testlogName, testline)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)
env.waitUntilMetaInRegistry(testlogName, fileMeta{Source: env.abspath(testlogName), IdentifierName: "native"})
env.requireOffsetInRegistry(testlogName, len(testline))

testlogNameRenamed := "test.log.renamed"
env.mustRenameFile(testlogName, testlogNameRenamed)
Comment thread
kvch marked this conversation as resolved.

// check if the metadata is updated and cursor data stays the same
env.waitUntilMetaInRegistry(testlogNameRenamed, fileMeta{Source: env.abspath(testlogNameRenamed), IdentifierName: "native"})
env.requireOffsetInRegistry(testlogNameRenamed, len(testline))

env.mustAppendLinesToFile(testlogNameRenamed, testline)

env.waitUntilEventCount(2)
env.requireOffsetInRegistry(testlogNameRenamed, len(testline)*2)

cancelInput()
env.waitUntilInputStops()
}

// test_close_removed from test_harvester.py
func TestFilestreamCloseRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/internal/input-logfile/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (s *store) findCursorMeta(key string, to interface{}) error {

// updateMetadata updates the cursor metadata in the persistent store.
func (s *store) updateMetadata(key string, meta interface{}) error {
resource := s.ephemeralStore.Find(key, false)
resource := s.ephemeralStore.Find(key, true)
if resource == nil {
return fmt.Errorf("resource '%s' not found", key)
}
Expand Down
11 changes: 10 additions & 1 deletion filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h
case loginp.OpCreate, loginp.OpWrite:
if fe.Op == loginp.OpCreate {
log.Debugf("A new file %s has been found", fe.NewPath)

err := s.UpdateMetadata(src, fileMeta{Source: fe.NewPath, IdentifierName: p.identifier.Name()})
if err != nil {
log.Errorf("Failed to set cursor meta data of entry %s: %v", src.Name(), err)
}

} else if fe.Op == loginp.OpWrite {
log.Debugf("File %s has been updated", fe.NewPath)
}
Expand Down Expand Up @@ -169,7 +175,10 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h

meta.IdentifierName = p.identifier.Name()
}
s.UpdateMetadata(src, fileMeta{Source: src.newPath, IdentifierName: meta.IdentifierName})
err = s.UpdateMetadata(src, fileMeta{Source: src.newPath, IdentifierName: meta.IdentifierName})
if err != nil {
log.Errorf("Failed to update cursor meta data of entry %s: %v", src.Name(), err)
}

if p.stateChangeCloser.Renamed {
log.Debugf("Stopping harvester as file %s has been renamed and close.on_state_change.renamed is enabled.", src.Name())
Expand Down