diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index ba82e39e1133..2e6b786384b5 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -226,6 +226,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
 - Fix handling of ADC (Application Default Credentials) metadata server credentials in HTTPJSON input. {issue}44349[44349] {pull}44436[44436]
 - Fix handling of ADC (Application Default Credentials) metadata server credentials in CEL input. {issue}44349[44349] {pull}44571[44571]
+- Filestream now logs at warning level the number of files that are too small to be ingested. {pull}44751[44751]
 
 *Auditbeat*
 
diff --git a/NOTICE.txt b/NOTICE.txt
index 10cbab9dc8aa..6054dea93f44 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -12617,11 +12617,11 @@ SOFTWARE
 
 --------------------------------------------------------------------------------
 Dependency : github.com/elastic/elastic-agent-libs
-Version: v0.18.9
+Version: v0.20.0
 Licence type (autodetected): Apache-2.0
 --------------------------------------------------------------------------------
 
-Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.18.9/LICENSE:
+Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.20.0/LICENSE:
 
 Apache License
 Version 2.0, January 2004
diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go
index c51d850bbd2c..341d280e2404 100644
--- a/filebeat/input/filestream/fswatch.go
+++ b/filebeat/input/filestream/fswatch.go
@@ -70,7 +70,7 @@ type fileWatcher struct {
 	events chan loginp.FSEvent
 }
 
