Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Fix handling of ADC (Application Default Credentials) metadata server credentials in HTTPJSON input. {issue}44349[44349] {pull}44436[44436]
- Fix handling of ADC (Application Default Credentials) metadata server credentials in CEL input. {issue}44349[44349] {pull}44571[44571]
- Filestream now logs a warning with the number of files that are too small to be ingested {pull}44751[44751]

*Auditbeat*

Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12617,11 +12617,11 @@ SOFTWARE

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-libs
Version: v0.18.9
Version: v0.20.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.18.9/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.20.0/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
34 changes: 27 additions & 7 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,30 @@ type fileWatcher struct {
events chan loginp.FSEvent
}

func newFileWatcher(paths []string, ns *conf.Namespace) (loginp.FSWatcher, error) {
func newFileWatcher(logger *logp.Logger, paths []string, ns *conf.Namespace) (loginp.FSWatcher, error) {
var config *conf.C
if ns == nil {
config = conf.NewConfig()
} else {
config = ns.Config()
}

return newScannerWatcher(paths, config)
return newScannerWatcher(logger, paths, config)
}

func newScannerWatcher(paths []string, c *conf.C) (loginp.FSWatcher, error) {
func newScannerWatcher(logger *logp.Logger, paths []string, c *conf.C) (loginp.FSWatcher, error) {
config := defaultFileWatcherConfig()
err := c.Unpack(&config)
if err != nil {
return nil, err
}
scanner, err := newFileScanner(paths, config.Scanner)
scanner, err := newFileScanner(logger, paths, config.Scanner)
if err != nil {
return nil, err
}

return &fileWatcher{
log: logp.NewLogger(watcherDebugKey),
log: logger.Named(watcherDebugKey),
cfg: config,
prev: make(map[string]loginp.FileDescriptor, 0),
scanner: scanner,
Expand Down Expand Up @@ -295,11 +296,11 @@ type fileScanner struct {
readBuffer []byte
}

func newFileScanner(paths []string, config fileScannerConfig) (*fileScanner, error) {
func newFileScanner(logger *logp.Logger, paths []string, config fileScannerConfig) (*fileScanner, error) {
s := fileScanner{
paths: paths,
cfg: config,
log: logp.NewLogger(scannerDebugKey),
log: logger.Named(scannerDebugKey),
hasher: sha256.New(),
}

Expand Down Expand Up @@ -369,6 +370,8 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
uniqueIDs := map[string]string{}
// used to filter out duplicate matches
uniqueFiles := map[string]struct{}{}

tooSmallFiles := 0
for _, path := range s.paths {
matches, err := filepath.Glob(path)
if err != nil {
Expand All @@ -391,6 +394,7 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {

fd, err := s.toFileDescriptor(&it)
if errors.Is(err, errFileTooSmall) {
tooSmallFiles++
s.log.Debugf("cannot start ingesting from file %q: %s", filename, err)
continue
}
Expand All @@ -409,6 +413,22 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
}
}

if tooSmallFiles > 0 {
prefix := "%d files are "
if tooSmallFiles == 1 {
prefix = "%d file is "
}
s.log.Warnf(
prefix+"too small to be ingested, files need to be at "+
"least %d in size for ingestion to start. To change this "+
"behaviour set 'prospector.scanner.fingerprint.length' and "+
"'prospector.scanner.fingerprint.offset'. "+
"Enable debug logging to see all file names.",
tooSmallFiles,
s.cfg.Fingerprint.Offset+s.cfg.Fingerprint.Length,
)
}

return fdByName
}

Expand Down
133 changes: 111 additions & 22 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/common/file"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"
)

func TestFileWatcher(t *testing.T) {
Expand All @@ -54,7 +56,8 @@
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
logger := logp.NewNopLogger()
fw := createWatcherWithConfig(t, logger, paths, cfgStr)

go fw.Run(ctx)

Expand Down Expand Up @@ -197,7 +200,9 @@
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
logger := logp.NewNopLogger()
fw := createWatcherWithConfig(t, logger, paths, cfgStr)

go fw.Run(ctx)

basename := "created.log"
Expand Down Expand Up @@ -229,7 +234,9 @@
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
logger := logp.NewNopLogger()
fw := createWatcherWithConfig(t, logger, paths, cfgStr)

go fw.Run(ctx)

basename := "created.log"
Expand Down Expand Up @@ -267,9 +274,9 @@
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

logp.DevelopmentSetup(logp.ToObserverOutput())

Check failure on line 277 in filebeat/input/filestream/fswatch_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1019: logp.DevelopmentSetup is deprecated: Prefer using localized loggers. Use logp.NewDevelopmentLogger. (staticcheck)

fw := createWatcherWithConfig(t, paths, cfgStr)
fw := createWatcherWithConfig(t, logp.L(), paths, cfgStr)
go fw.Run(ctx)

basename := "created.log"
Expand Down Expand Up @@ -322,7 +329,9 @@
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
logger := logp.NewNopLogger()
fw := createWatcherWithConfig(t, logger, paths, cfgStr)

go fw.Run(ctx)

basename := "created.log"
Expand Down Expand Up @@ -377,9 +386,9 @@
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

logp.DevelopmentSetup(logp.ToObserverOutput())

Check failure on line 389 in filebeat/input/filestream/fswatch_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1019: logp.DevelopmentSetup is deprecated: Prefer using localized loggers. Use logp.NewDevelopmentLogger. (staticcheck)

fw := createWatcherWithConfig(t, paths, cfgStr)
fw := createWatcherWithConfig(t, logp.L(), paths, cfgStr)

go fw.Run(ctx)

Expand Down Expand Up @@ -437,9 +446,13 @@
normalSymlinkBasename := "normal_symlink.log"
exclSymlinkBasename := "excl_symlink.log"
travelerSymlinkBasename := "portal.log"
undersizedGlob := "undersized-*.txt"

normalFilename := filepath.Join(dir, normalBasename)
undersizedFilename := filepath.Join(dir, undersizedBasename)
undersized1Filename := filepath.Join(dir, "undersized-1.txt")
undersized2Filename := filepath.Join(dir, "undersized-2.txt")
undersized3Filename := filepath.Join(dir, "undersized-3.txt")
excludedFilename := filepath.Join(dir, excludedBasename)
excludedIncludedFilename := filepath.Join(dir, excludedIncludedBasename)
travelerFilename := filepath.Join(dir2, travelerBasename)
Expand All @@ -450,6 +463,9 @@
files := map[string]string{
normalFilename: strings.Repeat("a", 1024),
undersizedFilename: strings.Repeat("a", 128),
undersized1Filename: strings.Repeat("1", 42),
undersized2Filename: strings.Repeat("2", 42),
undersized3Filename: strings.Repeat("3", 42),
excludedFilename: strings.Repeat("nothing to see here", 1024),
excludedIncludedFilename: strings.Repeat("perhaps something to see here", 1024),
travelerFilename: strings.Repeat("folks, I think I got lost", 1024),
Expand Down Expand Up @@ -796,28 +812,59 @@

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
s := createScannerWithConfig(t, paths, tc.cfgStr)
logger := logp.NewNopLogger()
s := createScannerWithConfig(t, logger, paths, tc.cfgStr)
requireEqualFiles(t, tc.expDesc, s.GetFiles())
})
}

t.Run("does not issue warnings when file is too small", func(t *testing.T) {
t.Run("issue a single warning with the number of files that are too small and debug with filenames", func(t *testing.T) {
cfgStr := `
scanner:
fingerprint:
enabled: true
offset: 0
length: 1024
`
logp.DevelopmentSetup(logp.ToObserverOutput())

// this file is 128 bytes long
paths := []string{filepath.Join(dir, undersizedBasename)}
s := createScannerWithConfig(t, paths, cfgStr)
// the glob for the very small files
paths := []string{filepath.Join(dir, undersizedGlob)}
logger, buffer := logp.NewInMemoryLocal("test-logger", zapcore.EncoderConfig{})

s := createScannerWithConfig(t, logger, paths, cfgStr)
files := s.GetFiles()
require.Empty(t, files)
logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll()
require.Empty(t, logs, "there must be no warning logs for files too small")

logs := parseLogs(buffer.String())
require.NotEmpty(t, logs, "fileScanner.GetFiles must log some warnings")

// The last log entry from s.GetFiles must be at warn level and
// in the format 'x files are too small"
lastEntry := logs[len(logs)-1]
require.Equal(t, "warn", lastEntry.level, "'x files are too small' must be at level warn")
require.Contains(t, lastEntry.message, "3 files are too small to be ingested")

// For each file that is too small to be ingested, s.GetFiles must log
// at debug level the filename and its size
expectedMsgs := []string{
fmt.Sprintf("cannot start ingesting from file %[1]q: filesize of %[1]q is 42 bytes", undersized1Filename),
fmt.Sprintf("cannot start ingesting from file %[1]q: filesize of %[1]q is 42 bytes", undersized2Filename),
fmt.Sprintf("cannot start ingesting from file %[1]q: filesize of %[1]q is 42 bytes", undersized3Filename),
}

for _, msg := range expectedMsgs {
found := false
for _, log := range logs {
if strings.HasPrefix(log.message, msg) {
found = true
break
}
}

if !found {
t.Errorf("did not find %q in the logs", msg)
}
}
})

t.Run("returns error when creating scanner with a fingerprint too small", func(t *testing.T) {
Expand All @@ -835,12 +882,47 @@
err = ns.Unpack(cfg)
require.NoError(t, err)

_, err = newFileWatcher(paths, ns)
logger := logptest.NewTestingLogger(t, "log-selector")
_, err = newFileWatcher(logger, paths, ns)
require.Error(t, err)
require.Contains(t, err.Error(), "fingerprint size 1 bytes cannot be smaller than 64 bytes")
})
}

// logEntry is a single parsed log line — timestamp, level and message —
// in the tab-separated shape produced by logp.NewInMemoryLocal.
type logEntry struct {
	timestamp string
	level     string
	message   string
}

// parseLogs parses the logs in buff and returns them as a slice of logEntry.
// It is meant to be used with `logp.NewInMemoryLocal` where buff is the
// contents of the buffer returned by `logp.NewInMemoryLocal`.
// Log entries are expected to be separated by a new line and each log entry
// is expected to have at least 3 fields separated by a tab "\t": timestamp,
// level and message. Everything after the second tab is kept verbatim as the
// message, so messages that themselves contain tabs (e.g. structured fields
// appended by the console encoder) are not dropped. Empty lines and lines
// with fewer than 3 fields are silently skipped.
func parseLogs(buff string) []logEntry {
	logEntries := []logEntry{}

	for _, l := range strings.Split(buff, "\n") {
		if l == "" {
			continue
		}

		// SplitN (not Split) so a tab inside the message does not make
		// the field count exceed 3 and cause the entry to be discarded.
		split := strings.SplitN(l, "\t", 3)
		if len(split) != 3 {
			continue
		}
		logEntries = append(logEntries, logEntry{
			timestamp: split[0],
			level:     split[1],
			message:   split[2],
		})
	}

	return logEntries
}

const benchmarkFileCount = 1000

func BenchmarkGetFiles(b *testing.B) {
Expand All @@ -859,7 +941,9 @@
Enabled: false,
},
}
s, err := newFileScanner(paths, cfg)

logger := logp.NewNopLogger()
s, err := newFileScanner(logger, paths, cfg)
require.NoError(b, err)

for i := 0; i < b.N; i++ {
Expand All @@ -886,7 +970,9 @@
Length: 1024,
},
}
s, err := newFileScanner(paths, cfg)

logger := logp.NewNopLogger()
s, err := newFileScanner(logger, paths, cfg)
require.NoError(b, err)

for i := 0; i < b.N; i++ {
Expand All @@ -895,21 +981,21 @@
}
}

func createWatcherWithConfig(t *testing.T, paths []string, cfgStr string) loginp.FSWatcher {
func createWatcherWithConfig(t *testing.T, logger *logp.Logger, paths []string, cfgStr string) loginp.FSWatcher {
cfg, err := conf.NewConfigWithYAML([]byte(cfgStr), cfgStr)
require.NoError(t, err)

ns := &conf.Namespace{}
err = ns.Unpack(cfg)
require.NoError(t, err)

fw, err := newFileWatcher(paths, ns)
fw, err := newFileWatcher(logger, paths, ns)
require.NoError(t, err)

return fw
}

func createScannerWithConfig(t *testing.T, paths []string, cfgStr string) loginp.FSScanner {
func createScannerWithConfig(t *testing.T, logger *logp.Logger, paths []string, cfgStr string) loginp.FSScanner {
cfg, err := conf.NewConfigWithYAML([]byte(cfgStr), cfgStr)
require.NoError(t, err)

Expand All @@ -920,7 +1006,8 @@
config := defaultFileWatcherConfig()
err = ns.Config().Unpack(&config)
require.NoError(t, err)
scanner, err := newFileScanner(paths, config.Scanner)

scanner, err := newFileScanner(logger, paths, config.Scanner)
require.NoError(t, err)

return scanner
Expand Down Expand Up @@ -975,7 +1062,9 @@
Length: 1024,
},
}
s, err := newFileScanner(paths, cfg)

logger := logp.NewNopLogger()
s, err := newFileScanner(logger, paths, cfg)
require.NoError(b, err)

it, err := s.getIngestTarget(filename)
Expand Down
Loading
Loading