-func newFileWatcher(paths []string, ns *conf.Namespace) (loginp.FSWatcher, error) {
+func newFileWatcher(logger *logp.Logger, paths []string, ns *conf.Namespace) (loginp.FSWatcher, error) {
 	var config *conf.C
 	if ns == nil {
 		config = conf.NewConfig()
@@ -78,21 +78,22 @@ func newFileWatcher(paths []string, ns *conf.Namespace) (loginp.FSWatcher, error
 		config = ns.Config()
 	}
 
-	return newScannerWatcher(paths, config)
+	return newScannerWatcher(logger, paths, config)
 }
 
-func newScannerWatcher(paths []string, c *conf.C) (loginp.FSWatcher, error) {
+func newScannerWatcher(logger *logp.Logger, paths []string, c *conf.C) (loginp.FSWatcher, error) {
 	config := defaultFileWatcherConfig()
 	err := c.Unpack(&config)
 	if err != nil {
 		return nil, err
 	}
-	scanner, err := newFileScanner(paths, config.Scanner)
+	scanner, err := newFileScanner(logger, paths, config.Scanner)
 	if err != nil {
 		return nil, err
 	}
+
 	return &fileWatcher{
-		log:     logp.NewLogger(watcherDebugKey),
+		log:     logger.Named(watcherDebugKey),
 		cfg:     config,
 		prev:    make(map[string]loginp.FileDescriptor, 0),
 		scanner: scanner,
@@ -295,11 +296,11 @@ type fileScanner struct {
 	readBuffer []byte
 }
 
-func newFileScanner(paths []string, config fileScannerConfig) (*fileScanner, error) {
+func newFileScanner(logger *logp.Logger, paths []string, config fileScannerConfig) (*fileScanner, error) {
 	s := fileScanner{
 		paths:  paths,
 		cfg:    config,
-		log:    logp.NewLogger(scannerDebugKey),
+		log:    logger.Named(scannerDebugKey),
 		hasher: sha256.New(),
 	}
 
@@ -369,6 +370,8 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
 
 	uniqueIDs := map[string]string{} // used to filter out duplicate matches
 	uniqueFiles := map[string]struct{}{}
+
+	tooSmallFiles := 0
 	for _, path := range s.paths {
 		matches, err := filepath.Glob(path)
 		if err != nil {
@@ -391,6 +394,7 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
 
 			fd, err := s.toFileDescriptor(&it)
 			if errors.Is(err, errFileTooSmall) {
+				tooSmallFiles++
 				s.log.Debugf("cannot start ingesting from file %q: %s", filename, err)
 				continue
 			}
@@ -409,6 +413,22 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
 		}
 	}
 
+	if tooSmallFiles > 0 {
+		prefix := "%d files are "
+		if tooSmallFiles == 1 {
+			prefix = "%d file is "
+		}
+		s.log.Warnf(
+			prefix+"too small to be ingested, files need to be at "+
+				"least %d bytes in size for ingestion to start. To change this "+
+				"behaviour, set 'prospector.scanner.fingerprint.length' and "+
+				"'prospector.scanner.fingerprint.offset'. "+
+				"Enable debug logging to see all file names.",
+			tooSmallFiles,
+			s.cfg.Fingerprint.Offset+s.cfg.Fingerprint.Length,
+		)
+	}
+
 	return fdByName
 }
 
diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go
index 528caec79de3..e00a5ed5f5b6 100644
--- a/filebeat/input/filestream/fswatch_test.go
+++ b/filebeat/input/filestream/fswatch_test.go
@@ -28,11 +28,13 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap/zapcore"
 
 	loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
 	"github.com/elastic/beats/v7/libbeat/common/file"
 	conf "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/logp/logptest"
 )
 
 func TestFileWatcher(t *testing.T) {
@@ -54,7 +56,8 @@ scanner:
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
-	fw := createWatcherWithConfig(t, paths, cfgStr)
+	logger := logp.NewNopLogger()
+	fw := createWatcherWithConfig(t, logger, paths, cfgStr)
 
 	go fw.Run(ctx)
 
@@ -197,7 +200,9 @@ scanner:
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
-	fw := createWatcherWithConfig(t, paths, cfgStr)
+	logger := logp.NewNopLogger()
+	fw := createWatcherWithConfig(t, logger, paths, cfgStr)
+
 	go fw.Run(ctx)
 
 	basename := "created.log"
@@ -229,7 +234,9 @@ scanner:
 	ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
 	defer cancel()
 
-	fw := createWatcherWithConfig(t, paths, cfgStr)
+	logger := logp.NewNopLogger()
+	fw := createWatcherWithConfig(t, logger, paths, cfgStr)
+
 	go fw.Run(ctx)
 
 	basename := "created.log"
@@ -269,7 +276,7 @@ scanner:
 
 	logp.DevelopmentSetup(logp.ToObserverOutput())
 
-	fw := createWatcherWithConfig(t, paths, cfgStr)
+	fw := createWatcherWithConfig(t, logp.L(), paths, cfgStr)
 	go fw.Run(ctx)
 
 	basename := "created.log"
@@ -322,7 +329,9 @@ scanner:
 	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	defer cancel()
 
-	fw := createWatcherWithConfig(t, paths, cfgStr)
+	logger := logp.NewNopLogger()
+	fw := createWatcherWithConfig(t, logger, paths, cfgStr)
+
 	go fw.Run(ctx)
 
 	basename := "created.log"
@@ -379,7 +388,7 @@ scanner:
 
 	logp.DevelopmentSetup(logp.ToObserverOutput())
 
-	fw := createWatcherWithConfig(t, paths, cfgStr)
+	fw := createWatcherWithConfig(t, logp.L(), paths, cfgStr)
 
 	go fw.Run(ctx)
 
@@ -437,9 +446,13 @@ func TestFileScanner(t *testing.T) {
 	normalSymlinkBasename := "normal_symlink.log"
 	exclSymlinkBasename := "excl_symlink.log"
 	travelerSymlinkBasename := "portal.log"
+	undersizedGlob := "undersized-*.txt"
 
 	normalFilename := filepath.Join(dir, normalBasename)
 	undersizedFilename := filepath.Join(dir, undersizedBasename)
+	undersized1Filename := filepath.Join(dir, "undersized-1.txt")
+	undersized2Filename := filepath.Join(dir, "undersized-2.txt")
+	undersized3Filename := filepath.Join(dir, "undersized-3.txt")
 	excludedFilename := filepath.Join(dir, excludedBasename)
 	excludedIncludedFilename := filepath.Join(dir, excludedIncludedBasename)
 	travelerFilename := filepath.Join(dir2, travelerBasename)
@@ -450,6 +463,9 @@ func TestFileScanner(t *testing.T) {
 	files := map[string]string{
 		normalFilename:           strings.Repeat("a", 1024),
 		undersizedFilename:       strings.Repeat("a", 128),
+		undersized1Filename:      strings.Repeat("1", 42),
+		undersized2Filename:      strings.Repeat("2", 42),
+		undersized3Filename:      strings.Repeat("3", 42),
 		excludedFilename:         strings.Repeat("nothing to see here", 1024),
 		excludedIncludedFilename: strings.Repeat("perhaps something to see here", 1024),
 		travelerFilename:         strings.Repeat("folks, I think I got lost", 1024),
@@ -796,12 +812,13 @@ scanner:
 
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			s := createScannerWithConfig(t, paths, tc.cfgStr)
+			logger := logp.NewNopLogger()
+			s := createScannerWithConfig(t, logger, paths, tc.cfgStr)
 			requireEqualFiles(t, tc.expDesc, s.GetFiles())
 		})
 	}
 
-	t.Run("does not issue warnings when file is too small", func(t *testing.T) {
+	t.Run("issues a single warning with the number of files that are too small and logs filenames at debug", func(t *testing.T) {
 		cfgStr := `
 scanner:
   fingerprint:
@@ -809,15 +826,45 @@ scanner:
     offset: 0
     length: 1024
 `
-		logp.DevelopmentSetup(logp.ToObserverOutput())
-		// this file is 128 bytes long
-		paths := []string{filepath.Join(dir, undersizedBasename)}
-		s := createScannerWithConfig(t, paths, cfgStr)
+		// the glob for the very small files
+		paths := []string{filepath.Join(dir, undersizedGlob)}
+		logger, buffer := logp.NewInMemoryLocal("test-logger", zapcore.EncoderConfig{})
+
+		s := createScannerWithConfig(t, logger, paths, cfgStr)
 
 		files := s.GetFiles()
 		require.Empty(t, files)
-		logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll()
-		require.Empty(t, logs, "there must be no warning logs for files too small")
+
+		logs := parseLogs(buffer.String())
+		require.NotEmpty(t, logs, "fileScanner.GetFiles must log some warnings")
+
+		// The last log entry from s.GetFiles must be at warn level and
+		// in the format 'x files are too small'
+		lastEntry := logs[len(logs)-1]
+		require.Equal(t, "warn", lastEntry.level, "'x files are too small' must be at level warn")
+		require.Contains(t, lastEntry.message, "3 files are too small to be ingested")
+
+		// For each file that is too small to be ingested, s.GetFiles must log
+		// at debug level the filename and its size
+		expectedMsgs := []string{
+			fmt.Sprintf("cannot start ingesting from file %[1]q: filesize of %[1]q is 42 bytes", undersized1Filename),
+			fmt.Sprintf("cannot start ingesting from file %[1]q: filesize of %[1]q is 42 bytes", undersized2Filename),
+			fmt.Sprintf("cannot start ingesting from file %[1]q: filesize of %[1]q is 42 bytes", undersized3Filename),
+		}
+
+		for _, msg := range expectedMsgs {
+			found := false
+			for _, log := range logs {
+				if strings.HasPrefix(log.message, msg) {
+					found = true
+					break
+				}
+			}
+
+			if !found {
+				t.Errorf("did not find %q in the logs", msg)
+			}
+		}
 	})
 
 	t.Run("returns error when creating scanner with a fingerprint too small", func(t *testing.T) {
@@ -835,12 +882,47 @@ scanner:
 		err = ns.Unpack(cfg)
 		require.NoError(t, err)
 
-		_, err = newFileWatcher(paths, ns)
+		logger := logptest.NewTestingLogger(t, "log-selector")
+		_, err = newFileWatcher(logger, paths, ns)
 		require.Error(t, err)
 		require.Contains(t, err.Error(), "fingerprint size 1 bytes cannot be smaller than 64 bytes")
 	})
 }
 
+type logEntry struct {
+	timestamp string
+	level     string
+	message   string
+}
+
+// parseLogs parses the logs in buff and returns them as a slice of logEntry.
+// It is meant to be used with `logp.NewInMemoryLocal` where buff is the
+// contents of the buffer returned by `logp.NewInMemoryLocal`.
+// Log entries are expected to be separated by a newline and each log entry
+// is expected to have 3 fields separated by a tab "\t": timestamp, level
+// and message.
+func parseLogs(buff string) []logEntry {
+	logEntries := []logEntry{}
+
+	for l := range strings.SplitSeq(buff, "\n") {
+		if l == "" {
+			continue
+		}
+
+		split := strings.Split(l, "\t")
+		if len(split) != 3 {
+			continue
+		}
+		logEntries = append(logEntries, logEntry{
+			timestamp: split[0],
+			level:     split[1],
+			message:   split[2],
+		})
+	}
+
+	return logEntries
+}
+
 const benchmarkFileCount = 1000
 
 func BenchmarkGetFiles(b *testing.B) {
@@ -859,7 +941,9 @@ func BenchmarkGetFiles(b *testing.B) {
 			Enabled: false,
 		},
 	}
-	s, err := newFileScanner(paths, cfg)
+
+	logger := logp.NewNopLogger()
+	s, err := newFileScanner(logger, paths, cfg)
 	require.NoError(b, err)
 
 	for i := 0; i < b.N; i++ {
@@ -886,7 +970,9 @@ func BenchmarkGetFilesWithFingerprint(b *testing.B) {
 			Length:  1024,
 		},
 	}
-	s, err := newFileScanner(paths, cfg)
+
+	logger := logp.NewNopLogger()
+	s, err := newFileScanner(logger, paths, cfg)
 	require.NoError(b, err)
 
 	for i := 0; i < b.N; i++ {
@@ -895,7 +981,7 @@
 	}
 }
 
-func createWatcherWithConfig(t *testing.T, paths []string, cfgStr string) loginp.FSWatcher {
+func createWatcherWithConfig(t *testing.T, logger *logp.Logger, paths []string, cfgStr string) loginp.FSWatcher {
 	cfg, err := conf.NewConfigWithYAML([]byte(cfgStr), cfgStr)
 	require.NoError(t, err)
 
@@ -903,13 +989,13 @@ func createWatcherWithConfig(t *testing.T, paths []string, cfgStr string) loginp
 	err = ns.Unpack(cfg)
 	require.NoError(t, err)
 
-	fw, err := newFileWatcher(paths, ns)
+	fw, err := newFileWatcher(logger, paths, ns)
 	require.NoError(t, err)
 
 	return fw
 }
 
-func createScannerWithConfig(t *testing.T, paths []string, cfgStr string) loginp.FSScanner {
+func createScannerWithConfig(t *testing.T, logger *logp.Logger, paths []string, cfgStr string) loginp.FSScanner {
 	cfg, err := conf.NewConfigWithYAML([]byte(cfgStr), cfgStr)
 	require.NoError(t, err)
 
@@ -920,7 +1006,8 @@ func createScannerWithConfig(t *testing.T, paths []string, cfgStr string) loginp
 	config := defaultFileWatcherConfig()
 	err = ns.Config().Unpack(&config)
 	require.NoError(t, err)
-	scanner, err := newFileScanner(paths, config.Scanner)
+
+	scanner, err := newFileScanner(logger, paths, config.Scanner)
 	require.NoError(t, err)
 
 	return scanner
@@ -975,7 +1062,9 @@ func BenchmarkToFileDescriptor(b *testing.B) {
 			Length:  1024,
 		},
 	}
-	s, err := newFileScanner(paths, cfg)
+
+	logger := logp.NewNopLogger()
+	s, err := newFileScanner(logger, paths, cfg)
 	require.NoError(b, err)
 
 	it, err := s.getIngestTarget(filename)
diff --git a/filebeat/input/filestream/prospector_creator.go b/filebeat/input/filestream/prospector_creator.go
index 5142704a614d..3978885d3b92 100644
--- a/filebeat/input/filestream/prospector_creator.go
+++ b/filebeat/input/filestream/prospector_creator.go
@@ -38,12 +38,13 @@ const (
 var experimentalWarning sync.Once
 
 func newProspector(config config) (loginp.Prospector, error) {
+	logger := logp.L().With("filestream_id", config.ID)
 	err := checkConfigCompatibility(config.FileWatcher, config.FileIdentity)
 	if err != nil {
 		return nil, err
 	}
 
-	filewatcher, err := newFileWatcher(config.Paths, config.FileWatcher)
+	filewatcher, err := newFileWatcher(logger, config.Paths, config.FileWatcher)
 	if err != nil {
 		return nil, fmt.Errorf("error while creating filewatcher %w", err)
 	}
@@ -53,9 +54,8 @@ func newProspector(config config) (loginp.Prospector, error) {
 		return nil, fmt.Errorf("error while creating file identifier: %w", err)
 	}
 
-	logp.L().
-		With("filestream_id", config.ID).
-		Debugf("file identity is set to %s", identifier.Name())
+	logger = logger.Named("input.filestream")
+	logger.Debugf("file identity is set to %s", identifier.Name())
 
 	fileprospector := fileProspector{
 		filewatcher: filewatcher,
diff --git a/go.mod b/go.mod
index eb37cf07b5bb..9496d2bae7e7 100644
--- a/go.mod
+++ b/go.mod
@@ -187,7 +187,7 @@ require (
 	github.com/elastic/bayeux v1.0.5
 	github.com/elastic/ebpfevents v0.6.0
 	github.com/elastic/elastic-agent-autodiscover v0.9.0
-	github.com/elastic/elastic-agent-libs v0.18.9
+	github.com/elastic/elastic-agent-libs v0.20.0
 	github.com/elastic/elastic-agent-system-metrics v0.11.11
 	github.com/elastic/go-elasticsearch/v8 v8.17.0
 	github.com/elastic/go-quark v0.2.0
diff --git a/go.sum b/go.sum
index ab0f8a7a94f2..a7c88d1d220a 100644
--- a/go.sum
+++ b/go.sum
@@ -337,8 +337,8 @@ github.com/elastic/elastic-agent-autodiscover v0.9.0 h1:+iWIKh0u3e8I+CJa3FfWe9h0
 github.com/elastic/elastic-agent-autodiscover v0.9.0/go.mod h1:5iUxLHhVdaGSWYTveSwfJEY4RqPXTG13LPiFoxcpFd4=
 github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE=
 github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI=
-github.com/elastic/elastic-agent-libs v0.18.9 h1:NQbBK6uMd/t4S0Fe7iOpT5mE7RXMWVaOLBSEXHRBzWI=
-github.com/elastic/elastic-agent-libs v0.18.9/go.mod h1:Repx7BMzE1v/gTipPogNIQeEnSGwOWGBC63h7h9c5aM=
+github.com/elastic/elastic-agent-libs v0.20.0 h1:MPjenwuEr+QfMeQRV4BK817ZbiNS38SXJE7QGHDwhUs=
+github.com/elastic/elastic-agent-libs v0.20.0/go.mod h1:1HNxREH8C27kGrJCtKZh/ot8pV8joH8VREP21+FrH5s=
 github.com/elastic/elastic-agent-system-metrics v0.11.11 h1:Qjh3Zef23PfGlG91AF+9ciNLNQf/8cDJ4CalnLZtV3g=
 github.com/elastic/elastic-agent-system-metrics v0.11.11/go.mod h1:GNqmKfvOt8PwORjbS6GllNdMfkLpOWyTa7P8oQq4E5o=
 github.com/elastic/elastic-transport-go/v8 v8.6.1 h1:h2jQRqH6eLGiBSN4eZbQnJLtL4bC5b4lfVFRjw2R4e4=